Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-14 Thread Renu Tewari
Is upping the magic byte to 2 needed?

In your example, say:
For broker API versions at or above 0.10.2, the tombstone bit will be used
for log-compaction deletion.
If the ProduceRequest version is less than 0.10.2 and the message value is null,
the broker will up-convert and set the tombstone bit.
If the ProduceRequest version is at or above 0.10.2, the tombstone bit
value is unchanged.
Either way, the new version of the broker only uses the tombstone bit
internally.
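
As a rough sketch of that up-conversion decision (the class and field names here
are hypothetical illustrations, not the actual broker code):

{code:java}
// Hypothetical sketch of the up-conversion decision described above; the class
// and field names (Record, TOMBSTONE_FLAG, etc.) are illustrative only.
final class TombstoneUpConverter {

    static final byte MAGIC_WITH_TOMBSTONE = 2;   // magic value assumed by this sketch
    static final byte TOMBSTONE_FLAG = 0x01;      // attribute bit assumed by this sketch

    /** A stripped-down record: just magic, attributes and value. */
    static final class Record {
        byte magic;
        byte attributes;
        byte[] value;                              // null means "delete" for old clients
        Record(byte magic, byte attributes, byte[] value) {
            this.magic = magic; this.attributes = attributes; this.value = value;
        }
    }

    /** Up-convert a record from an old producer so the broker only uses the flag. */
    static Record upConvert(Record in) {
        if (in.magic >= MAGIC_WITH_TOMBSTONE) {
            return in;                             // new producer already set (or not) the flag
        }
        byte attrs = in.attributes;
        if (in.value == null) {
            attrs |= TOMBSTONE_FLAG;               // null value from an old client => tombstone
        }
        return new Record(MAGIC_WITH_TOMBSTONE, attrs, in.value);
    }
}
{code}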

thanks
Renu

On Mon, Nov 14, 2016 at 8:31 PM, Becket Qin  wrote:

> If we follow the current way of doing this format change, it would work the
> following way:
>
> 0. Bump up the magic byte to 2 to indicate the tombstone bit is used.
>
> 1. On receiving a ProduceRequest, the broker always converts the messages to
> the configured message.format.version.
> 1.1 If the message version does not match the configured
> message.format.version, the broker will either up-convert or down-convert
> the message. In that case, users pay the performance cost of re-compression
> if needed.
> 1.2 If the message version matches the configured message.format.version,
> the broker will not do the format conversion, and users may save the
> re-compression cost if message.format.version is at or above 0.10.0.
>
> 2. On receiving a FetchRequest, the broker checks the FetchRequest version
> to see whether the consumer supports the configured message.format.version.
> If the consumer does not support it, down-conversion is required and
> zero-copy is lost. Otherwise zero-copy is used to return the FetchResponse.
>
> Notice that setting message.format.version to 0.10.2 is equivalent to
> always up-converting if needed, but it also means always down-converting if
> there is an old consumer.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Mon, Nov 14, 2016 at 1:43 PM, Michael Pearce 
> wrote:
>
> > I like the idea of up converting and then just having the logic to look
> > for tombstones. It makes that quite clean in nature.
> >
> > It's quite late here in the UK, so to make sure I fully understand / confirm
> > what you propose, could you write it up on the KIP wiki or fully describe
> > exactly how you see it working, so I can read through it in the UK morning?
> >
> > Thanks all for the input on this; it is appreciated.
> >
> >
> > Sent using OWA for iPhone
> > 
> > From: Mayuresh Gharat 
> > Sent: Monday, November 14, 2016 9:28:16 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> >
> > Hi Michael,
> >
> > Just another thing that came up during my discussion with Renu and I
> wanted
> > to share it.
> >
> > Another thing we can do to handle a mixture of old and new clients: once
> > the new broker with this KIP is deployed, the new code should check the
> > request version, and for requests from older producers we can up-convert
> > them with a tombstone marker when appending the message to the log. This is
> > similar to down-converting messages for older clients.
> >
> > If this is possible then the broker in this case has to rely only on the
> > tombstone marker for log compaction. Using this approach we preserve the
> > description of when to update the magic byte as described here :
> > https://kafka.apache.org/documentation#messageformat (1 byte "magic"
> > identifier to allow format changes).
> >
> > In stage 2, if we don't want open source Kafka to make the decision to
> > deprecate the null value for log compaction (which is the approach I would
> > prefer as an end state), since there are some concerns on this, individual
> > companies/orgs can then run a horizontal initiative at their convenience to
> > move to stage 2 by asking all app users to upgrade to new Kafka clients.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Mon, Nov 14, 2016 at 11:50 AM, Becket Qin 
> wrote:
> >
> > > Michael,
> > >
> > > Yes, I am OK with stage 1. We can discuss stage 2 later, but this sounds
> > > like a really organization-specific decision to deprecate an API. There
> > > does not seem to be a general need in open source Kafka to only support the
> > > tombstone bit, which would be a bad thing for people in the community who
> > > are still running old clients. This is exactly why we want to have a magic
> > > byte bump - there should be only one definitive way to interpret a message
> > > with a given magic byte. We should avoid re-defining the interpretation of
> > > a message with an existing magic byte. The interpretation of an old message
> > > format should not be changed and should always be supported until deprecated.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Mon, Nov 14, 2016 at 11:28 AM, Mayuresh Gharat <
> > > gharatmayures...@gmail.com> wrote:
> > >
> > > > I am not sure about "If I understand correctly, you want to let the
> > > broker
> > > > to reject requests
> > > > from old 

[GitHub] kafka pull request #2132: add a space to separate two words

2016-11-14 Thread ZhengQian1
GitHub user ZhengQian1 opened a pull request:

https://github.com/apache/kafka/pull/2132

add a space to separate two words

I think we should add a space here, otherwise the two words will join 
together.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ZTE-PaaS/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2132.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2132


commit 8eb612a38c880d1a72be3b5f26efd569932e6399
Author: 郑谦00117553 <00117553@zte.intra>
Date:   2016-11-15T05:58:19Z

add a space to separate two words




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-14 Thread Becket Qin
If we follow the current way of doing this format change, it would work the
following way:

0. Bump up the magic byte to 2 to indicate the tombstone bit is used.

1. On receiving a ProduceRequest, the broker always converts the messages to
the configured message.format.version.
1.1 If the message version does not match the configured
message.format.version, the broker will either up-convert or down-convert
the message. In that case, users pay the performance cost of re-compression
if needed.
1.2 If the message version matches the configured message.format.version,
the broker will not do the format conversion, and users may save the
re-compression cost if message.format.version is at or above 0.10.0.

2. On receiving a FetchRequest, the broker checks the FetchRequest version
to see whether the consumer supports the configured message.format.version.
If the consumer does not support it, down-conversion is required and
zero-copy is lost. Otherwise zero-copy is used to return the FetchResponse.

Notice that setting message.format.version to 0.10.2 is equivalent to
always up-converting if needed, but it also means always down-converting if
there is an old consumer.
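
A rough sketch of the produce/fetch conversion decisions in steps 1 and 2
(all names are illustrative, not the broker's real classes):

{code:java}
// Illustrative-only sketch of the produce/fetch conversion flow described in
// steps 1 and 2 above; method and field names here are hypothetical.
final class FormatConversionSketch {

    static byte[] onProduce(byte[] batch, byte producerMagic, byte configuredMagic) {
        // 1.1: mismatch => convert (up or down), paying re-compression if needed.
        // 1.2: match => append as-is, skipping re-compression.
        if (producerMagic != configuredMagic) {
            return convert(batch, configuredMagic);
        }
        return batch;
    }

    static byte[] onFetch(byte[] batch, byte consumerMaxMagic, byte configuredMagic) {
        // 2: if the consumer cannot read the configured format, down-convert
        //    (losing zero-copy); otherwise serve the stored bytes directly.
        if (consumerMaxMagic < configuredMagic) {
            return convert(batch, consumerMaxMagic);
        }
        return batch;                      // zero-copy path in the real broker
    }

    private static byte[] convert(byte[] batch, byte targetMagic) {
        // Placeholder for re-encoding (and possibly re-compressing) the batch.
        return batch;
    }
}
{code}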

Thanks,

Jiangjie (Becket) Qin



On Mon, Nov 14, 2016 at 1:43 PM, Michael Pearce 
wrote:

> I like the idea of up converting and then just having the logic to look
> for tombstones. It makes that quite clean in nature.
>
> It's quite late here in the UK, so to make sure I fully understand / confirm
> what you propose, could you write it up on the KIP wiki or fully describe
> exactly how you see it working, so I can read through it in the UK morning?
>
> Thanks all for the input on this; it is appreciated.
>
>
> Sent using OWA for iPhone
> 
> From: Mayuresh Gharat 
> Sent: Monday, November 14, 2016 9:28:16 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
>
> Hi Michael,
>
> Just another thing that came up during my discussion with Renu and I wanted
> to share it.
>
> Another thing we can do to handle a mixture of old and new clients: once
> the new broker with this KIP is deployed, the new code should check the
> request version, and for requests from older producers we can up-convert
> them with a tombstone marker when appending the message to the log. This is
> similar to down-converting messages for older clients.
>
> If this is possible then the broker in this case has to rely only on the
> tombstone marker for log compaction. Using this approach we preserve the
> description of when to update the magic byte as described here :
> https://kafka.apache.org/documentation#messageformat (1 byte "magic"
> identifier to allow format changes).
>
> In stage 2, if we don't want open source Kafka to make the decision to
> deprecate the null value for log compaction (which is the approach I would
> prefer as an end state), since there are some concerns on this, individual
> companies/orgs can then run a horizontal initiative at their convenience to
> move to stage 2 by asking all app users to upgrade to new Kafka clients.
>
> Thanks,
>
> Mayuresh
>
> On Mon, Nov 14, 2016 at 11:50 AM, Becket Qin  wrote:
>
> > Michael,
> >
> > Yes, I am OK with stage 1. We can discuss stage 2 later, but this sounds
> > like a really organization-specific decision to deprecate an API. There
> > does not seem to be a general need in open source Kafka to only support the
> > tombstone bit, which would be a bad thing for people in the community who
> > are still running old clients. This is exactly why we want to have a magic
> > byte bump - there should be only one definitive way to interpret a message
> > with a given magic byte. We should avoid re-defining the interpretation of
> > a message with an existing magic byte. The interpretation of an old message
> > format should not be changed and should always be supported until deprecated.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Nov 14, 2016 at 11:28 AM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> > > I am not sure about "If I understand correctly, you want to let the broker
> > > reject requests from old clients to ensure everyone in an organization has
> > > upgraded, right?"
> > >
> > > I don't think we will be rejecting requests. What phase 2 (stage 2) meant
> > > was that we will only do log compaction based on the tombstone marker and
> > > nothing else.
> > > I am not sure about making this configurable, as a configuration makes it
> > > sound like the broker has the capability to support both the null value and
> > > the tombstone marker bit, for log compaction, indefinitely.
> > >
> > > I agree with Michael's idea of delivering phase 1 (stage 1) right now. It
> > > will also be good to put a note saying that we will be deprecating the
> > > older way of log compaction in the next release.
> > > Phase 2 (stage 2) will actually only rely on the tombstone marker for doing
> > log
> > 

Jenkins build is back to normal : kafka-trunk-jdk8 #1041

2016-11-14 Thread Apache Jenkins Server
See 



[jira] [Comment Edited] (KAFKA-4404) Add knowledge of sign to numeric schema types

2016-11-14 Thread Andy Bryant (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15665817#comment-15665817
 ] 

Andy Bryant edited comment on KAFKA-4404 at 11/15/16 2:40 AM:
--

Hi Ewen

I'd actually be fine with Kafka Connect only supporting signed numeric values, 
however this should be made obvious in the API. I couldn't see any reference to 
what Schema.INT32 referred to in the docs or code.

I ran into an issue with the Confluent kafka-connect-jdbc connector where an 
unsigned integer caused failures. See 
https://github.com/confluentinc/kafka-connect-jdbc/issues/165. If the Schema 
type was specific about it being signed, then connect developers should be made 
aware they need to think about it.

In this case, they could check the metadata and read the unsigned int with 
resultSet.getLong. 
There's actually more than one of these issues with the kafka-connect-jdbc 
connector. For instance the SQL TINYINT type is 0..255 which will cause 
resultSet.getByte to fail half the time.
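
A hedged sketch of that workaround - not the kafka-connect-jdbc code, just an 
illustration of consulting the JDBC metadata so unsigned columns are read into a 
wider Java type:

{code:java}
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

// Illustration only: widen unsigned columns based on ResultSetMetaData.isSigned().
final class UnsignedColumnReader {

    /** Read numeric column {@code col} of the current row, widening unsigned types. */
    static Object readNumeric(ResultSet rs, int col) throws SQLException {
        ResultSetMetaData md = rs.getMetaData();
        boolean signed = md.isSigned(col);
        switch (md.getColumnType(col)) {
            case java.sql.Types.INTEGER:
                // Unsigned INT (0..2^32-1) does not fit in a Java int; use long.
                return signed ? (Object) rs.getInt(col) : (Object) rs.getLong(col);
            case java.sql.Types.TINYINT:
                // SQL TINYINT is often 0..255, so getByte would overflow; use short.
                return signed ? (Object) rs.getByte(col) : (Object) rs.getShort(col);
            default:
                return rs.getObject(col);
        }
    }
}
{code}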




was (Author: kiwiandy):
Hi Ewen

I'd actually be fine with Kafka Connect only supporting signed numeric values, 
however this should be made obvious in the API. I couldn't see any reference to 
what Schema.INT32 referred to in the docs or code.

I ran into an issue with the Confluent kafka-connect-jdbc connector where an 
unsigned integer caused failures. See 
https://github.com/confluentinc/kafka-connect-jdbc/issues/165. If the Schema 
type was specific about it being signed, then connect developers should be made 
aware they need to think about it.

In this case, they could check the metadata and ready the unsigned int with 
resultSet.getLong. 
There's actually more than one of these issues with the kafka-connect-jdbc 
connector. For instance the SQL TINYINT type is 0..255 which will cause 
resultSet.getByte to fail half the time.



> Add knowledge of sign to numeric schema types
> -
>
> Key: KAFKA-4404
> URL: https://issues.apache.org/jira/browse/KAFKA-4404
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.0.1
>Reporter: Andy Bryant
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
>
> For KafkaConnect schemas there is currently no concept of whether a numeric 
> field is signed or unsigned. 
> Add an additional `signed` attribute (like optional) or make it explicit that 
> numeric types must be signed.
> You could encode this as a parameter on the schema but this would not be 
> standard across all connectors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4404) Add knowledge of sign to numeric schema types

2016-11-14 Thread Andy Bryant (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15665817#comment-15665817
 ] 

Andy Bryant commented on KAFKA-4404:


Hi Ewen

I'd actually be fine with Kafka Connect only supporting signed numeric values, 
however this should be made obvious in the API. I couldn't see any reference to 
what Schema.INT32 referred to in the docs or code.

I ran into an issue with the Confluent kafka-connect-jdbc connector where an 
unsigned integer caused failures. See 
https://github.com/confluentinc/kafka-connect-jdbc/issues/165. If the Schema 
type was specific about it being signed, then connect developers should be made 
aware they need to think about it.

In this case, they could check the metadata and read the unsigned int with 
resultSet.getLong. 
There's actually more than one of these issues with the kafka-connect-jdbc 
connector. For instance the SQL TINYINT type is 0..255 which will cause 
resultSet.getByte to fail half the time.



> Add knowledge of sign to numeric schema types
> -
>
> Key: KAFKA-4404
> URL: https://issues.apache.org/jira/browse/KAFKA-4404
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.0.1
>Reporter: Andy Bryant
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
>
> For KafkaConnect schemas there is currently no concept of whether a numeric 
> field is signed or unsigned. 
> Add an additional `signed` attribute (like optional) or make it explicit that 
> numeric types must be signed.
> You could encode this as a parameter on the schema but this would not be 
> standard across all connectors.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Kafka 0.10 Monitoring tool

2016-11-14 Thread Otis Gospodnetić
Hi,

Why are these tools not working perfectly for you?
Does it *have to* be open-source?  If not, Sematext SPM collects a lot of
Kafka metrics, with consumer lag being one of them --
https://sematext.com/blog/2016/06/07/kafka-consumer-lag-offsets-monitoring/

Otis
--
Monitoring - Log Management - Alerting - Anomaly Detection
Solr & Elasticsearch Consulting Support Training - http://sematext.com/


On Mon, Nov 14, 2016 at 5:16 PM, Ghosh, Achintya (Contractor) <
achintya_gh...@comcast.com> wrote:

> Hi there,
> What is the best open source tool for Kafka monitoring, mainly to check the
> offset lag? We tried the following tools:
>
>
> 1.   Burrow
>
> 2.   KafkaOffsetMonitor
>
> 3.   Prometheus and Grafana
>
> 4.   Kafka Manager
>
> But nothing is working perfectly. Please help us on this.
>
> Thanks
> Achintya
>
>


[GitHub] kafka pull request #2131: Remove failing ConnectDistributedTest.test_bad_con...

2016-11-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2131


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2131: Remove failing ConnectDistributedTest.test_bad_con...

2016-11-14 Thread shikhar
GitHub user shikhar opened a pull request:

https://github.com/apache/kafka/pull/2131

Remove failing ConnectDistributedTest.test_bad_connector_class

Since #1911 was merged it is hard to externally test a connector 
transitioning to FAILED state due to an initialization failure, which is what 
this test was attempting to verify.

The unit test added in #1778 already exercises exception-handling around 
Connector instantiation.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shikhar/kafka test_bad_connector_class

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2131.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2131


commit 32a2b8e7a09f5b002e5e058d70bdcec90cb12944
Author: Shikhar Bhushan 
Date:   2016-11-15T00:50:31Z

Remove failing ConnectDistributedTest.test_bad_connector_class

Since #1911 was merged it is hard to externally test a connector 
transitioning to FAILED state due to an initialization failure, which is what 
this test was attempting to verify.

The unit test added in #1778 already exercises exception-handling around 
Connector instantiation.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4409) ZK consumer shutdown/topic event deadlock

2016-11-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15665564#comment-15665564
 ] 

ASF GitHub Bot commented on KAFKA-4409:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2129


> ZK consumer shutdown/topic event deadlock
> -
>
> Key: KAFKA-4409
> URL: https://issues.apache.org/jira/browse/KAFKA-4409
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.10.1.0
>Reporter: Joel Koshy
>Assignee: Joel Koshy
> Fix For: 0.10.1.1
>
>
> This only applies to the old zookeeper consumer. It is trivial enough to fix.
> The consumer can deadlock on shutdown if a topic event fires during shutdown. 
> The shutdown acquires the rebalance lock and then the topic-event-watcher 
> lock. The topic event watcher acquires these in the reverse order. Shutdown 
> should not need to acquire the topic-event-watcher’s lock - all it does is 
> unsubscribe from topic events.
> Stack trace:
> {noformat}
> "mirrormaker-thread-0":
> at 
> kafka.consumer.ZookeeperTopicEventWatcher.shutdown(ZookeeperTopicEventWatcher.scala:50)
> - waiting to lock <0x00072a65d508> (a java.lang.Object)
> at 
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:216)
> - locked <0x0007103c69c0> (a java.lang.Object)
> at 
> kafka.tools.MirrorMaker$MirrorMakerOldConsumer.cleanup(MirrorMaker.scala:519)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply$mcV$sp(MirrorMaker.scala:441)
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:76)
> at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
> at kafka.utils.CoreUtils$.swallowWarn(CoreUtils.scala:47)
> at kafka.utils.Logging$class.swallow(Logging.scala:94)
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:47)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:441)
> "ZkClient-EventThread-58-":
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:639)
> - waiting to lock <0x0007103c69c0> (a java.lang.Object)
> at 
> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:982)
> at 
> kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.handleTopicEvent(ZookeeperConsumerConnector.scala:1048)
> at 
> kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree1$1(ZookeeperTopicEventWatcher.scala:69)
> at 
> kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:65)
> - locked <0x00072a65d508> (a java.lang.Object)
> at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Found one Java-level deadlock:
> =
> "mirrormaker-thread-0":
>   waiting to lock monitor 0x7f1f38029748 (object 0x00072a65d508, a 
> java.lang.Object),
>   which is held by "ZkClient-EventThread-58-"
> "ZkClient-EventThread-58-":
>   waiting to lock monitor 0x7f1e900249a8 (object 0x0007103c69c0, a 
> java.lang.Object),
>   which is held by "mirrormaker-thread-0"
> {noformat}
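
A generic illustration of the fix described in the ticket (not the actual
consumer code): the shutdown path stops taking the watcher's lock, so the two
threads no longer acquire the same pair of monitors in opposite orders.

{code:java}
// Illustration only: shutdown sets a flag and unsubscribes without taking the
// watcher's monitor; the event thread checks the flag before rebalancing.
final class ShutdownWithoutWatcherLock {

    private final Object rebalanceLock = new Object();
    private volatile boolean shuttingDown = false;

    /** Called from the shutdown thread. */
    void shutdown() {
        synchronized (rebalanceLock) {
            shuttingDown = true;          // no watcher lock needed just to unsubscribe
            unsubscribeTopicEvents();     // placeholder for removing the ZK child listener
        }
    }

    /** Called from the ZK event thread while it holds the watcher's own lock. */
    void handleTopicEvent() {
        if (shuttingDown) {
            return;                       // ignore late events instead of racing shutdown
        }
        synchronized (rebalanceLock) {
            // rebalance work would go here
        }
    }

    private void unsubscribeTopicEvents() {
        // In the real consumer this would remove the topic-event listener.
    }
}
{code}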



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2066) Replace FetchRequest / FetchResponse with their org.apache.kafka.common.requests equivalents

2016-11-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15665565#comment-15665565
 ] 

ASF GitHub Bot commented on KAFKA-2066:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2069


> Replace FetchRequest / FetchResponse with their 
> org.apache.kafka.common.requests equivalents
> 
>
> Key: KAFKA-2066
> URL: https://issues.apache.org/jira/browse/KAFKA-2066
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Gwen Shapira
>Assignee: Jason Gustafson
> Fix For: 0.10.2.0
>
>
> Replace FetchRequest / FetchResponse with their 
> org.apache.kafka.common.requests equivalents.
> Note that they can't be completely removed until we deprecate the 
> SimpleConsumer API (and it will require very careful patchwork for the places 
> where core modules actually use the SimpleConsumer API).
> This also requires a solution on how to stream from memory-mapped files 
> (similar to what existing code does with FileMessageSet. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2066) Replace FetchRequest / FetchResponse with their org.apache.kafka.common.requests equivalents

2016-11-14 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-2066.

   Resolution: Fixed
Fix Version/s: 0.10.2.0

Issue resolved by pull request 2069
[https://github.com/apache/kafka/pull/2069]

> Replace FetchRequest / FetchResponse with their 
> org.apache.kafka.common.requests equivalents
> 
>
> Key: KAFKA-2066
> URL: https://issues.apache.org/jira/browse/KAFKA-2066
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Gwen Shapira
>Assignee: Jason Gustafson
> Fix For: 0.10.2.0
>
>
> Replace FetchRequest / FetchResponse with their 
> org.apache.kafka.common.requests equivalents.
> Note that they can't be completely removed until we deprecate the 
> SimpleConsumer API (and it will require very careful patchwork for the places 
> where core modules actually use the SimpleConsumer API).
> This also requires a solution on how to stream from memory-mapped files 
> (similar to what existing code does with FileMessageSet. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2069: KAFKA-2066: Use client-side FetchRequest/FetchResp...

2016-11-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2069


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2129: KAFKA-4409; Fix deadlock between topic event handl...

2016-11-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2129


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [VOTE] KIP-84: Support SASL SCRAM mechanisms

2016-11-14 Thread Jun Rao
Hi, Rajini,

Thanks for the proposal. +1. A few minor comments.

30. Could you add that the broker config sasl.enabled.mechanisms can now
take more values?

31. Could you document the meaning of s,t,k,i used in /config/users/alice
in ZK?

32. In the rejected section, could you document why we decided not to bump
up the version of SaslHandshakeRequest?

Jun


On Mon, Nov 14, 2016 at 5:57 AM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> Hi all,
>
> I would like to initiate the voting process for *KIP-84: Support SASL/SCRAM
> mechanisms*:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 84%3A+Support+SASL+SCRAM+mechanisms
>
> This KIP adds support for four SCRAM mechanisms (SHA-224, SHA-256, SHA-384
> and SHA-512) for SASL authentication, giving more choice for users to
> configure security. When delegation token support is added to Kafka, SCRAM
> will also support secure authentication using delegation tokens.
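
As a rough idea of what a SCRAM-enabled client configuration might look like once
this KIP lands - the mechanism name, login module class and the sasl.jaas.config
property below are assumptions based on the KIP and related work, not a final API:

{code:java}
import java.util.Properties;

// Sketch only: a producer configured for SASL/SCRAM under the assumptions above.
public class ScramClientConfigSketch {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"alice\" password=\"alice-secret\";");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
{code}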
>
> Thank you...
>
> Regards,
>
> Rajini
>


[jira] [Updated] (KAFKA-3462) Allow SinkTasks to disable consumer offset commit

2016-11-14 Thread Shikhar Bhushan (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shikhar Bhushan updated KAFKA-3462:
---
Resolution: Fixed
Status: Resolved  (was: Patch Available)

This will be handled with KIP-89 / KAFKA-4161. Tasks that wish to disable 
framework-managed offset commits can return an empty map from 
{{SinkTask.preCommit()}} to make it a no-op.
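
A minimal sketch of that pattern, assuming the {{SinkTask.preCommit()}} hook 
proposed in KIP-89 (the surrounding task is a stub for illustration):

{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Sketch: a sink task that manages offsets in the sink data store itself and
// therefore returns an empty map from preCommit(), making framework commits a no-op.
public class SelfManagedOffsetsSinkTask extends SinkTask {

    @Override
    public String version() {
        return "0.0.1";
    }

    @Override
    public void start(Map<String, String> props) {
        // open a connection to the sink data store and reload offsets from it
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // write records (and their offsets) to the sink data store
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Returning an empty map tells the framework there is nothing to commit.
        return Collections.emptyMap();
    }

    @Override
    public void stop() {
        // close connections
    }
}
{code}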

> Allow SinkTasks to disable consumer offset commit 
> --
>
> Key: KAFKA-3462
> URL: https://issues.apache.org/jira/browse/KAFKA-3462
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.1.0
>Reporter: Liquan Pei
>Assignee: Liquan Pei
>Priority: Minor
> Fix For: 0.10.2.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
>  SinkTasks should be able to disable consumer offset commit if they manage 
> offsets in the sink data store rather than using Kafka consumer offsets.  For 
> example, an HDFS connector might record offsets in HDFS to provide exactly 
> once delivery. When the SinkTask is started or a rebalance occurs, the task 
> would reload offsets from HDFS. In this case, disabling consumer offset 
> commit will save some CPU cycles and network IOs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4401) Change the KafkaServerTestHarness and IntegrationTestHarness from trait to abstract class.

2016-11-14 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15665238#comment-15665238
 ] 

Ismael Juma commented on KAFKA-4401:


Kafka Streams also has helpers for tests that are nicer to use from Java 
(EmbeddedKafkaCluster, KafkaEmbedded). If we want to encourage people to use 
our test harness classes, we should think about compatibility guarantees. And 
that implies exposing a simple and supportable interface instead 
(IntegrationTestHarness and KafkaServerTestHarness expose too much IMO). Having 
said that, I think it's a good idea, if done right.

> Change the KafkaServerTestHarness and IntegrationTestHarness from trait to 
> abstract class.
> --
>
> Key: KAFKA-4401
> URL: https://issues.apache.org/jira/browse/KAFKA-4401
> Project: Kafka
>  Issue Type: Task
>  Components: unit tests
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.1.1
>
>
> The IntegrationTestHarness and KafkaServerTestHarness are useful not only in 
> Kafka unit test, but also useful for the unit tests in other products that 
> depend on Kafka.
> Currently there are two issues making those two test harness classes hard to 
> use by other Java users.
> 1. The two classes are Scala traits. This makes it difficult for people to 
> write Java unit test code. 
> 2. Some of the interfaces are Scala only. 
> It will be good to expose those two classes for more general usage and make 
> them Java friendly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Kafka 0.10 Monitoring tool

2016-11-14 Thread Ghosh, Achintya (Contractor)
Hi there,
What is the best open source tool for Kafka monitoring, mainly to check the 
offset lag? We tried the following tools:


1.   Burrow

2.   KafkaOffsetMonitor

3.   Prometheus and Grafana

4.   Kafka Manager

But nothing is working perfectly. Please help us on this.

Thanks
Achintya



[GitHub] kafka pull request #2130: MINOR: Extract SCALA_BINARY_VERSION from SCALA_VER...

2016-11-14 Thread kkonstantine
GitHub user kkonstantine opened a pull request:

https://github.com/apache/kafka/pull/2130

MINOR: Extract SCALA_BINARY_VERSION from SCALA_VERSION

Will allow users to set one fewer environment variable if they need to
change scala version. Still, SCALA_BINARY_VERSION can be explicitly set.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kkonstantine/kafka 
MINOR-Extract-SCALA_BINARY_VERSION-from-SCALA_VERSION

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2130.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2130


commit a663178090ba3750dea315d1792d6fce9a97c016
Author: Konstantine Karantasis 
Date:   2016-11-14T22:03:24Z

MINOR: Extract SCALA_BINARY_VERSION from SCALA_VERSION

Will allow users to set one fewer environment variable if they need to
change scala version. Still, SCALA_BINARY_VERSION can be explicitly set.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4322) StateRestoreCallback begin and end indication

2016-11-14 Thread Mark Shelton (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15665194#comment-15665194
 ] 

Mark Shelton commented on KAFKA-4322:
-

>> Is there any particular reasons that you want to commit offsets after 
>> restoration?

This is for the purpose of diagnosis, logging and recovery statistics only. It 
makes it easier to tell that N keys were restored, since begin and end callbacks 
allow me to log details and report progress and metrics regarding restoration. I 
do not want to commit offsets; the data that is made available is only for 
metrics.

>>  I'm wondering if you have other common requests that would benefit from the 
>> additional callbacks?

Not currently, no.

>> If you feel that this is still a common feature that we should add to Kafka

This is such a minor change with zero impact that I don't see the need for a KIP.


> StateRestoreCallback begin and end indication
> -
>
> Key: KAFKA-4322
> URL: https://issues.apache.org/jira/browse/KAFKA-4322
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Mark Shelton
>Assignee: Mark Shelton
>Priority: Minor
>
> In Kafka Streams, the StateRestoreCallback interface provides only a single 
> method "restore(byte[] key, byte[] value)" that is called for every key-value 
> pair to be restored. 
> It would be nice to have "beginRestore" and "endRestore" methods as part of 
> StateRestoreCallback.
> Kafka Streams would call "beginRestore" before restoring any keys, and would 
> call "endRestore" when it determines that it is done. This allows an 
> implementation, for example, to report on the number of keys restored and 
> perform a commit after the last key was restored. Other uses are conceivable.
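
A sketch of how the proposed callbacks could be used for restoration metrics; the
extended interface below is hypothetical, mirroring the description above rather
than any existing Streams API:

{code:java}
// Hypothetical extension of the restore callback with begin/end notifications.
interface StateRestoreCallbackWithBounds {
    void beginRestore();
    void restore(byte[] key, byte[] value);
    void endRestore();
}

// Example implementation that reports how many keys were restored and how long it took.
final class CountingRestoreCallback implements StateRestoreCallbackWithBounds {
    private long restoredKeys;
    private long startMs;

    @Override
    public void beginRestore() {
        restoredKeys = 0;
        startMs = System.currentTimeMillis();
    }

    @Override
    public void restore(byte[] key, byte[] value) {
        restoredKeys++;           // the real callback would also write into the state store
    }

    @Override
    public void endRestore() {
        long elapsedMs = System.currentTimeMillis() - startMs;
        System.out.printf("restored %d keys in %d ms%n", restoredKeys, elapsedMs);
    }
}
{code}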



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-4011) allow sizing RequestQueue in bytes

2016-11-14 Thread radai rosenblatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt resolved KAFKA-4011.
-
   Resolution: Won't Fix
Fix Version/s: (was: 0.10.2.0)

This was the original KIP-72 implementation. It's no longer relevant.

> allow sizing RequestQueue in bytes
> --
>
> Key: KAFKA-4011
> URL: https://issues.apache.org/jira/browse/KAFKA-4011
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: radai rosenblatt
>
> currently RequestChannel's requestQueue is sized in number of requests:
> {code:title=RequestChannel.scala|borderStyle=solid}
> private val requestQueue = new 
> ArrayBlockingQueue[RequestChannel.Request](queueSize)
> {code}
> under the assumption that the end goal is a bound on server memory 
> consumption, this requires the admin to know the avg request size.
> I would like to propose sizing the requestQueue not by number of requests, 
> but by their accumulated size (Request.buffer.capacity). this would probably 
> make configuring and sizing an instance easier.
> there would need to be a new configuration setting for this 
> (queued.max.bytes?) - which could be either in addition to or instead of the 
> current queued.max.requests setting
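
An illustrative sketch of a bytes-bounded queue (not the KIP-72 implementation),
using a semaphore whose permits represent bytes:

{code:java}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.ToIntFunction;

// Sketch: bound a queue by the accumulated byte size of its elements rather than
// by element count. Could be constructed with a sizer such as request buffer
// capacity to mirror the proposal above.
final class BytesBoundedQueue<T> {

    private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final Semaphore bytesAvailable;
    private final ToIntFunction<T> sizer;

    BytesBoundedQueue(int maxBytes, ToIntFunction<T> sizer) {
        this.bytesAvailable = new Semaphore(maxBytes);
        this.sizer = sizer;
    }

    void put(T element) throws InterruptedException {
        bytesAvailable.acquire(sizer.applyAsInt(element)); // blocks while "full" in bytes
        queue.put(element);
    }

    T take() throws InterruptedException {
        T element = queue.take();
        bytesAvailable.release(sizer.applyAsInt(element)); // free the element's bytes
        return element;
    }
}
{code}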



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4011) allow sizing RequestQueue in bytes

2016-11-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15665144#comment-15665144
 ] 

ASF GitHub Bot commented on KAFKA-4011:
---

Github user radai-rosenblatt closed the pull request at:

https://github.com/apache/kafka/pull/1714


> allow sizing RequestQueue in bytes
> --
>
> Key: KAFKA-4011
> URL: https://issues.apache.org/jira/browse/KAFKA-4011
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: radai rosenblatt
> Fix For: 0.10.2.0
>
>
> currently RequestChannel's requestQueue is sized in number of requests:
> {code:title=RequestChannel.scala|borderStyle=solid}
> private val requestQueue = new 
> ArrayBlockingQueue[RequestChannel.Request](queueSize)
> {code}
> under the assumption that the end goal is a bound on server memory 
> consumption, this requires the admin to know the avg request size.
> I would like to propose sizing the requestQueue not by number of requests, 
> but by their accumulated size (Request.buffer.capacity). this would probably 
> make configuring and sizing an instance easier.
> there would need to be a new configuration setting for this 
> (queued.max.bytes?) - which could be either in addition to or instead of the 
> current queued.max.requests setting



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4322) StateRestoreCallback begin and end indication

2016-11-14 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4322:
---
Affects Version/s: (was: 0.10.0.1)
   0.10.1.0

> StateRestoreCallback begin and end indication
> -
>
> Key: KAFKA-4322
> URL: https://issues.apache.org/jira/browse/KAFKA-4322
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Mark Shelton
>Assignee: Mark Shelton
>Priority: Minor
>
> In Kafka Streams, the StateRestoreCallback interface provides only a single 
> method "restore(byte[] key, byte[] value)" that is called for every key-value 
> pair to be restored. 
> It would be nice to have "beginRestore" and "endRestore" methods as part of 
> StateRestoreCallback.
> Kafka Streams would call "beginRestore" before restoring any keys, and would 
> call "endRestore" when it determines that it is done. This allows an 
> implementation, for example, to report on the number of keys restored and 
> perform a commit after the last key was restored. Other uses are conceivable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4322) StateRestoreCallback begin and end indication

2016-11-14 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4322:
---
Status: Patch Available  (was: Open)

> StateRestoreCallback begin and end indication
> -
>
> Key: KAFKA-4322
> URL: https://issues.apache.org/jira/browse/KAFKA-4322
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Mark Shelton
>Assignee: Mark Shelton
>Priority: Minor
>
> In Kafka Streams, the StateRestoreCallback interface provides only a single 
> method "restore(byte[] key, byte[] value)" that is called for every key-value 
> pair to be restored. 
> It would be nice to have "beginRestore" and "endRestore" methods as part of 
> StateRestoreCallback.
> Kafka Streams would call "beginRestore" before restoring any keys, and would 
> call "endRestore" when it determines that it is done. This allows an 
> implementation, for example, to report on the number of keys restored and 
> perform a commit after the last key was restored. Other uses are conceivable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1714: KAFKA-4011 - fix issues and beef up tests around B...

2016-11-14 Thread radai-rosenblatt
Github user radai-rosenblatt closed the pull request at:

https://github.com/apache/kafka/pull/1714


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-14 Thread Mayuresh Gharat
Hi Michael,

Just another thing that came up during my discussion with Renu and I wanted
to share it.

Another thing we can do to handle a mixture of old and new clients: once
the new broker with this KIP is deployed, the new code should check the
request version, and for requests from older producers we can up-convert
them with a tombstone marker when appending the message to the log. This is
similar to down-converting messages for older clients.

If this is possible then the broker in this case has to rely only on the
tombstone marker for log compaction. Using this approach we preserve the
description of when to update the magic byte as described here :
https://kafka.apache.org/documentation#messageformat (1 byte "magic"
identifier to allow format changes).

In stage 2, if we don't want open source Kafka to make the decision to
deprecate the null value for log compaction (which is the approach I would
prefer as an end state), since there are some concerns on this, individual
companies/orgs can then run a horizontal initiative at their convenience to
move to stage 2 by asking all app users to upgrade to new Kafka clients.

Thanks,

Mayuresh

On Mon, Nov 14, 2016 at 11:50 AM, Becket Qin  wrote:

> Michael,
>
> Yes, I am OK with stage 1. We can discuss stage 2 later, but this sounds
> like a really organization-specific decision to deprecate an API. There
> does not seem to be a general need in open source Kafka to only support the
> tombstone bit, which would be a bad thing for people in the community who
> are still running old clients. This is exactly why we want to have a magic
> byte bump - there should be only one definitive way to interpret a message
> with a given magic byte. We should avoid re-defining the interpretation of
> a message with an existing magic byte. The interpretation of an old message
> format should not be changed and should always be supported until deprecated.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Nov 14, 2016 at 11:28 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
> > I am not sure about "If I understand correctly, you want to let the broker
> > reject requests from old clients to ensure everyone in an organization has
> > upgraded, right?"
> >
> > I don't think we will be rejecting requests. What phase 2 (stage 2) meant
> > was that we will only do log compaction based on the tombstone marker and
> > nothing else.
> > I am not sure about making this configurable, as a configuration makes it
> > sound like the broker has the capability to support both the null value and
> > the tombstone marker bit, for log compaction, indefinitely.
> >
> > I agree with Michael's idea of delivering phase 1 (stage 1) right now. It
> > will also be good to put a note saying that we will be deprecating the
> > older way of log compaction in the next release.
> > Phase 2 (stage 2) will actually only rely on the tombstone marker for doing
> > log compaction.
> > So this actually meets our end state of having the tombstone marker for log
> > compaction support.
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Mon, Nov 14, 2016 at 11:09 AM, Michael Pearce 
> > wrote:
> >
> > > I'm OK with this, but I'd like to at least get phase 1 in for the next
> > > release; this is what I'm very keen for.
> > >
> > > As such, shall we say we have:
> > > KIP-87a, which delivers phase 1
> > >
> > > And then:
> > > KIP-87b, in the release after that, which delivers phase 2 but has a
> > > dependency on support for deprecating old API versions
> > >
> > > How does this approach sound?
> > >
> > >
> > > 
> > > From: Becket Qin 
> > > Sent: Monday, November 14, 2016 6:14:44 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> > >
> > > Hey Michael and Mayuresh,
> > >
> > > If I understand correctly, you want to let the broker reject requests
> > > from old clients to ensure everyone in an organization has upgraded,
> > > right? This is essentially deprecating an old protocol. I agree it is
> > > useful, and that is why we have that baked into KIP-35. However, we
> > > haven't thought through how to deprecate an API so far. From the broker's
> > > point of view, it will always support all the old protocols according to
> > > Kafka's widely known compatibility guarantee. If people decide to
> > > deprecate an API within an organization, we can provide some additional
> > > configuration to let people do this, which is not a bad idea but seems
> > > better discussed in another KIP.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Mon, Nov 14, 2016 at 8:52 AM, Michael Pearce  >
> > > wrote:
> > >
> > > > I agree with Mayuresh.
> > > >
> > > > I don't see how having a magic byte helps here.
> > > >
> > > > What we are saying is that on day 1 after an upgrade, either the tombstone
> > > > flag or a null value will be treated as a marker to delete on compacted
> > 

[jira] [Commented] (KAFKA-4401) Change the KafkaServerTestHarness and IntegrationTestHarness from trait to abstract class.

2016-11-14 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15665054#comment-15665054
 ] 

Jiangjie Qin commented on KAFKA-4401:
-

[~ewencp] Interestingly, we have done the same thing for a few of our Java 
projects :)

I completely agree that it is important to reduce the time to run unit tests, and 
we should avoid adding to it in our own tests whenever possible. That said, I have 
found it handy for many external projects to have the capability to run some 
simple functional tests with an in-memory Kafka setup. Otherwise they may have to 
write more costly and complicated system tests. So it seems useful for Kafka to 
provide the test harness tool to facilitate external project testing. But the 
external projects should use their own judgement to decide when to use the test 
harness. For example, just as you mentioned, there are some tests that can be run 
relatively quickly with the test harness in the Confluent schema registry, while 
others may be more suitable for system tests.

> Change the KafkaServerTestHarness and IntegrationTestHarness from trait to 
> abstract class.
> --
>
> Key: KAFKA-4401
> URL: https://issues.apache.org/jira/browse/KAFKA-4401
> Project: Kafka
>  Issue Type: Task
>  Components: unit tests
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.1.1
>
>
> The IntegrationTestHarness and KafkaServerTestHarness are useful not only in 
> Kafka unit test, but also useful for the unit tests in other products that 
> depend on Kafka.
> Currently there are two issues making those two test harness classes hard to 
> use by other Java users.
> 1. The two classes are Scala traits. This makes it difficult for people to 
> write Java unit test code. 
> 2. Some of the interfaces are Scala only. 
> It will be good to expose those two classes for more general usage and make 
> them Java friendly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-14 Thread Ignacio Solis
1) Yes - Headers are worthwhile
2) Yes - Headers should be a top level option

On Mon, Nov 14, 2016 at 9:16 AM, Michael Pearce 
wrote:

> Hi Roger,
>
> The KIP details/examples the original proposal for key spacing, not the
> namespace idea mentioned in the discussion.
>
> We will need to update the KIP when we get agreement that this is a better
> approach (which seems to be the case, if I have understood the general
> feeling in the conversation).
>
> Re the variable ints, at a very early stage we did think about this. I think
> the added complexity isn't worth the saving. If we want to reduce overhead
> and size, I'd rather go with int16 (2-byte) keys, as it keeps things simple.
>
> On the note of no headers: as per the KIP, we use an attribute bit to denote
> whether headers are present or not, which provides zero overhead currently
> if headers are not used.
>
> I think, as radai mentions, it would be good first to get clarity on whether
> we now have general consensus that (1) headers are worthwhile and useful,
> and (2) we want them as a top-level entity.
>
>
> Just to state the obvious: I believe (1) headers are worthwhile and (2)
> agree they should be a top-level entity.
>
> Cheers
> Mike
> 
> From: Roger Hoover 
> Sent: Wednesday, November 9, 2016 9:10:47 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-82 - Add Record Headers
>
> Sorry for going a little in the weeds but thanks for the replies regarding
> varint.
>
> Agreed that a prefix and {int, int} can be the same.  It doesn't look like
> that's what the KIP is saying in the "Open" section.   The example shows
> 211
> for New Relic and 210002 for App Dynamics implying that the New Relic
> organization will have only a single header id to work with.  Or is 211
> a prefix?  The main point of a namespace or prefix is to reduce the
> overhead of config mapping or registration depending on how
> namespaces/prefixes are managed.
>
> Would love to hear more feedback on the higher-level questions though...
>
> Cheers,
>
> Roger
>
>
> On Wed, Nov 9, 2016 at 11:38 AM, radai  wrote:
>
> > I think this discussion is getting a bit into the weeds on technical
> > implementation details.
> > I'd like to step back a minute and try to establish where we are in the
> > larger picture:
> >
> > (re-wording nacho's last paragraph)
> > 1. are we all in agreement that headers are a worthwhile and useful
> > addition to have? this was contested early on
> > 2. are we all in agreement on headers as a top-level entity vs headers
> > squirreled-away in V?
> >
> > are there still concerns around these 2 points (#jay? #jun?)?
> >
> > (and now back to our normal programming ...)
> >
> > varints are nice. having said that, it's adding complexity (see
> > https://github.com/addthis/stream-lib/blob/master/src/
> > main/java/com/clearspring/analytics/util/Varint.java
> > as the 1st google result) and would require anyone writing other clients (C?
> > Python? Go? Bash? ;-) ) to get/implement the same, for relatively
> > little gain (int vs string is an order of magnitude, this isn't).
> >
> > int namespacing vs {int, int} namespacing are basically the same thing -
> > you're just namespacing an int64 and giving people whole 2^32 ranges at a
> > time. the part i like about this is letting people have a large swath of
> > numbers with one registration so they don't have to come back for every
> > single plugin/header they want to "reserve".
> >
> >
> > On Wed, Nov 9, 2016 at 11:01 AM, Roger Hoover 
> > wrote:
> >
> > > Since some of the debate has been about overhead + performance, I'm
> > > wondering if we have considered a varint encoding (
> > > https://developers.google.com/protocol-buffers/docs/encoding#varints)
> > > for the header length field (int32 in the proposal) and for header ids?  If
> > > you don't use headers, the overhead would be a single byte, and each header
> > > id < 128 would also need only a single byte?
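
For reference, a small sketch of unsigned varint encoding (the property mentioned
above - values below 128 take a single byte - falls out of the continuation-bit
scheme); this is independent of the Varint class linked earlier:

{code:java}
import java.io.ByteArrayOutputStream;

// Sketch of unsigned varint encode/decode: 7 data bits per byte, high bit
// set on all bytes except the last.
final class UnsignedVarint {

    static byte[] encode(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);  // low 7 bits, continuation bit set
            value >>>= 7;
        }
        out.write(value & 0x7F);               // final byte, continuation bit clear
        return out.toByteArray();
    }

    static int decode(byte[] bytes) {
        int value = 0;
        int shift = 0;
        for (byte b : bytes) {
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;                         // last byte of this varint
            }
            shift += 7;
        }
        return value;
    }
}
{code}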
> > >
> > >
> > >
> > > On Wed, Nov 9, 2016 at 6:43 AM, radai 
> > wrote:
> > >
> > > > @magnus - and very dangerous (you're essentially downloading and executing
> > > > arbitrary code off the internet on your servers ... bad idea without a
> > > > sandbox, even with)
> > > >
> > > > as for it being a purely administrative task - i disagree.
> > > >
> > > > i wish it were, really, because then my earlier point on the complexity of
> > > > the remapping process would be invalid, but at linkedin, for example, we
> > > > (the team i'm in) run kafka as a service. we don't really know what our
> > > > users (developing applications that use kafka) are up to at any given
> > > > moment. it is very possible (given the existence of headers and a
> > > > corresponding plugin ecosystem) for some application to "equip" their producers and
> > 

Re: [VOTE] KIP-89: Allow sink connectors to decouple flush and offset commit

2016-11-14 Thread Shikhar Bhushan
The vote passed with +3 binding votes. Thanks all!

On Sun, Nov 13, 2016 at 1:42 PM Gwen Shapira  wrote:

+1 (binding)

On Nov 9, 2016 2:17 PM, "Shikhar Bhushan"  wrote:

> Hi,
>
> I would like to initiate a vote on KIP-89
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 89%3A+Allow+sink+connectors+to+decouple+flush+and+offset+commit
>
> Best,
>
> Shikhar
>


[jira] [Assigned] (KAFKA-4306) Connect workers won't shut down if brokers are not available

2016-11-14 Thread Konstantine Karantasis (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis reassigned KAFKA-4306:
-

Assignee: Konstantine Karantasis  (was: Ewen Cheslack-Postava)

> Connect workers won't shut down if brokers are not available
> 
>
> Key: KAFKA-4306
> URL: https://issues.apache.org/jira/browse/KAFKA-4306
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.1.0
>Reporter: Gwen Shapira
>Assignee: Konstantine Karantasis
>
> If brokers are not available and we try to shut down connect workers, sink 
> connectors will be stuck in a loop retrying to commit offsets:
> 2016-10-17 09:39:14,907] INFO Marking the coordinator 192.168.1.9:9092 (id: 
> 2147483647 rack: null) dead for group connect-dump-kafka-config1 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:600)
> [2016-10-17 09:39:14,907] ERROR Commit of 
> WorkerSinkTask{id=dump-kafka-config1-0} offsets threw an unexpected 
> exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:194)
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset 
> commit failed with a retriable exception. You should retry committing offsets.
> Caused by: 
> org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException
> We should probably limit the number of retries before doing "unclean" 
> shutdown.
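
A hedged sketch of the bounded-retry idea; the retry limit and the method below 
are illustrative, not the Connect runtime's actual shutdown path:

{code:java}
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;

// Sketch: try to commit a bounded number of times, then give up so shutdown
// can proceed instead of looping forever while brokers are unavailable.
final class BoundedCommitOnShutdown {

    private static final int MAX_COMMIT_RETRIES = 5;   // illustrative limit

    static void commitBeforeShutdown(KafkaConsumer<byte[], byte[]> consumer,
                                     Map<TopicPartition, OffsetAndMetadata> offsets) {
        for (int attempt = 1; attempt <= MAX_COMMIT_RETRIES; attempt++) {
            try {
                consumer.commitSync(offsets);
                return;                                  // committed cleanly
            } catch (RetriableException e) {
                // brokers unavailable; retry a bounded number of times
            }
        }
        // Give up and proceed with an "unclean" shutdown rather than retrying forever.
    }
}
{code}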



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-4154) Kafka Connect fails to shutdown if it has not completed startup

2016-11-14 Thread Konstantine Karantasis (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis reassigned KAFKA-4154:
-

Assignee: Konstantine Karantasis  (was: Shikhar Bhushan)

> Kafka Connect fails to shutdown if it has not completed startup
> ---
>
> Key: KAFKA-4154
> URL: https://issues.apache.org/jira/browse/KAFKA-4154
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Shikhar Bhushan
>Assignee: Konstantine Karantasis
>
> To reproduce:
> 1. Start Kafka Connect in distributed mode without Kafka running 
> {{./bin/connect-distributed.sh config/connect-distributed.properties}}
> 2. Ctrl+C fails to terminate the process
> thread dump:
> {noformat}
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.92-b14 mixed mode):
> "Thread-1" #13 prio=5 os_prio=31 tid=0x7fc29a18a800 nid=0x7007 waiting on 
> condition [0x73129000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x0007bd7d91d8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at 
> java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.stop(DistributedHerder.java:357)
>   at 
> org.apache.kafka.connect.runtime.Connect.stop(Connect.java:71)
>   at 
> org.apache.kafka.connect.runtime.Connect$ShutdownHook.run(Connect.java:93)
> "SIGINT handler" #27 daemon prio=9 os_prio=31 tid=0x7fc29aa6a000 
> nid=0x560f in Object.wait() [0x71a61000]
>java.lang.Thread.State: WAITING (on object monitor)
>   at java.lang.Object.wait(Native Method)
>   - waiting on <0x0007bd63db38> (a 
> org.apache.kafka.connect.runtime.Connect$ShutdownHook)
>   at java.lang.Thread.join(Thread.java:1245)
>   - locked <0x0007bd63db38> (a 
> org.apache.kafka.connect.runtime.Connect$ShutdownHook)
>   at java.lang.Thread.join(Thread.java:1319)
>   at 
> java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106)
>   at 
> java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
>   at java.lang.Shutdown.runHooks(Shutdown.java:123)
>   at java.lang.Shutdown.sequence(Shutdown.java:167)
>   at java.lang.Shutdown.exit(Shutdown.java:212)
>   - locked <0x0007b0244600> (a java.lang.Class for 
> java.lang.Shutdown)
>   at java.lang.Terminator$1.handle(Terminator.java:52)
>   at sun.misc.Signal$1.run(Signal.java:212)
>   at java.lang.Thread.run(Thread.java:745)
> "kafka-producer-network-thread | producer-1" #15 daemon prio=5 os_prio=31 
> tid=0x7fc29a0b7000 nid=0x7a03 runnable [0x72608000]
>java.lang.Thread.State: RUNNABLE
>   at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
>   at 
> sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
>   at 
> sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
>   at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>   - locked <0x0007bd7788d8> (a sun.nio.ch.Util$2)
>   - locked <0x0007bd7788e8> (a 
> java.util.Collections$UnmodifiableSet)
>   - locked <0x0007bd77> (a sun.nio.ch.KQueueSelectorImpl)
>   at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>   at 
> org.apache.kafka.common.network.Selector.select(Selector.java:470)
>   at 
> org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>   at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
>   at java.lang.Thread.run(Thread.java:745)
> "DistributedHerder" #14 prio=5 os_prio=31 tid=0x7fc29a11e000 nid=0x7803 
> waiting on condition [0x72505000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at 

[jira] [Assigned] (KAFKA-3008) Connect should parallelize task start/stop

2016-11-14 Thread Konstantine Karantasis (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis reassigned KAFKA-3008:
-

Assignee: Konstantine Karantasis  (was: Liquan Pei)

> Connect should parallelize task start/stop
> --
>
> Key: KAFKA-3008
> URL: https://issues.apache.org/jira/browse/KAFKA-3008
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.0
>Reporter: Ewen Cheslack-Postava
>Assignee: Konstantine Karantasis
>Priority: Minor
>
> The Herder implementations currently iterate over all connectors/tasks and 
> sequentially start/stop them. We should parallelize this. This is less 
> critical for {{StandaloneHerder}}, but pretty important for 
> {{DistributedHerder}} since it will generally be managing more tasks and any 
> delay starting/stopping a single task will impact every other task on the 
> node (and can ultimately result in incorrect behavior in the case where a single 
> offset commit in one connector takes too long, preventing all of the rest 
> from committing offsets).
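>
> A rough sketch of the intended direction (hypothetical helper, not the actual Herder code), assuming a fixed-size executor that fans out the start calls and then waits for all of them:
> {noformat}
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.*;
>
> // Hypothetical illustration only; the real Herder manages Connector/Task objects.
> class ParallelStartStopSketch {
>     private final ExecutorService pool = Executors.newFixedThreadPool(8);
>
>     void startAll(List<Runnable> starters) throws InterruptedException {
>         List<Future<?>> futures = new ArrayList<>();
>         for (Runnable starter : starters)
>             futures.add(pool.submit(starter));      // fan out instead of a sequential loop
>         for (Future<?> f : futures) {
>             try {
>                 f.get();                            // surface failures without blocking sibling starts
>             } catch (ExecutionException e) {
>                 // log and keep going; one slow or failing task no longer delays the rest
>             }
>         }
>     }
> }
> {noformat}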



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-14 Thread Becket Qin
Michael,

Yes, I am OK with stage 1. We can discuss stage 2 later, but deprecating an
API really sounds like an organization-specific decision. There does not seem
to be a general need in open source Kafka to support only the tombstone bit,
which would be a bad thing for people in the community who are still running
old clients. This is exactly why we want to have a magic byte bump - there
should be only one definitive way to interpret a message with a given magic
byte. We should avoid re-defining the interpretation of a message with an
existing magic byte. The interpretation of an old message format should not
be changed and should always be supported until deprecated.
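
To make that concrete, the read path could branch on the magic byte along these lines (rough sketch only; the constant and mask names are placeholders, not an agreed format):

import java.nio.ByteBuffer;

// Illustrative only - names and the attribute mask are assumptions, not the agreed format.
class TombstoneCheckSketch {
    static final byte MAGIC_VALUE_V2 = 2;
    static final byte TOMBSTONE_MASK = 0x10;   // hypothetical attribute bit

    static boolean isTombstone(byte magic, byte attributes, ByteBuffer value) {
        if (magic < MAGIC_VALUE_V2)
            return value == null;                    // old formats: null value is the only delete marker
        return (attributes & TOMBSTONE_MASK) != 0;   // new format: the attribute bit is definitive
    }
}

With a rule like this there is never any ambiguity about what a stored message means, regardless of which client produced it.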

Thanks,

Jiangjie (Becket) Qin

On Mon, Nov 14, 2016 at 11:28 AM, Mayuresh Gharat <
gharatmayures...@gmail.com> wrote:

> I am not sure about "If I understand correctly, you want to let the broker
> to reject requests
> from old clients to ensure everyone in an organization has upgraded,
> right?"
>
> I don't think we will be rejecting requests. What phase 2 (stage 2) meant
> was that we will only do log compaction based on the tombstone marker and
> nothing else.
> I am not sure about making this configurable, as a configuration makes it
> sound like the broker has the capability to support both null values and the
> tombstone marker bit for log compaction indefinitely.
>
> I agree with Michael's idea of delivering phase 1 (stage 1) right now. It
> will also be good to put a note saying that we will be deprecating the
> older way of log compaction in the next release.
> Phase 2 (stage 2) will actually rely only on the tombstone marker for doing
> log compaction.
> So this actually meets our end state of having tombstone-marker support for
> log compaction.
>
> Thanks,
>
> Mayuresh
>
>
> On Mon, Nov 14, 2016 at 11:09 AM, Michael Pearce 
> wrote:
>
> > I'm OK with this, but I'd like to at least get phase 1 in for the next
> > release; this is what I'm very keen on.
> >
> > As such shall we say we have
> > Kip-87a that delivers phase 1
> >
> > And then
> > Kip-87b in release after that delivers phase 2 but has a dependency on
> > support of deprecating old api versions
> >
> > How does this approach sound?
> >
> >
> > 
> > From: Becket Qin 
> > Sent: Monday, November 14, 2016 6:14:44 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> >
> > Hey Michael and Mayuresh,
> >
> > If I understand correctly, you want to let the broker to reject requests
> > from old clients to ensure everyone in an organization has upgraded,
> right?
> > This is essentially deprecating an old protocol. I agree it is useful and
> > that is why we have that baked in KIP-35. However, we haven't thought
> > through how to deprecate an API so far. From the broker's point of view, it
> > will always support all the old protocols according to Kafka's widely
> known
> > compatibility guarantee. If people decide to deprecate an API within an
> > organization, we can provide some additional configuration to let people
> do
> > this, which is not a bad idea but seems better to be discussed in another
> > KIP.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Nov 14, 2016 at 8:52 AM, Michael Pearce 
> > wrote:
> >
> > > I agree with Mayuresh.
> > >
> > > I don't see how having a magic byte helps here.
> > >
> > > What we are saying is that on day 1 after an upgrade both tombstone
> flag
> > > or a null value will be treated as a marker to delete on compacted
> topic.
> > >
> > > During this time we expect organisations to migrate themselves over
> onto
> > > the new producers and consumers that call the new Api, changing their
> > > application logic. during this point producers should start setting the
> > > tombstone marker and consumers to start behaving based on the tombstone
> > > marker.
> > >
> > > Only after all producers and consumers are upgraded then we enter
> phase 2
> > > disabling the use of a null value as a delete marker. And the old apis
> > > would not be allowed to be called as the broker now expects all clients
> > to
> > > be using latest api only.
> > >
> > > At this second phase stage only also we can support a null value in a
> > > compacted topic without it being treated as deletion. We solely base on
> > the
> > > tombstone marker.
> > >
> > >
> > > 
> > > From: Mayuresh Gharat 
> > > Sent: Saturday, November 12, 2016 2:18:19 AM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> > >
> > > I think "In second stage we move on to supporting only the attribute
> flag
> > > for log
> > > compaction." means that it will no longer support request from older
> > > clients.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Fri, Nov 11, 2016 at 10:02 AM, Becket Qin 
> > wrote:
> > >
> > > > Hey 

Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-14 Thread Rajini Sivaram
+1

Thank you for the KIP, Radai.

On Mon, Nov 14, 2016 at 6:07 PM, Mickael Maison 
wrote:

> +1. We've also been hit by OOMs on the broker because we were not able
> to properly bound its memory usage.
>
> On Mon, Nov 14, 2016 at 5:56 PM, radai  wrote:
> > @rajini - fixed the hasBytesBuffered() method. also updated poll() so
> that
> > no latency is added for picking up data stuck in ssl buffers (timeout is
> > set to 0, just like with immediately connected keys and staged receives).
> > thank you for pointing these out.
> > added ssl (re) testing to the KIP testing plan.
> >
> >
> >
> >
> > On Mon, Nov 14, 2016 at 7:24 AM, Rajini Sivaram <
> > rajinisiva...@googlemail.com> wrote:
> >
> >> Open point 1. I would just retain the current long value that specifies
> >> queued.max.bytes as long and not as %heap since it is simple and easy to
> >> use. And keeps it consistent with other ".bytes" configs.
> >>
> >> Point 3. ssl buffers - I am not quite sure the implementation looks
> >> correct. hasBytesBuffered() is checking position() of buffers == 0. And
> the
> >> code checks this only when poll with a timeout returns (adding a delay
> when
> >> there is nothing else to read).
> >> But since this and open point 2 (optimization) are implementation
> details,
> >> they can be looked at during PR review.
> >>
> >> It will be good to add SSL testing to the test plan as well, since
> there is
> >> additional code to test for SSL.
> >>
> >>
> >> On Fri, Nov 11, 2016 at 9:03 PM, radai 
> wrote:
> >>
> >> > ok, i've made the following changes:
> >> >
> >> > 1. memory.pool.class.name has been removed
> >> > 2. the code now only uses SimpleMemoryPool. the gc variant is left
> >> (unused)
> >> > as a developement aid and is unsettable via configuration.
> >> > 3. I've resolved the issue of stale data getting stuck in intermediate
> >> > (ssl) buffers.
> >> > 4. default value for queued.max.bytes is -1, so off by default. any
> <=0
> >> > value is interpreted as off by the underlying code.
> >> >
> >> > open points:
> >> >
> >> > 1. the kafka config framework doesnt allow a value to be either long
> or
> >> > double, so in order to pull off the queued.max.bytes = 100 or
> >> > queued.max.bytes = 0.3 thing i'd need to define the config as type
> >> string,
> >> > which is ugly to me. do we want to support setting queued.max.bytes
> to %
> >> of
> >> > heap ? if so, by way of making queued.max.bytes of type string, or by
> way
> >> > of a 2nd config param (with the resulting either/all/combination?
> >> > validation). my personal opinion is string because i think a single
> >> > queued.max.bytes with overloaded meaning is more understandable to
> users.
> >> > i'll await other people's opinions before doing anything.
> >> > 2. i still need to evaluate rajini's optimization. sounds doable.
> >> >
> >> > asides:
> >> >
> >> > 1. i think you guys misunderstood the intent behind the gc pool. it
> was
> >> > never meant to be a magic pool that automatically releases buffers
> >> (because
> >> > just as rajini stated the performance implications would be
> horrible). it
> >> > was meant to catch leaks early. since that is indeed a dev-only
> concern
> >> it
> >> > wont ever get used in production.
> >> > 2. i said this on some other kip discussion: i think the nice thing
> about
> >> > the pool API is it "scales" from just keeping a memory bound to
> actually
> >> > re-using buffers without changing the calling code. i think
> >> actuallypooling
> >> > large buffers will result in a significant performance impact, but
> thats
> >> > outside the scope of this kip. at that point i think more pool
> >> > implementations (that actually pool) would be written. i agree with
> the
> >> > ideal of exposing as few knobs as possible, but switching pools (or
> pool
> >> > params) for tuning may happen at some later point.
> >> >
> >> >
> >> >
> >> > On Fri, Nov 11, 2016 at 11:44 AM, Rajini Sivaram <
> >> > rajinisiva...@googlemail.com> wrote:
> >> >
> >> > > 13. At the moment, I think channels are not muted if:
> >> > > channel.receive != null && channel.receive.buffer != null
> >> > > This mutes all channels that aren't holding onto a incomplete
> buffer.
> >> > They
> >> > > may or may not have read the 4-byte size.
> >> > >
> >> > > I was thinking you could avoid muting channels if:
> >> > > channel.receive == null || channel.receive.size.remaining()
> >> > > This will not mute channels that are holding onto a buffer (as
> above).
> >> In
> >> > > addition, it will not mute channels that haven't read the 4-byte
> size.
> >> A
> >> > > client that is closed gracefully while the pool is full will not be
> >> muted
> >> > > in this case and the server can process close without waiting for
> the
> >> > pool
> >> > > to free up. Once the 4-byte size is read, the channel will be muted
> if
> >> > the
> >> > > pool is still out of memory - for each 

[jira] [Work started] (KAFKA-4393) Improve invalid/negative TS handling

2016-11-14 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4393 started by Matthias J. Sax.
--
> Improve invalid/negative TS handling
> 
>
> Key: KAFKA-4393
> URL: https://issues.apache.org/jira/browse/KAFKA-4393
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
> Fix For: 0.10.2.0
>
>
> Currently, Kafka Streams does not handle invalid/negative timestamps returned 
> from the {{TimestampExtractor}} gracefully, but fails with an exception, 
> because negative timestamps cannot be handled in a meaningful way by any 
> time-based (i.e., window) operators like window aggregates and joins.
> We want to change Streams to an auto-drop behavior for those records with 
> negative timestamps (without any further user notification about the dropped 
> records), to enable users to "step over" those records and keep going 
> (instead of failing with an exception). To guard the user from silently 
> dropping messages by default (and to keep the current fail-fast behavior), we 
> change the default extractor {{ConsumerRecordTimestampExtractor}} to check the 
> extracted meta-data record timestamp and raise an exception if it is negative. 
> Furthermore, we add a "drop-and-log" extractor, as this seems to be a common 
> behavior users might want. For any other behavior, users can still provide a 
> custom TS-Extractor implementation.
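>
> As a sketch of the two behaviors (class names are placeholders and this assumes the single-argument 0.10.x {{TimestampExtractor}} signature, so not the final API):
> {noformat}
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.streams.processor.TimestampExtractor;
>
> // Placeholder names; only illustrates the behaviors described above.
> class LogAndSkipOnInvalidTimestampSketch implements TimestampExtractor {
>     @Override
>     public long extract(final ConsumerRecord<Object, Object> record) {
>         final long ts = record.timestamp();
>         if (ts < 0)
>             System.err.println("Dropping record with invalid (negative) timestamp " + ts);
>         return ts;   // under the proposed auto-drop behavior, a negative value makes Streams skip the record
>     }
> }
>
> // Strict default: fail fast instead of silently dropping.
> class FailOnInvalidTimestampSketch implements TimestampExtractor {
>     @Override
>     public long extract(final ConsumerRecord<Object, Object> record) {
>         final long ts = record.timestamp();
>         if (ts < 0)
>             throw new IllegalArgumentException("Extracted timestamp is negative: " + ts);
>         return ts;
>     }
> }
> {noformat}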



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-14 Thread Mayuresh Gharat
I am not sure about "If I understand correctly, you want to let the broker
to reject requests
from old clients to ensure everyone in an organization has upgraded,
right?"

I don't think we will be rejecting requests. What phase2 (stage 2) meant
was we will only do log compaction based on tombstone marker and nothing
else.
I am not sure about making this configurable, as a configuration makes it
sound like the broker has the capability to support null and tombstone
marker bit, for log compaction, for life long.

I agree with Michael's idea of delivering phase 1 (stage1) right now. It
will also be good to put a note saying that we will be deprecating the
older way of log compaction in next release.
Phase 2 (stage 2) will actually only rely on tombstone marker for doing log
compaction.
So this actually meets our end state of having tombstone marker for log
compaction support.
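
In code terms, the difference between the two phases is roughly the following (RecordView and hasTombstoneFlag() are stand-ins for illustration, not the actual cleaner code):

import java.nio.ByteBuffer;

// Stand-in types; only shows how the compaction check differs between the two phases.
class CompactionPhaseSketch {
    interface RecordView {
        boolean hasTombstoneFlag();   // hypothetical accessor for the proposed attribute bit
        ByteBuffer value();
    }

    static boolean isDeleteMarker(RecordView record, boolean phase2) {
        if (phase2)
            return record.hasTombstoneFlag();                        // stage 2: the marker is the only signal
        return record.hasTombstoneFlag() || record.value() == null;  // stage 1: either one marks a delete
    }
}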

Thanks,

Mayuresh


On Mon, Nov 14, 2016 at 11:09 AM, Michael Pearce 
wrote:

> I'm OK with this, but I'd like to at least get phase 1 in for the next
> release; this is what I'm very keen on.
>
> As such shall we say we have
> Kip-87a that delivers phase 1
>
> And then
> Kip-87b in release after that delivers phase 2 but has a dependency on
> support of deprecating old api versions
>
> How does this approach sound?
>
>
> 
> From: Becket Qin 
> Sent: Monday, November 14, 2016 6:14:44 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
>
> Hey Michael and Mayuresh,
>
> If I understand correctly, you want to let the broker to reject requests
> from old clients to ensure everyone in an organization has upgraded, right?
> This is essentially deprecating an old protocol. I agree it is useful and
> that is why we have that baked in KIP-35. However, we haven't thought
> through how to deprecate an API so far. From the broker's point of view, it
> will always support all the old protocols according to Kafka's widely known
> compatibility guarantee. If people decide to deprecate an API within an
> organization, we can provide some additional configuration to let people do
> this, which is not a bad idea but seems better to be discussed in another
> KIP.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Nov 14, 2016 at 8:52 AM, Michael Pearce 
> wrote:
>
> > I agree with Mayuresh.
> >
> > I don't see how having a magic byte helps here.
> >
> > What we are saying is that on day 1 after an upgrade both tombstone flag
> > or a null value will be treated as a marker to delete on compacted topic.
> >
> > During this time we expect organisations to migrate themselves over onto
> > the new producers and consumers that call the new Api, changing their
> > application logic. during this point producers should start setting the
> > tombstone marker and consumers to start behaving based on the tombstone
> > marker.
> >
> > Only after all producers and consumers are upgraded then we enter phase 2
> > disabling the use of a null value as a delete marker. And the old apis
> > would not be allowed to be called as the broker now expects all clients
> to
> > be using latest api only.
> >
> > At this second phase stage only also we can support a null value in a
> > compacted topic without it being treated as deletion. We solely base on
> the
> > tombstone marker.
> >
> >
> > 
> > From: Mayuresh Gharat 
> > Sent: Saturday, November 12, 2016 2:18:19 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> >
> > I think "In second stage we move on to supporting only the attribute flag
> > for log
> > compaction." means that it will no longer support request from older
> > clients.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Fri, Nov 11, 2016 at 10:02 AM, Becket Qin 
> wrote:
> >
> > > Hey Michael,
> > >
> > > The way Kafka implements backwards compatibility is to let the brokers
> > > support old protocols. So the brokers have to support older clients
> that
> > do
> > > not understand the new attribute bit. That means we will not be able to
> > get
> > > away with the null value as a tombstone. So we need a good definition
> of
> > > whether we should treat it as a tombstone or not. This is why I think
> we
> > > need a magic value bump.
> > >
> > > My confusion on the second stage is exactly about "we want to end up
> just
> > > supporting tombstone marker (not both)", or in the original statement:
> > "2)
> > > In second stage we move on to supporting only the attribute flag for
> log
> > > compaction." - We will always support the null value as a tombstone as
> > long
> > > as the message format version (i.e. magic byte) is less than 2 for the
> > > reason I mentioned above.
> > >
> > > Although the way to interpret the message bytes at producing time can
> be
> > > inferred from the 

Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-14 Thread Michael Pearce
I'm OK with this, but I'd like to at least get phase 1 in for the next 
release; this is what I'm very keen on.

As such shall we say we have
Kip-87a that delivers phase 1

And then
Kip-87b in release after that delivers phase 2 but has a dependency on support 
of deprecating old api versions

How does this approach sound?



From: Becket Qin 
Sent: Monday, November 14, 2016 6:14:44 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

Hey Michael and Mayuresh,

If I understand correctly, you want to let the broker to reject requests
from old clients to ensure everyone in an organization has upgraded, right?
This is essentially deprecating an old protocol. I agree it is useful and
that is why we have that baked in KIP-35. However, we haven't thought
through how to deprecate an API so far. From the broker's point of view, it
will always support all the old protocols according to Kafka's widely known
compatibility guarantee. If people decide to deprecate an API within an
organization, we can provide some additional configuration to let people do
this, which is not a bad idea but seems better to be discussed in another
KIP.

Thanks,

Jiangjie (Becket) Qin

On Mon, Nov 14, 2016 at 8:52 AM, Michael Pearce 
wrote:

> I agree with Mayuresh.
>
> I don't see how having a magic byte helps here.
>
> What we are saying is that on day 1 after an upgrade both tombstone flag
> or a null value will be treated as a marker to delete on compacted topic.
>
> During this time we expect organisations to migrate themselves over onto
> the new producers and consumers that call the new Api, changing their
> application logic. during this point producers should start setting the
> tombstone marker and consumers to start behaving based on the tombstone
> marker.
>
> Only after all producers and consumers are upgraded then we enter phase 2
> disabling the use of a null value as a delete marker. And the old apis
> would not be allowed to be called as the broker now expects all clients to
> be using latest api only.
>
> At this second phase stage only also we can support a null value in a
> compacted topic without it being treated as deletion. We solely base on the
> tombstone marker.
>
>
> 
> From: Mayuresh Gharat 
> Sent: Saturday, November 12, 2016 2:18:19 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
>
> I think "In second stage we move on to supporting only the attribute flag
> for log
> compaction." means that it will no longer support request from older
> clients.
>
> Thanks,
>
> Mayuresh
>
> On Fri, Nov 11, 2016 at 10:02 AM, Becket Qin  wrote:
>
> > Hey Michael,
> >
> > The way Kafka implements backwards compatibility is to let the brokers
> > support old protocols. So the brokers have to support older clients that
> do
> > not understand the new attribute bit. That means we will not be able to
> get
> > away with the null value as a tombstone. So we need a good definition of
> > whether we should treat it as a tombstone or not. This is why I think we
> > need a magic value bump.
> >
> > My confusion on the second stage is exactly about "we want to end up just
> > supporting tombstone marker (not both)", or in the original statement:
> "2)
> > In second stage we move on to supporting only the attribute flag for log
> > compaction." - We will always support the null value as a tombstone as
> long
> > as the message format version (i.e. magic byte) is less than 2 for the
> > reason I mentioned above.
> >
> > Although the way to interpret the message bytes at producing time can be
> > inferred from the the ProduceRequest version, this version information
> > won't be stored in the broker. So when an old consumer comes, we need to
> > decide whether we have to do down conversion or not.. With a clear
> existing
> > configuration of message version config (i.e. magic value), we know for
> > sure when to down convert the messages to adapt to older clients.
> Otherwise
> > we will have to always scan all the messages. It would probably work but
> > relies on guess or inference.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Nov 11, 2016 at 8:42 AM, Mayuresh Gharat <
> > gharatmayures...@gmail.com
> > > wrote:
> >
> > > Sounds good Michael.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Fri, Nov 11, 2016 at 12:48 AM, Michael Pearce <
> michael.pea...@ig.com>
> > > wrote:
> > >
> > > > @Mayuresh i don't think you've missed anything -
> > > >
> > > > as per earlier in the discussion.
> > > >
> > > > We're providing new api versions, but not planning on bumping the
> magic
> > > > number as there is no structural changes, we are simply using up a
> new
> > > > attribute bit (as like adding new compression support just uses up
> > > > additional attribute bits)

Build failed in Jenkins: kafka-trunk-jdk8 #1040

2016-11-14 Thread Apache Jenkins Server
See 

Changes:

[becket.qin] KAFKA-4409; Fix deadlock between topic event handling and shutdown 
in

--
[...truncated 3875 lines...]

kafka.utils.CommandLineUtilsTest > testParseSingleArg STARTED

kafka.utils.CommandLineUtilsTest > testParseSingleArg PASSED

kafka.utils.CommandLineUtilsTest > testParseArgs STARTED

kafka.utils.CommandLineUtilsTest > testParseArgs PASSED

kafka.utils.CommandLineUtilsTest > testParseEmptyArgAsValid STARTED

kafka.utils.CommandLineUtilsTest > testParseEmptyArgAsValid PASSED

kafka.utils.ReplicationUtilsTest > testUpdateLeaderAndIsr STARTED

kafka.utils.ReplicationUtilsTest > testUpdateLeaderAndIsr PASSED

kafka.utils.ReplicationUtilsTest > testGetLeaderIsrAndEpochForPartition STARTED

kafka.utils.ReplicationUtilsTest > testGetLeaderIsrAndEpochForPartition PASSED

kafka.utils.JsonTest > testJsonEncoding STARTED

kafka.utils.JsonTest > testJsonEncoding PASSED

kafka.utils.SchedulerTest > testMockSchedulerNonPeriodicTask STARTED

kafka.utils.SchedulerTest > testMockSchedulerNonPeriodicTask PASSED

kafka.utils.SchedulerTest > testMockSchedulerPeriodicTask STARTED

kafka.utils.SchedulerTest > testMockSchedulerPeriodicTask PASSED

kafka.utils.SchedulerTest > testNonPeriodicTask STARTED

kafka.utils.SchedulerTest > testNonPeriodicTask PASSED

kafka.utils.SchedulerTest > testRestart STARTED

kafka.utils.SchedulerTest > testRestart PASSED

kafka.utils.SchedulerTest > testReentrantTaskInMockScheduler STARTED

kafka.utils.SchedulerTest > testReentrantTaskInMockScheduler PASSED

kafka.utils.SchedulerTest > testPeriodicTask STARTED

kafka.utils.SchedulerTest > testPeriodicTask PASSED

kafka.utils.ZkUtilsTest > testAbortedConditionalDeletePath STARTED

kafka.utils.ZkUtilsTest > testAbortedConditionalDeletePath PASSED

kafka.utils.ZkUtilsTest > testSuccessfulConditionalDeletePath STARTED

kafka.utils.ZkUtilsTest > testSuccessfulConditionalDeletePath PASSED

kafka.utils.ZkUtilsTest > testClusterIdentifierJsonParsing STARTED

kafka.utils.ZkUtilsTest > testClusterIdentifierJsonParsing PASSED

kafka.utils.IteratorTemplateTest > testIterator STARTED

kafka.utils.IteratorTemplateTest > testIterator PASSED

kafka.utils.UtilsTest > testGenerateUuidAsBase64 STARTED

kafka.utils.UtilsTest > testGenerateUuidAsBase64 PASSED

kafka.utils.UtilsTest > testAbs STARTED

kafka.utils.UtilsTest > testAbs PASSED

kafka.utils.UtilsTest > testReplaceSuffix STARTED

kafka.utils.UtilsTest > testReplaceSuffix PASSED

kafka.utils.UtilsTest > testCircularIterator STARTED

kafka.utils.UtilsTest > testCircularIterator PASSED

kafka.utils.UtilsTest > testReadBytes STARTED

kafka.utils.UtilsTest > testReadBytes PASSED

kafka.utils.UtilsTest > testCsvList STARTED

kafka.utils.UtilsTest > testCsvList PASSED

kafka.utils.UtilsTest > testReadInt STARTED

kafka.utils.UtilsTest > testReadInt PASSED

kafka.utils.UtilsTest > testUrlSafeBase64EncodeUUID STARTED

kafka.utils.UtilsTest > testUrlSafeBase64EncodeUUID PASSED

kafka.utils.UtilsTest > testCsvMap STARTED

kafka.utils.UtilsTest > testCsvMap PASSED

kafka.utils.UtilsTest > testInLock STARTED

kafka.utils.UtilsTest > testInLock PASSED

kafka.utils.UtilsTest > testSwallow STARTED

kafka.utils.UtilsTest > testSwallow PASSED

kafka.utils.ByteBoundedBlockingQueueTest > testByteBoundedBlockingQueue STARTED

kafka.utils.ByteBoundedBlockingQueueTest > testByteBoundedBlockingQueue PASSED

kafka.producer.AsyncProducerTest > testFailedSendRetryLogic STARTED

kafka.producer.AsyncProducerTest > testFailedSendRetryLogic PASSED

kafka.producer.AsyncProducerTest > testQueueTimeExpired STARTED

kafka.producer.AsyncProducerTest > testQueueTimeExpired PASSED

kafka.producer.AsyncProducerTest > testPartitionAndCollateEvents STARTED

kafka.producer.AsyncProducerTest > testPartitionAndCollateEvents PASSED

kafka.producer.AsyncProducerTest > testBatchSize STARTED

kafka.producer.AsyncProducerTest > testBatchSize PASSED

kafka.producer.AsyncProducerTest > testSerializeEvents STARTED

kafka.producer.AsyncProducerTest > testSerializeEvents PASSED

kafka.producer.AsyncProducerTest > testProducerQueueSize STARTED

kafka.producer.AsyncProducerTest > testProducerQueueSize PASSED

kafka.producer.AsyncProducerTest > testRandomPartitioner STARTED

kafka.producer.AsyncProducerTest > testRandomPartitioner PASSED

kafka.producer.AsyncProducerTest > testInvalidConfiguration STARTED

kafka.producer.AsyncProducerTest > testInvalidConfiguration PASSED

kafka.producer.AsyncProducerTest > testInvalidPartition STARTED

kafka.producer.AsyncProducerTest > testInvalidPartition PASSED

kafka.producer.AsyncProducerTest > testNoBroker STARTED

kafka.producer.AsyncProducerTest > testNoBroker PASSED

kafka.producer.AsyncProducerTest > testProduceAfterClosed STARTED

kafka.producer.AsyncProducerTest > testProduceAfterClosed PASSED

kafka.producer.AsyncProducerTest > testJavaProducer STARTED

kafka.producer.AsyncProducerTest 

Re: [DISCUSS] KIP-81: Max in-flight fetches

2016-11-14 Thread radai
+1 - there is a need for an effective way to control kafka memory
consumption - both on the broker and on clients.
i think we could even reuse the exact same param name - *queued.max.bytes* -
as it would serve the exact same purpose.

also (and again it's the same across the broker and clients) this bound
should also cover decompression, at some point.
the problem with that is that to the best of my knowledge the current wire
protocol does not declare the final, uncompressed size of anything up front
- all we know is the size of the compressed buffer. this may require a
format change in the future to properly support?
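
to illustrate what i mean by a shared bound, something as simple as this would already do the job (made-up class, not the kip-72 code - it only tracks available bytes, no buffer reuse):

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// made-up class; tracks available bytes only, no actual pooling of buffers
public class BoundedMemoryPool {
    private final AtomicLong available;

    public BoundedMemoryPool(long maxBytes) {
        this.available = new AtomicLong(maxBytes);
    }

    /** returns a buffer, or null if the bound would be exceeded (caller backs off / mutes the channel) */
    public ByteBuffer tryAllocate(int size) {
        while (true) {
            long current = available.get();
            if (current < size)
                return null;
            if (available.compareAndSet(current, current - size))
                return ByteBuffer.allocate(size);
        }
    }

    public void release(ByteBuffer buffer) {
        available.addAndGet(buffer.capacity());
    }
}

the same two calls (or a smarter pool behind them) could back queued.max.bytes on the broker and the fetch bound on the consumer.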

On Mon, Nov 14, 2016 at 10:03 AM, Mickael Maison 
wrote:

> Thanks for all the replies.
>
> I've updated the KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> The main point is to selectively read from sockets instead of
> throttling FetchRequest sends. I also mentioned it will be reusing
> the MemoryPool implementation created in KIP-72 instead of adding
> another memory tracking method.
>
> Please have another look. As always, comments are welcome !
>
> On Thu, Nov 10, 2016 at 2:47 AM, radai  wrote:
> > selectively reading from sockets achieves memory control (up to and not
> > including talk of (de)compression)
> >
> > this is exactly what i (also, even mostly) did for kip-72 - which i hope
> in
> > itself should be a reason to think about both KIPs at the same time
> because
> > the changes will be similar (at least in intent) and might result in
> > duplicated effort.
> >
> > a pool API is a way to "scale" all the way from just maintaining a
> variable
> > holding amount of available memory (which is what my current kip-72 code
> > does and what this kip proposes IIUC) all the way up to actually re-using
> > buffers without any changes to the code using the pool - just drop in a
> > different pool impl.
> >
> > for "edge nodes" (producer/consumer) the performance gain in actually
> > pooling large buffers may be arguable, but i suspect for brokers
> regularly
> > operating on 1MB-sized requests (which is the norm at linkedin) the
> > resulting memory fragmentation is an actual bottleneck (i have initial
> > micro-benchmark results to back this up but have not had the time to do a
> > full profiling run).
> >
> > so basically I'm saying we may be doing (very) similar things in mostly
> the
> > same areas of code.
> >
> > On Wed, Nov 2, 2016 at 11:35 AM, Mickael Maison <
> mickael.mai...@gmail.com>
> > wrote:
> >
> >> electively reading from the socket should enable to
> >> control the memory usage without impacting performance. I've had look
> >> at that today and I can see how that would work.
> >> I'll update the KIP accordingly tomorrow.
> >>
>


Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-14 Thread Becket Qin
Hey Michael and Mayuresh,

If I understand correctly, you want to let the broker to reject requests
from old clients to ensure everyone in an organization has upgraded, right?
This is essentially deprecating an old protocol. I agree it is useful and
that is why we have that baked in KIP-35. However, we haven't thought
through how to deprecate an API so far. From the broker's point of view, it
will always support all the old protocols according to Kafka's widely known
compatibility guarantee. If people decide to deprecate an API within an
organization, we can provide some additional configuration to let people do
this, which is not a bad idea but seems better to be discussed in another
KIP.

Thanks,

Jiangjie (Becket) Qin

On Mon, Nov 14, 2016 at 8:52 AM, Michael Pearce 
wrote:

> I agree with Mayuresh.
>
> I don't see how having a magic byte helps here.
>
> What we are saying is that on day 1 after an upgrade both tombstone flag
> or a null value will be treated as a marker to delete on compacted topic.
>
> During this time we expect organisations to migrate themselves over onto
> the new producers and consumers that call the new Api, changing their
> application logic. during this point producers should start setting the
> tombstone marker and consumers to start behaving based on the tombstone
> marker.
>
> Only after all producers and consumers are upgraded then we enter phase 2
> disabling the use of a null value as a delete marker. And the old apis
> would not be allowed to be called as the broker now expects all clients to
> be using latest api only.
>
> At this second phase stage only also we can support a null value in a
> compacted topic without it being treated as deletion. We solely base on the
> tombstone marker.
>
>
> 
> From: Mayuresh Gharat 
> Sent: Saturday, November 12, 2016 2:18:19 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
>
> I think "In second stage we move on to supporting only the attribute flag
> for log
> compaction." means that it will no longer support request from older
> clients.
>
> Thanks,
>
> Mayuresh
>
> On Fri, Nov 11, 2016 at 10:02 AM, Becket Qin  wrote:
>
> > Hey Michael,
> >
> > The way Kafka implements backwards compatibility is to let the brokers
> > support old protocols. So the brokers have to support older clients that
> do
> > not understand the new attribute bit. That means we will not be able to
> get
> > away with the null value as a tombstone. So we need a good definition of
> > whether we should treat it as a tombstone or not. This is why I think we
> > need a magic value bump.
> >
> > My confusion on the second stage is exactly about "we want to end up just
> > supporting tombstone marker (not both)", or in the original statement:
> "2)
> > In second stage we move on to supporting only the attribute flag for log
> > compaction." - We will always support the null value as a tombstone as
> long
> > as the message format version (i.e. magic byte) is less than 2 for the
> > reason I mentioned above.
> >
> > Although the way to interpret the message bytes at producing time can be
> > inferred from the the ProduceRequest version, this version information
> > won't be stored in the broker. So when an old consumer comes, we need to
> > decide whether we have to do down conversion or not.. With a clear
> existing
> > configuration of message version config (i.e. magic value), we know for
> > sure when to down convert the messages to adapt to older clients.
> Otherwise
> > we will have to always scan all the messages. It would probably work but
> > relies on guess or inference.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Nov 11, 2016 at 8:42 AM, Mayuresh Gharat <
> > gharatmayures...@gmail.com
> > > wrote:
> >
> > > Sounds good Michael.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Fri, Nov 11, 2016 at 12:48 AM, Michael Pearce <
> michael.pea...@ig.com>
> > > wrote:
> > >
> > > > @Mayuresh i don't think you've missed anything -
> > > >
> > > > as per earlier in the discussion.
> > > >
> > > > We're providing new api versions, but not planning on bumping the
> magic
> > > > number as there is no structural changes, we are simply using up a
> new
> > > > attribute bit (as like adding new compression support just uses up
> > > > additional attribute bits)
> > > >
> > > >
> > > > I think also difference between null vs 0 length byte array is
> covered.
> > > >
> > > > @Becket,
> > > >
> > > > The two stage approach is because we want to end up just supporting
> > > > tombstone marker (not both)
> > > >
> > > > But we accept we need to allow organisations and systems a period of
> > > > transition (this is what stage 1 provides)
> > > >
> > > >
> > > >
> > > >
> > > > 
> > > > From: Mayuresh Gharat 
> > > > Sent: 

[jira] [Updated] (KAFKA-4409) ZK consumer shutdown/topic event deadlock

2016-11-14 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-4409:

Affects Version/s: 0.10.1.0
    Fix Version/s: 0.10.1.1
      Component/s: consumer, core

> ZK consumer shutdown/topic event deadlock
> -
>
> Key: KAFKA-4409
> URL: https://issues.apache.org/jira/browse/KAFKA-4409
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.10.1.0
>Reporter: Joel Koshy
>Assignee: Joel Koshy
> Fix For: 0.10.1.1
>
>
> This only applies to the old zookeeper consumer. It is trivial enough to fix.
> The consumer can deadlock on shutdown if a topic event fires during shutdown. 
> The shutdown acquires the rebalance lock and then the topic-event-watcher 
> lock. The topic event watcher acquires these in the reverse order. Shutdown 
> should not need to acquire the topic-event-watcher’s lock - all it does is 
> unsubscribe from topic events.
> Stack trace:
> {noformat}
> "mirrormaker-thread-0":
> at 
> kafka.consumer.ZookeeperTopicEventWatcher.shutdown(ZookeeperTopicEventWatcher.scala:50)
> - waiting to lock <0x00072a65d508> (a java.lang.Object)
> at 
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:216)
> - locked <0x0007103c69c0> (a java.lang.Object)
> at 
> kafka.tools.MirrorMaker$MirrorMakerOldConsumer.cleanup(MirrorMaker.scala:519)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply$mcV$sp(MirrorMaker.scala:441)
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:76)
> at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
> at kafka.utils.CoreUtils$.swallowWarn(CoreUtils.scala:47)
> at kafka.utils.Logging$class.swallow(Logging.scala:94)
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:47)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:441)
> "ZkClient-EventThread-58-":
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:639)
> - waiting to lock <0x0007103c69c0> (a java.lang.Object)
> at 
> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:982)
> at 
> kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.handleTopicEvent(ZookeeperConsumerConnector.scala:1048)
> at 
> kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree1$1(ZookeeperTopicEventWatcher.scala:69)
> at 
> kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:65)
> - locked <0x00072a65d508> (a java.lang.Object)
> at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Found one Java-level deadlock:
> =
> "mirrormaker-thread-0":
>   waiting to lock monitor 0x7f1f38029748 (object 0x00072a65d508, a 
> java.lang.Object),
>   which is held by "ZkClient-EventThread-58-"
> "ZkClient-EventThread-58-":
>   waiting to lock monitor 0x7f1e900249a8 (object 0x0007103c69c0, a 
> java.lang.Object),
>   which is held by "mirrormaker-thread-0"
> {noformat}
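>
> The fix amounts to not taking the watcher's lock on the shutdown path (i.e. keeping a single lock order). A stripped-down illustration with placeholder names, not the actual consumer code:
> {noformat}
> // Placeholder names; only illustrates the lock ordering.
> class ShutdownLockOrderSketch {
>     private final Object rebalanceLock = new Object();
>     private final Object topicEventLock = new Object();
>
>     // Previously shutdown() also synchronized on topicEventLock here, i.e. the
>     // opposite acquisition order from handleTopicEvent() -> potential deadlock.
>     void shutdown() {
>         synchronized (rebalanceLock) {
>             unsubscribeFromTopicEvents();   // needs no extra lock: it only removes the ZK watcher
>         }
>     }
>
>     void handleTopicEvent() {
>         synchronized (topicEventLock) {
>             synchronized (rebalanceLock) {
>                 rebalance();
>             }
>         }
>     }
>
>     private void unsubscribeFromTopicEvents() { }
>     private void rebalance() { }
> }
> {noformat}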



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-4409) ZK consumer shutdown/topic event deadlock

2016-11-14 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved KAFKA-4409.
-
Resolution: Fixed
  Assignee: Joel Koshy
  Reviewer: Jiangjie Qin

> ZK consumer shutdown/topic event deadlock
> -
>
> Key: KAFKA-4409
> URL: https://issues.apache.org/jira/browse/KAFKA-4409
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>Assignee: Joel Koshy
>
> This only applies to the old zookeeper consumer. It is trivial enough to fix.
> The consumer can deadlock on shutdown if a topic event fires during shutdown. 
> The shutdown acquires the rebalance lock and then the topic-event-watcher 
> lock. The topic event watcher acquires these in the reverse order. Shutdown 
> should not need to acquire the topic-event-watcher’s lock - all it does is 
> unsubscribe from topic events.
> Stack trace:
> {noformat}
> "mirrormaker-thread-0":
> at 
> kafka.consumer.ZookeeperTopicEventWatcher.shutdown(ZookeeperTopicEventWatcher.scala:50)
> - waiting to lock <0x00072a65d508> (a java.lang.Object)
> at 
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:216)
> - locked <0x0007103c69c0> (a java.lang.Object)
> at 
> kafka.tools.MirrorMaker$MirrorMakerOldConsumer.cleanup(MirrorMaker.scala:519)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply$mcV$sp(MirrorMaker.scala:441)
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:76)
> at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
> at kafka.utils.CoreUtils$.swallowWarn(CoreUtils.scala:47)
> at kafka.utils.Logging$class.swallow(Logging.scala:94)
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:47)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:441)
> "ZkClient-EventThread-58-":
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:639)
> - waiting to lock <0x0007103c69c0> (a java.lang.Object)
> at 
> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:982)
> at 
> kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.handleTopicEvent(ZookeeperConsumerConnector.scala:1048)
> at 
> kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree1$1(ZookeeperTopicEventWatcher.scala:69)
> at 
> kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:65)
> - locked <0x00072a65d508> (a java.lang.Object)
> at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Found one Java-level deadlock:
> =
> "mirrormaker-thread-0":
>   waiting to lock monitor 0x7f1f38029748 (object 0x00072a65d508, a 
> java.lang.Object),
>   which is held by "ZkClient-EventThread-58-"
> "ZkClient-EventThread-58-":
>   waiting to lock monitor 0x7f1e900249a8 (object 0x0007103c69c0, a 
> java.lang.Object),
>   which is held by "mirrormaker-thread-0"
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-14 Thread Mickael Maison
+1. We've also been hit by OOMs on the broker because we were not able
to properly bound its memory usage.

On Mon, Nov 14, 2016 at 5:56 PM, radai  wrote:
> @rajini - fixed the hasBytesBuffered() method. also updated poll() so that
> no latency is added for picking up data stuck in ssl buffers (timeout is
> set to 0, just like with immediately connected keys and staged receives).
> thank you for pointing these out.
> added ssl (re) testing to the KIP testing plan.
>
>
>
>
> On Mon, Nov 14, 2016 at 7:24 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
>> Open point 1. I would just retain the current long value that specifies
>> queued.max.bytes as long and not as %heap since it is simple and easy to
>> use. And keeps it consistent with other ".bytes" configs.
>>
>> Point 3. ssl buffers - I am not quite sure the implementation looks
>> correct. hasBytesBuffered() is checking position() of buffers == 0. And the
>> code checks this only when poll with a timeout returns (adding a delay when
>> there is nothing else to read).
>> But since this and open point 2 (optimization) are implementation details,
>> they can be looked at during PR review.
>>
>> It will be good to add SSL testing to the test plan as well, since there is
>> additional code to test for SSL.
>>
>>
>> On Fri, Nov 11, 2016 at 9:03 PM, radai  wrote:
>>
>> > ok, i've made the following changes:
>> >
>> > 1. memory.pool.class.name has been removed
>> > 2. the code now only uses SimpleMemoryPool. the gc variant is left
>> (unused)
>> > as a developement aid and is unsettable via configuration.
>> > 3. I've resolved the issue of stale data getting stuck in intermediate
>> > (ssl) buffers.
>> > 4. default value for queued.max.bytes is -1, so off by default. any <=0
>> > value is interpreted as off by the underlying code.
>> >
>> > open points:
>> >
>> > 1. the kafka config framework doesnt allow a value to be either long or
>> > double, so in order to pull off the queued.max.bytes = 100 or
>> > queued.max.bytes = 0.3 thing i'd need to define the config as type
>> string,
>> > which is ugly to me. do we want to support setting queued.max.bytes to %
>> of
>> > heap ? if so, by way of making queued.max.bytes of type string, or by way
>> > of a 2nd config param (with the resulting either/all/combination?
>> > validation). my personal opinion is string because i think a single
>> > queued.max.bytes with overloaded meaning is more understandable to users.
>> > i'll await other people's opinions before doing anything.
>> > 2. i still need to evaluate rajini's optimization. sounds doable.
>> >
>> > asides:
>> >
>> > 1. i think you guys misunderstood the intent behind the gc pool. it was
>> > never meant to be a magic pool that automatically releases buffers
>> (because
>> > just as rajini stated the performance implications would be horrible). it
>> > was meant to catch leaks early. since that is indeed a dev-only concern
>> it
>> > wont ever get used in production.
>> > 2. i said this on some other kip discussion: i think the nice thing about
>> > the pool API is it "scales" from just keeping a memory bound to actually
>> > re-using buffers without changing the calling code. i think
>> actuallypooling
>> > large buffers will result in a significant performance impact, but thats
>> > outside the scope of this kip. at that point i think more pool
>> > implementations (that actually pool) would be written. i agree with the
>> > ideal of exposing as few knobs as possible, but switching pools (or pool
>> > params) for tuning may happen at some later point.
>> >
>> >
>> >
>> > On Fri, Nov 11, 2016 at 11:44 AM, Rajini Sivaram <
>> > rajinisiva...@googlemail.com> wrote:
>> >
>> > > 13. At the moment, I think channels are not muted if:
>> > > channel.receive != null && channel.receive.buffer != null
>> > > This mutes all channels that aren't holding onto a incomplete buffer.
>> > They
>> > > may or may not have read the 4-byte size.
>> > >
>> > > I was thinking you could avoid muting channels if:
>> > > channel.receive == null || channel.receive.size.remaining()
>> > > This will not mute channels that are holding onto a buffer (as above).
>> In
>> > > addition, it will not mute channels that haven't read the 4-byte size.
>> A
>> > > client that is closed gracefully while the pool is full will not be
>> muted
>> > > in this case and the server can process close without waiting for the
>> > pool
>> > > to free up. Once the 4-byte size is read, the channel will be muted if
>> > the
>> > > pool is still out of memory - for each channel, at most one failed read
>> > > attempt would be made while the pool is out of memory. I think this
>> would
>> > > also delay muting of SSL channels since they can continue to read into
>> > > their (already allocated) network buffers and unwrap the data and block
>> > > only when they need to allocate a buffer from the pool.
>> > >
>> > > On Fri, Nov 11, 

Re: [DISCUSS] KIP-81: Max in-flight fetches

2016-11-14 Thread Mickael Maison
Thanks for all the replies.

I've updated the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
The main point is to selectively read from sockets instead of
throttling FetchRequest sends. I also mentioned it will be reusing
the MemoryPool implementation created in KIP-72 instead of adding
another memory tracking method.

Please have another look. As always, comments are welcome !
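
Roughly, the consumer's network layer would ask the pool before reading a response off a socket, along the lines of this sketch (method and type names are made up, not the actual Selector/MemoryPool API):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

// Made-up names; sketches the "read only if the pool allows it" idea.
class PoolGatedReadSketch {
    interface Pool {
        ByteBuffer tryAllocate(int size);   // null when the bound has been reached
        void release(ByteBuffer buffer);
    }

    void maybeRead(SocketChannel channel, int responseSize, Pool pool) throws IOException {
        ByteBuffer buffer = pool.tryAllocate(responseSize);
        if (buffer == null)
            return;                 // pool exhausted: skip this socket for now (effectively muting it)
        channel.read(buffer);       // these bytes are now accounted against the bound
        // ... parse the response, then pool.release(buffer) once the records have been handed to the application
    }
}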

On Thu, Nov 10, 2016 at 2:47 AM, radai  wrote:
> selectively reading from sockets achieves memory control (up to and not
> including talk of (de)compression)
>
> this is exactly what i (also, even mostly) did for kip-72 - which i hope in
> itself should be a reason to think about both KIPs at the same time because
> the changes will be similar (at least in intent) and might result in
> duplicated effort.
>
> a pool API is a way to "scale" all the way from just maintaining a variable
> holding amount of available memory (which is what my current kip-72 code
> does and what this kip proposes IIUC) all the way up to actually re-using
> buffers without any changes to the code using the pool - just drop in a
> different pool impl.
>
> for "edge nodes" (producer/consumer) the performance gain in actually
> pooling large buffers may be arguable, but i suspect for brokers regularly
> operating on 1MB-sized requests (which is the norm at linkedin) the
> resulting memory fragmentation is an actual bottleneck (i have initial
> micro-benchmark results to back this up but have not had the time to do a
> full profiling run).
>
> so basically I'm saying we may be doing (very) similar things in mostly the
> same areas of code.
>
> On Wed, Nov 2, 2016 at 11:35 AM, Mickael Maison 
> wrote:
>
>> electively reading from the socket should enable to
>> control the memory usage without impacting performance. I've had look
>> at that today and I can see how that would work.
>> I'll update the KIP accordingly tomorrow.
>>


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-14 Thread radai
@rajini - fixed the hasBytesBuffered() method. also updated poll() so that
no latency is added for picking up data stuck in ssl buffers (timeout is
set to 0, just like with immediately connected keys and staged receives).
thank you for pointing these out.
added ssl (re) testing to the KIP testing plan.
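
for reference, the poll() tweak boils down to something like this (paraphrased sketch, not the actual Selector code):

// paraphrased sketch, not the actual Selector code
class PollTimeoutSketch {
    static long effectivePollTimeout(long requestedTimeout,
                                     boolean hasImmediatelyConnectedKeys,
                                     boolean hasStagedReceives,
                                     boolean hasBytesBufferedInSsl) {
        // if data is already sitting in local buffers, don't add latency by blocking on select()
        return (hasImmediatelyConnectedKeys || hasStagedReceives || hasBytesBufferedInSsl)
                ? 0 : requestedTimeout;
    }
}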




On Mon, Nov 14, 2016 at 7:24 AM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> Open point 1. I would just retain the current long value that specifies
> queued.max.bytes as long and not as %heap since it is simple and easy to
> use. And keeps it consistent with other ".bytes" configs.
>
> Point 3. ssl buffers - I am not quite sure the implementation looks
> correct. hasBytesBuffered() is checking position() of buffers == 0. And the
> code checks this only when poll with a timeout returns (adding a delay when
> there is nothing else to read).
> But since this and open point 2 (optimization) are implementation details,
> they can be looked at during PR review.
>
> It will be good to add SSL testing to the test plan as well, since there is
> additional code to test for SSL.
>
>
> On Fri, Nov 11, 2016 at 9:03 PM, radai  wrote:
>
> > ok, i've made the following changes:
> >
> > 1. memory.pool.class.name has been removed
> > 2. the code now only uses SimpleMemoryPool. the gc variant is left
> (unused)
> > as a developement aid and is unsettable via configuration.
> > 3. I've resolved the issue of stale data getting stuck in intermediate
> > (ssl) buffers.
> > 4. default value for queued.max.bytes is -1, so off by default. any <=0
> > value is interpreted as off by the underlying code.
> >
> > open points:
> >
> > 1. the kafka config framework doesnt allow a value to be either long or
> > double, so in order to pull off the queued.max.bytes = 100 or
> > queued.max.bytes = 0.3 thing i'd need to define the config as type
> string,
> > which is ugly to me. do we want to support setting queued.max.bytes to %
> of
> > heap ? if so, by way of making queued.max.bytes of type string, or by way
> > of a 2nd config param (with the resulting either/all/combination?
> > validation). my personal opinion is string because i think a single
> > queued.max.bytes with overloaded meaning is more understandable to users.
> > i'll await other people's opinions before doing anything.
> > 2. i still need to evaluate rajini's optimization. sounds doable.
> >
> > asides:
> >
> > 1. i think you guys misunderstood the intent behind the gc pool. it was
> > never meant to be a magic pool that automatically releases buffers
> (because
> > just as rajini stated the performance implications would be horrible). it
> > was meant to catch leaks early. since that is indeed a dev-only concern
> it
> > wont ever get used in production.
> > 2. i said this on some other kip discussion: i think the nice thing about
> > the pool API is it "scales" from just keeping a memory bound to actually
> > re-using buffers without changing the calling code. i think
> actuallypooling
> > large buffers will result in a significant performance impact, but thats
> > outside the scope of this kip. at that point i think more pool
> > implementations (that actually pool) would be written. i agree with the
> > ideal of exposing as few knobs as possible, but switching pools (or pool
> > params) for tuning may happen at some later point.
> >
> >
> >
> > On Fri, Nov 11, 2016 at 11:44 AM, Rajini Sivaram <
> > rajinisiva...@googlemail.com> wrote:
> >
> > > 13. At the moment, I think channels are not muted if:
> > > channel.receive != null && channel.receive.buffer != null
> > > This mutes all channels that aren't holding onto a incomplete buffer.
> > They
> > > may or may not have read the 4-byte size.
> > >
> > > I was thinking you could avoid muting channels if:
> > > channel.receive == null || channel.receive.size.remaining()
> > > This will not mute channels that are holding onto a buffer (as above).
> In
> > > addition, it will not mute channels that haven't read the 4-byte size.
> A
> > > client that is closed gracefully while the pool is full will not be
> muted
> > > in this case and the server can process close without waiting for the
> > pool
> > > to free up. Once the 4-byte size is read, the channel will be muted if
> > the
> > > pool is still out of memory - for each channel, at most one failed read
> > > attempt would be made while the pool is out of memory. I think this
> would
> > > also delay muting of SSL channels since they can continue to read into
> > > their (already allocated) network buffers and unwrap the data and block
> > > only when they need to allocate a buffer from the pool.
> > >
> > > On Fri, Nov 11, 2016 at 6:00 PM, Jay Kreps  wrote:
> > >
> > > > Hey Radai,
> > > >
> > > > +1 on deprecating and eventually removing the old config. The
> intention
> > > was
> > > > absolutely bounding memory usage. I think having two ways of doing
> > this,
> > > > one that gives a crisp 

[jira] [Commented] (KAFKA-4409) ZK consumer shutdown/topic event deadlock

2016-11-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15664535#comment-15664535
 ] 

ASF GitHub Bot commented on KAFKA-4409:
---

GitHub user jjkoshy opened a pull request:

https://github.com/apache/kafka/pull/2129

KAFKA-4409; Fix deadlock between topic event handling and shutdown in…

The consumer can deadlock on shutdown if a topic event fires during 
shutdown. The shutdown acquires the rebalance lock and then the 
topic-event-watcher lock. The topic event watcher acquires these in the reverse 
order. Shutdown should not need to acquire the topic-event-watcher’s lock - all 
it does is unsubscribe from topic events.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jjkoshy/kafka KAFKA-4409

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2129.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2129


commit 2a5a276822452b03d816c3ae2880a7b7bf15ea46
Author: Joel Koshy 
Date:   2016-11-14T17:48:16Z

KAFKA-4409; Fix deadlock between topic event handling and shutdown in the 
old consumer.




> ZK consumer shutdown/topic event deadlock
> -
>
> Key: KAFKA-4409
> URL: https://issues.apache.org/jira/browse/KAFKA-4409
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>
> This only applies to the old zookeeper consumer. It is trivial enough to fix.
> The consumer can deadlock on shutdown if a topic event fires during shutdown. 
> The shutdown acquires the rebalance lock and then the topic-event-watcher 
> lock. The topic event watcher acquires these in the reverse order. Shutdown 
> should not need to acquire the topic-event-watcher’s lock - all it does is 
> unsubscribe from topic events.
> Stack trace:
> {noformat}
> "mirrormaker-thread-0":
> at 
> kafka.consumer.ZookeeperTopicEventWatcher.shutdown(ZookeeperTopicEventWatcher.scala:50)
> - waiting to lock <0x00072a65d508> (a java.lang.Object)
> at 
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:216)
> - locked <0x0007103c69c0> (a java.lang.Object)
> at 
> kafka.tools.MirrorMaker$MirrorMakerOldConsumer.cleanup(MirrorMaker.scala:519)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply$mcV$sp(MirrorMaker.scala:441)
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:76)
> at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
> at kafka.utils.CoreUtils$.swallowWarn(CoreUtils.scala:47)
> at kafka.utils.Logging$class.swallow(Logging.scala:94)
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:47)
> at 
> kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:441)
> "ZkClient-EventThread-58-":
> at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:639)
> - waiting to lock <0x0007103c69c0> (a java.lang.Object)
> at 
> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:982)
> at 
> kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.handleTopicEvent(ZookeeperConsumerConnector.scala:1048)
> at 
> kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree1$1(ZookeeperTopicEventWatcher.scala:69)
> at 
> kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:65)
> - locked <0x00072a65d508> (a java.lang.Object)
> at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Found one Java-level deadlock:
> =
> "mirrormaker-thread-0":
>   waiting to lock monitor 0x7f1f38029748 (object 0x00072a65d508, a 
> java.lang.Object),
>   which is held by "ZkClient-EventThread-58-"
> "ZkClient-EventThread-58-":
>   waiting to lock monitor 0x7f1e900249a8 (object 0x0007103c69c0, a 
> java.lang.Object),
>   which is held by "mirrormaker-thread-0"
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2129: KAFKA-4409; Fix deadlock between topic event handl...

2016-11-14 Thread jjkoshy
GitHub user jjkoshy opened a pull request:

https://github.com/apache/kafka/pull/2129

KAFKA-4409; Fix deadlock between topic event handling and shutdown in…

The consumer can deadlock on shutdown if a topic event fires during 
shutdown. The shutdown acquires the rebalance lock and then the 
topic-event-watcher lock. The topic event watcher acquires these in the reverse 
order. Shutdown should not need to acquire the topic-event-watcher’s lock - 
all it does is unsubscribe from topic events.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jjkoshy/kafka KAFKA-4409

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2129.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2129


commit 2a5a276822452b03d816c3ae2880a7b7bf15ea46
Author: Joel Koshy 
Date:   2016-11-14T17:48:16Z

KAFKA-4409; Fix deadlock between topic event handling and shutdown in the 
old consumer.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-4409) ZK consumer shutdown/topic event deadlock

2016-11-14 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-4409:
-

 Summary: ZK consumer shutdown/topic event deadlock
 Key: KAFKA-4409
 URL: https://issues.apache.org/jira/browse/KAFKA-4409
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy


This only applies to the old zookeeper consumer. It is trivial enough to fix.

The consumer can deadlock on shutdown if a topic event fires during shutdown. 
The shutdown acquires the rebalance lock and then the topic-event-watcher lock. 
The topic event watcher acquires these in the reverse order. Shutdown should 
not need to acquire the topic-event-watcher’s lock - all it does is 
unsubscribe from topic events.

Stack trace:
{noformat}
"mirrormaker-thread-0":
at 
kafka.consumer.ZookeeperTopicEventWatcher.shutdown(ZookeeperTopicEventWatcher.scala:50)
- waiting to lock <0x00072a65d508> (a java.lang.Object)
at 
kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:216)
- locked <0x0007103c69c0> (a java.lang.Object)
at 
kafka.tools.MirrorMaker$MirrorMakerOldConsumer.cleanup(MirrorMaker.scala:519)
at 
kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply$mcV$sp(MirrorMaker.scala:441)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:76)
at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
at kafka.utils.CoreUtils$.swallowWarn(CoreUtils.scala:47)
at kafka.utils.Logging$class.swallow(Logging.scala:94)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:47)
at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:441)
"ZkClient-EventThread-58-":
at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:639)
- waiting to lock <0x0007103c69c0> (a java.lang.Object)
at 
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:982)
at 
kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.handleTopicEvent(ZookeeperConsumerConnector.scala:1048)
at 
kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree1$1(ZookeeperTopicEventWatcher.scala:69)
at 
kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:65)
- locked <0x00072a65d508> (a java.lang.Object)
at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
Found one Java-level deadlock:
=
"mirrormaker-thread-0":
  waiting to lock monitor 0x7f1f38029748 (object 0x00072a65d508, a 
java.lang.Object),
  which is held by "ZkClient-EventThread-58-"
"ZkClient-EventThread-58-":
  waiting to lock monitor 0x7f1e900249a8 (object 0x0007103c69c0, a 
java.lang.Object),
  which is held by "mirrormaker-thread-0"
{noformat}
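
For illustration, a minimal sketch of the lock-ordering hazard and the fix described above; the class and lock names here are hypothetical stand-ins, not the actual Kafka code.

{code}
// Sketch only: two monitors acquired in opposite orders by two threads.
public class LockOrderingSketch {
    private final Object rebalanceLock = new Object();
    private final Object watcherLock = new Object();

    // Before the fix: shutdown nests rebalanceLock -> watcherLock ...
    void shutdownBroken() {
        synchronized (rebalanceLock) {
            synchronized (watcherLock) { /* unsubscribe from topic events */ }
        }
    }

    // ... while the topic-event callback nests watcherLock -> rebalanceLock,
    // so the two threads can deadlock.
    void onTopicEventBroken() {
        synchronized (watcherLock) {
            synchronized (rebalanceLock) { /* trigger a rebalance */ }
        }
    }

    // After the fix: shutdown never takes the watcher's lock; unsubscribing
    // from topic events does not require it.
    void shutdownFixed() {
        synchronized (rebalanceLock) { /* release consumer resources */ }
        unsubscribeFromTopicEvents();
    }

    void unsubscribeFromTopicEvents() { /* e.g. unsubscribe the ZK child-change listener */ }
}
{code}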



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-92 - Add per partition lag metrics to KafkaConsumer

2016-11-14 Thread Becket Qin
Hey Michael,

Thanks for the comments. Exposing the lag on the client side may serve some
cases a little different from monitoring. For example, one of the use cases
we have was that an application has some high-priority and low-priority
topics to consume. They want to switch between consuming from high-priority
and low-priority topics alternately based on the lag on the partition, i.e.
if the consumer has already caught up with the log end offset of a
high-priority topic, the application will switch to consuming from the
low-priority topics. Otherwise it will continue consuming from the
high-priority topics. We have seen a few other similar use cases that require
programmatic access to the lag. Although people can always use
offsetsForTimes() to get the LEO, it is a more expensive call involving an
RPC and it blocks.
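
For illustration, a minimal sketch of that switching logic, assuming the per-partition lag is exposed through consumer.metrics() under a name like "{topic}-{partition}.records-lag" (the exact metric name and group are whatever the KIP finalises, so treat them as assumptions here):

{code}
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;

public class PrioritySwitcher {
    // Returns the recorded lag for a partition, or -1 if the metric is not (yet) present.
    static double lagOf(KafkaConsumer<?, ?> consumer, TopicPartition tp) {
        String assumedName = tp.topic() + "-" + tp.partition() + ".records-lag"; // assumption
        for (Map.Entry<MetricName, ? extends Metric> e : consumer.metrics().entrySet()) {
            if (e.getKey().name().equals(assumedName))
                return e.getValue().value();
        }
        return -1.0d;
    }

    // If the consumer has caught up on all high-priority partitions, resume the
    // low-priority ones; otherwise keep them paused and drain high priority first.
    static void switchPriorities(KafkaConsumer<?, ?> consumer,
                                 Collection<TopicPartition> highPriority,
                                 Collection<TopicPartition> lowPriority) {
        boolean caughtUp = true;
        for (TopicPartition tp : highPriority) {
            double lag = lagOf(consumer, tp);
            caughtUp &= (lag == 0.0);   // a missing metric (-1) counts as not caught up
        }
        if (caughtUp)
            consumer.resume(lowPriority);
        else
            consumer.pause(lowPriority);
    }
}
{code}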

Thanks,

Jiangjie (Becket) Qin



On Mon, Nov 14, 2016 at 9:18 AM, Michael Pearce 
wrote:

> Should state I have no objections adding this client side, just more a
> question to why we don't look and propose to add this broker side also.
>
> Sent using OWA for iPhone
> 
> From: Michael Pearce 
> Sent: Monday, November 14, 2016 4:58:45 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-92 - Add per partition lag metrics to
> KafkaConsumer
>
> Why do we not look to expose the lag broker side centrally?
>
> Eg like burrow.
>
> From an operations point it's a lot easier to monitor lag centrally than
> per application. Also then you'd be able to see lag of consumers not alive
> or stalled.
>
> The information if the consumer uses Kafka based or zookeeper offsets is
> available to the broker.
> 
> From: Becket Qin 
> Sent: Sunday, November 13, 2016 4:13:01 AM
> To: dev@kafka.apache.org
> Subject: [DISCUSS] KIP-92 - Add per partition lag metrics to KafkaConsumer
>
> Hi,
>
> We created KIP-92 to propose adding per partition lag metrics to
> KafkaConsumer.
>
> The KIP wiki link is the following:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 92+-+Add+per+partition+lag+metrics+to+KafkaConsumer
>
> Comments are welcome.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>


Re: [DISCUSS] KIP-92 - Add per partition lag metrics to KafkaConsumer

2016-11-14 Thread Michael Pearce
Should state I have no objections adding this client side, just more a question 
to why we don't look and propose to add this broker side also.

Sent using OWA for iPhone

From: Michael Pearce 
Sent: Monday, November 14, 2016 4:58:45 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-92 - Add per partition lag metrics to KafkaConsumer

Why do we not look to expose the lag broker side centrally?

Eg like burrow.

From an operations point it's a lot easier to monitor lag centrally than per 
application. Also then you'd be able to see lag of consumers not alive or 
stalled.

The information if the consumer uses Kafka based or zookeeper offsets is 
available to the broker.

From: Becket Qin 
Sent: Sunday, November 13, 2016 4:13:01 AM
To: dev@kafka.apache.org
Subject: [DISCUSS] KIP-92 - Add per partition lag metrics to KafkaConsumer

Hi,

We created KIP-92 to propose adding per partition lag metrics to
KafkaConsumer.

The KIP wiki link is the following:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-92+-+Add+per+partition+lag+metrics+to+KafkaConsumer

Comments are welcome.

Thanks,

Jiangjie (Becket) Qin


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-14 Thread Michael Pearce
Hi Roger,

The KIP details/examples the original proposal for key spacing, not the new 
namespace idea mentioned in the discussion.

We will need to update the KIP when we get agreement that this is a better 
approach (which seems to be the case, if I have understood the general feeling 
in the conversation).

Re the variable ints: at a very early stage we did think about this. I think 
the added complexity isn't worth the saving. If we want to reduce overhead and 
size, I'd rather go with int16 (2-byte) keys, as that keeps it simple.

On the note of no headers: as per the KIP we use an attribute bit to denote 
whether headers are present or not, which provides zero overhead if headers 
are not used.

I think, as radai mentions, it would be good first to get clarity on whether we 
now have general consensus that (1) headers are worthwhile and useful, and (2) 
we want them as a top-level entity.


Just to state the obvious: I believe (1) headers are worthwhile and (2) I agree 
they should be a top-level entity.

Cheers
Mike

From: Roger Hoover 
Sent: Wednesday, November 9, 2016 9:10:47 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-82 - Add Record Headers

Sorry for going a little in the weeds but thanks for the replies regarding
varint.

Agreed that a prefix and {int, int} can be the same.  It doesn't look like
that's what the KIP is saying in the "Open" section.   The example shows 211
for New Relic and 210002 for App Dynamics implying that the New Relic
organization will have only a single header id to work with.  Or is 211
a prefix?  The main point of a namespace or prefix is to reduce the
overhead of config mapping or registration depending on how
namespaces/prefixes are managed.
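
As a concrete illustration of what a prefix or {int, int} scheme amounts to (names here are illustrative, not from the KIP): a 32-bit namespace and a 32-bit id can be packed into one 64-bit key, so a single namespace registration grants a whole 2^32 range of header ids.

{code}
// Sketch only: packing/unpacking a namespaced header id into a long.
public final class HeaderId {
    static long pack(int namespaceId, int headerId) {
        return ((long) namespaceId << 32) | (headerId & 0xFFFFFFFFL);
    }
    static int namespaceOf(long packed) { return (int) (packed >>> 32); }
    static int idOf(long packed)        { return (int) packed; }
}
{code}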

Would love to hear more feedback on the higher-level questions though...

Cheers,

Roger


On Wed, Nov 9, 2016 at 11:38 AM, radai  wrote:

> I think this discussion is getting a bit into the weeds on technical
> implementation details.
> I'd like to step back a minute and try and establish where we are in the
> larger picture:
>
> (re-wording nacho's last paragraph)
> 1. are we all in agreement that headers are a worthwhile and useful
> addition to have? this was contested early on
> 2. are we all in agreement on headers as top level entity vs headers
> squirreled-away in V?
>
> if there are still concerns around these #2 points (#jay? #jun?)?
>
> (and now back to our normal programming ...)
>
> varints are nice. having said that, its adding complexity (see
> https://github.com/addthis/stream-lib/blob/master/src/
> main/java/com/clearspring/analytics/util/Varint.java
> as 1st google result) and would require anyone writing other clients (C?
> Python? Go? Bash? ;-) ) to get/implement the same, and for relatively
> little gain (int vs string is an order of magnitude; this isn't).
>
> int namespacing vs {int, int} namespacing are basically the same thing -
> youre just namespacing an int64 and giving people while 2^32 ranges at a
> time. the part i like about this is letting people have a large swath of
> numbers with one registration so they dont have to come back for every
> single plugin/header they want to "reserve".
>
>
> On Wed, Nov 9, 2016 at 11:01 AM, Roger Hoover 
> wrote:
>
> > Since some of the debate has been about overhead + performance, I'm
> > wondering if we have considered a varint encoding (
> > https://developers.google.com/protocol-buffers/docs/encoding#varints)
> for
> > the header length field (int32 in the proposal) and for header ids?  If
> you
> > don't use headers, the overhead would be a single byte and for each
> header
> > id < 128 would also need only a single byte?
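
For illustration, a minimal sketch of the base-128 varint writer being referred to (illustrative only, not code from the KIP): values below 128 encode in one byte, and a full int32 takes at most five bytes.

{code}
import java.io.ByteArrayOutputStream;

public final class Varint {
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        // Emit 7 bits at a time, setting the high bit on all but the last byte.
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value & 0x7F);
    }

    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(100, out);     // 1 byte
        writeUnsignedVarint(100_000, out); // 3 bytes
        System.out.println("encoded length: " + out.size()); // prints 4
    }
}
{code}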
> >
> >
> >
> > On Wed, Nov 9, 2016 at 6:43 AM, radai 
> wrote:
> >
> > > @magnus - and very dangerous (youre essentially downloading and
> executing
> > > arbitrary code off the internet on your servers ... bad idea without a
> > > sandbox, even with)
> > >
> > > as for it being a purely administrative task - i disagree.
> > >
> > > i wish it would, really, because then my earlier point on the
> complexity
> > of
> > > the remapping process would be invalid, but at linkedin, for example,
> we
> > > (the team im in) run kafka as a service. we dont really know what our
> > users
> > > (developing applications that use kafka) are up to at any given moment.
> > it
> > > is very possible (given the existance of headers and a corresponding
> > plugin
> > > ecosystem) for some application to "equip" their producers and
> consumers
> > > with the required plugin without us knowing. i dont mean to imply thats
> > > bad, i just want to make the point that its not as simple keeping it in
> > > sync across a large-enough organization.
> > >
> > >
> > > On Wed, Nov 9, 2016 at 6:17 AM, Magnus Edenhill 
> > > wrote:
> > >
> > > > I think there is a piece missing in the Strings 

[jira] [Commented] (KAFKA-4404) Add knowledge of sign to numeric schema types

2016-11-14 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15664439#comment-15664439
 ] 

Ewen Cheslack-Postava commented on KAFKA-4404:
--

What is the motivation for this? Support for signed and unsigned types is 
spotty at best in serialization formats, see 
https://cwiki.apache.org/confluence/display/KAFKA/Copycat+Data+API for some of 
the survey done when initially working on the API (basically protobufs is the 
only format that has complete support). Being exhaustive in type support ends 
up being a burden for connector and converter developers as it requires more 
effort to support all the types, so I think it's important to make sure any 
additions to the set of supported types are well motivated.

> Add knowledge of sign to numeric schema types
> -
>
> Key: KAFKA-4404
> URL: https://issues.apache.org/jira/browse/KAFKA-4404
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.0.1
>Reporter: Andy Bryant
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
>
> For KafkaConnect schemas there is currently no concept of whether a numeric 
> field is signed or unsigned. 
> Add an additional `signed` attribute (like optional) or make it explicit that 
> numeric types must be signed.
> You could encode this as a parameter on the schema but this would not be 
> standard across all connectors.
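
For illustration, a minimal sketch of the "parameter on the schema" option mentioned above; the "unsigned" parameter name is purely illustrative and not an existing Connect convention.

{code}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class UnsignedSchemaExample {
    // An int32 field that a connector wants to flag as carrying unsigned values.
    public static final Schema UNSIGNED_INT32_SCHEMA =
            SchemaBuilder.int32().parameter("unsigned", "true").build();

    public static boolean isUnsigned(Schema schema) {
        // parameters() may be null if no parameters were set on the schema.
        return schema.parameters() != null
                && "true".equals(schema.parameters().get("unsigned"));
    }
}
{code}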



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-92 - Add per partition lag metrics to KafkaConsumer

2016-11-14 Thread Michael Pearce
Why do we not look to expose the lag broker side centrally?

Eg like burrow.

>From an operations point it's a lot easier to monitor lag centrally than per 
>application. Also then you'd be able to see lag of consumers not alive or 
>stalled.

The information if the consumer uses Kafka based or zookeeper offsets is 
available to the broker.

From: Becket Qin 
Sent: Sunday, November 13, 2016 4:13:01 AM
To: dev@kafka.apache.org
Subject: [DISCUSS] KIP-92 - Add per partition lag metrics to KafkaConsumer

Hi,

We created KIP-92 to propose adding per partition lag metrics to
KafkaConsumer.

The KIP wiki link is the following:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-92+-+Add+per+partition+lag+metrics+to+KafkaConsumer

Comments are welcome.

Thanks,

Jiangjie (Becket) Qin


Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-14 Thread Michael Pearce
I agree with Mayuresh.

I don't see how having a magic byte helps here.

What we are saying is that on day 1 after an upgrade, both a tombstone flag and 
a null value will be treated as a delete marker on a compacted topic.

During this time we expect organisations to migrate over onto the new producers 
and consumers that call the new API, changing their application logic. During 
this period producers should start setting the tombstone marker and consumers 
should start behaving based on the tombstone marker.

Only after all producers and consumers are upgraded do we enter phase 2, 
disabling the use of a null value as a delete marker. The old APIs would no 
longer be allowed to be called, as the broker then expects all clients to be 
using the latest API only.

Only at this second stage can we also support a null value in a compacted topic 
without it being treated as a deletion; we base it solely on the tombstone 
marker.
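
For illustration, a minimal sketch of the two-phase interpretation described above; the record type and accessors are hypothetical stand-ins, not Kafka APIs.

{code}
public class TombstoneSketch {
    interface TombstoneRecord {           // hypothetical stand-in for a record
        boolean tombstoneFlag();          // the proposed attribute bit
        byte[] value();                   // null today means "delete" on compacted topics
    }

    static boolean isDeleteMarker(TombstoneRecord r, boolean phaseTwo) {
        if (phaseTwo)
            return r.tombstoneFlag();                   // phase 2: flag only
        return r.tombstoneFlag() || r.value() == null;  // phase 1: either signal
    }
}
{code}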



From: Mayuresh Gharat 
Sent: Saturday, November 12, 2016 2:18:19 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

I think "In second stage we move on to supporting only the attribute flag
for log
compaction." means that it will no longer support request from older
clients.

Thanks,

Mayuresh

On Fri, Nov 11, 2016 at 10:02 AM, Becket Qin  wrote:

> Hey Michael,
>
> The way Kafka implements backwards compatibility is to let the brokers
> support old protocols. So the brokers have to support older clients that do
> not understand the new attribute bit. That means we will not be able to get
> away with the null value as a tombstone. So we need a good definition of
> whether we should treat it as a tombstone or not. This is why I think we
> need a magic value bump.
>
> My confusion on the second stage is exactly about "we want to end up just
> supporting tombstone marker (not both)", or in the original statement: "2)
> In second stage we move on to supporting only the attribute flag for log
> compaction." - We will always support the null value as a tombstone as long
> as the message format version (i.e. magic byte) is less than 2 for the
> reason I mentioned above.
>
> Although the way to interpret the message bytes at producing time can be
> inferred from the the ProduceRequest version, this version information
> won't be stored in the broker. So when an old consumer comes, we need to
> decide whether we have to do down conversion or not. With a clear existing
> configuration of message version config (i.e. magic value), we know for
> sure when to down convert the messages to adapt to older clients. Otherwise
> we will have to always scan all the messages. It would probably work but
> relies on guess or inference.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Nov 11, 2016 at 8:42 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com
> > wrote:
>
> > Sounds good Michael.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Fri, Nov 11, 2016 at 12:48 AM, Michael Pearce 
> > wrote:
> >
> > > @Mayuresh i don't think you've missed anything -
> > >
> > > as per earlier in the discussion.
> > >
> > > We're providing new api versions, but not planning on bumping the magic
> > > number as there are no structural changes, we are simply using up a new
> > > attribute bit (as like adding new compression support just uses up
> > > additional attribute bits)
> > >
> > >
> > > I think also difference between null vs 0 length byte array is covered.
> > >
> > > @Becket,
> > >
> > > The two stage approach is because we want to end up just supporting
> > > tombstone marker (not both)
> > >
> > > But we accept we need to allow organisations and systems a period of
> > > transition (this is what stage 1 provides)
> > >
> > >
> > >
> > >
> > > 
> > > From: Mayuresh Gharat 
> > > Sent: Thursday, November 10, 2016 8:57 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> > >
> > > I am not sure if we are bumping magicByte value as the KIP does not
> > mention
> > > it (If I didn't miss anything).
> > >
> > > @Nacho : I did not understand what you meant by : Conversion from one
> to
> > > the other may be possible but I would rather not have to do it.
> > >
> > > You will have to do the conversion for the scenario that Becket has
> > > mentioned above where an old consumer talks to the new broker, right?
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > >
> > > On Thu, Nov 10, 2016 at 11:54 AM, Becket Qin 
> > wrote:
> > >
> > > > Nacho,
> > > >
> > > > In Kafka protocol, a negative length is null, a zero length means
> empty
> > > > byte array.
> > > >
> > > > I am still confused about the two stages. It seems that the broker
> only
> > > > needs to understand three versions of messages.
> > > > 1. MagicValue=0 - no timestamp, 

[jira] [Created] (KAFKA-4408) KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0

2016-11-14 Thread Byron Nikolaidis (JIRA)
Byron Nikolaidis created KAFKA-4408:
---

 Summary: KTable doesn't work with ProcessorTopologyTestDriver in 
Kafka 0.10.1.0
 Key: KAFKA-4408
 URL: https://issues.apache.org/jira/browse/KAFKA-4408
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.0
 Environment: Linux
Reporter: Byron Nikolaidis


In Kafka 0.10.1.0, the ProcessorTopologyTestDriver no longer works with 
KTables.  The below test code worked fine under Kafka 0.10.0.1 but now produces 
this error:

Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: 
task [0_0] Could not find partition info for topic: alertInputTopic
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
at 
org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
at 
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
at 
org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:174)
at 
mil.navy.icap.kafka.streams.processor.track.ProcessorDriverTest2.main(ProcessorDriverTest2.java:41)

{code}
package mil.navy.icap.kafka.streams.processor.track;


import java.io.IOException;
import java.util.Properties;


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.ProcessorTopologyTestDriver;


public class ProcessorDriverTest2 {
 
 public static void main(String[] args) throws IOException, 
InterruptedException {


 System.out.println("ProcessorDriverTest2");
 
 Properties props = new Properties();
 props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "ProcessorDriverTest2");
 props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
 props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
 props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
 props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
 
 StreamsConfig streamsConfig = new StreamsConfig(props);
 
 // topology
 KStreamBuilder kstreamBuilder = new KStreamBuilder();
 StringSerde stringSerde = new StringSerde();
 KTable<String, String> table = kstreamBuilder.table(stringSerde,
 stringSerde, "alertInputTopic");
 table.to(stringSerde, stringSerde, "alertOutputTopic");
 
 // create test driver
 ProcessorTopologyTestDriver testDriver = new ProcessorTopologyTestDriver(
 streamsConfig, 
 kstreamBuilder, 
 "alertStore");


 StringSerializer serializer = new StringSerializer();
 StringDeserializer deserializer = new StringDeserializer();


 // send data to input topic
 testDriver.process("alertInputTopic", 
 "the Key", "the Value", serializer, serializer);
 
 // read data from output topic
 ProducerRecord<String, String> rec = testDriver.readOutput("alertOutputTopic", 
 deserializer, deserializer);
 
 System.out.println("rec: " + rec);
 }
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4407) Java consumer does not always send LEAVE_GROUP request during shut down

2016-11-14 Thread Igor (JIRA)
Igor created KAFKA-4407:
---

 Summary: Java consumer does not always send LEAVE_GROUP request 
during shut down
 Key: KAFKA-4407
 URL: https://issues.apache.org/jira/browse/KAFKA-4407
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.9.0.1, 0.10.1.0
Reporter: Igor


Normally, the KafkaConsumer.close method sends a LEAVE_GROUP request to the broker 
during shutdown, since AbstractCoordinator.maybeLeaveGroup is called inside the 
AbstractCoordinator.close method. However, maybeLeaveGroup does not actually check 
whether the request was sent, and since the network client is closed shortly after 
the consumer coordinator during shutdown, the request may never be sent under 
certain circumstances.
As a result, the Kafka broker will wait for session.timeout to remove the consumer 
from its group, and if the consumer reconnects within this time interval, it won't 
receive the messages that just came in.
If waiting for the LEAVE_GROUP request is not a desired option, could its timeout 
at least be made configurable?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-14 Thread Rajini Sivaram
Open point 1. I would just retain the current approach of specifying
queued.max.bytes as a long and not as a % of heap, since it is simple and easy
to use, and it keeps it consistent with other ".bytes" configs.

Point 3. ssl buffers - I am not quite sure the implementation looks
correct. hasBytesBuffered() is checking position() of buffers == 0. And the
code checks this only when poll with a timeout returns (adding a delay when
there is nothing else to read).
But since this and open point 2 (optimization) are implementation details,
they can be looked at during PR review.

It will be good to add SSL testing to the test plan as well, since there is
additional code to test for SSL.


On Fri, Nov 11, 2016 at 9:03 PM, radai  wrote:

> ok, i've made the following changes:
>
> 1. memory.pool.class.name has been removed
> 2. the code now only uses SimpleMemoryPool. the gc variant is left (unused)
> as a development aid and is unsettable via configuration.
> 3. I've resolved the issue of stale data getting stuck in intermediate
> (ssl) buffers.
> 4. default value for queued.max.bytes is -1, so off by default. any <=0
> value is interpreted as off by the underlying code.
>
> open points:
>
> 1. the kafka config framework doesnt allow a value to be either long or
> double, so in order to pull off the queued.max.bytes = 100 or
> queued.max.bytes = 0.3 thing i'd need to define the config as type string,
> which is ugly to me. do we want to support setting queued.max.bytes to % of
> heap ? if so, by way of making queued.max.bytes of type string, or by way
> of a 2nd config param (with the resulting either/all/combination?
> validation). my personal opinion is string because i think a single
> queued.max.bytes with overloaded meaning is more understandable to users.
> i'll await other people's opinions before doing anything.
> 2. i still need to evaluate rajini's optimization. sounds doable.
>
> asides:
>
> 1. i think you guys misunderstood the intent behind the gc pool. it was
> never meant to be a magic pool that automatically releases buffers (because
> just as rajini stated the performance implications would be horrible). it
> was meant to catch leaks early. since that is indeed a dev-only concern it
> wont ever get used in production.
> 2. i said this on some other kip discussion: i think the nice thing about
> the pool API is it "scales" from just keeping a memory bound to actually
> re-using buffers without changing the calling code. i think actually pooling
> large buffers will result in a significant performance impact, but thats
> outside the scope of this kip. at that point i think more pool
> implementations (that actually pool) would be written. i agree with the
> ideal of exposing as few knobs as possible, but switching pools (or pool
> params) for tuning may happen at some later point.
>
>
>
> On Fri, Nov 11, 2016 at 11:44 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
> > 13. At the moment, I think channels are not muted if:
> > channel.receive != null && channel.receive.buffer != null
> > This mutes all channels that aren't holding onto an incomplete buffer.
> They
> > may or may not have read the 4-byte size.
> >
> > I was thinking you could avoid muting channels if:
> > channel.receive == null || channel.receive.size.remaining()
> > This will not mute channels that are holding onto a buffer (as above). In
> > addition, it will not mute channels that haven't read the 4-byte size. A
> > client that is closed gracefully while the pool is full will not be muted
> > in this case and the server can process close without waiting for the
> pool
> > to free up. Once the 4-byte size is read, the channel will be muted if
> the
> > pool is still out of memory - for each channel, at most one failed read
> > attempt would be made while the pool is out of memory. I think this would
> > also delay muting of SSL channels since they can continue to read into
> > their (already allocated) network buffers and unwrap the data and block
> > only when they need to allocate a buffer from the pool.
> >
> > On Fri, Nov 11, 2016 at 6:00 PM, Jay Kreps  wrote:
> >
> > > Hey Radai,
> > >
> > > +1 on deprecating and eventually removing the old config. The intention
> > was
> > > absolutely bounding memory usage. I think having two ways of doing
> this,
> > > one that gives a crisp bound on memory and one that is hard to reason
> > about
> > > is pretty confusing. I think people will really appreciate having one
> > > config which instead lets them directly control the thing they actually
> > > care about (memory).
> > >
> > > I also want to second Jun's concern on the complexity of the self-GCing
> > > memory pool. I wrote the memory pool for the producer. In that area the
> > > pooling of messages is the single biggest factor in performance of the
> > > client so I believed it was worth some sophistication/complexity if
> there
> > > was performance payoff. All the 
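
For illustration, a minimal sketch comparing the two muting conditions quoted above; the receive/size/buffer fields are illustrative stand-ins for the network-layer state, not the actual Selector or KafkaChannel code.

{code}
final class MuteDecision {
    static final class Receive {
        java.nio.ByteBuffer size;    // 4-byte size prefix (assumed allocated with the receive)
        java.nio.ByteBuffer buffer;  // payload buffer, null until allocated from the pool
    }

    // Current behaviour: when the pool is out of memory, mute every channel that
    // is not already holding an allocated payload buffer.
    static boolean muteCurrent(Receive receive, boolean poolOutOfMemory) {
        return poolOutOfMemory && !(receive != null && receive.buffer != null);
    }

    // Suggested behaviour: additionally leave channels unmuted while they have
    // not yet read the full 4-byte size, so a graceful close can still be
    // processed without waiting for the pool to free up.
    static boolean muteSuggested(Receive receive, boolean poolOutOfMemory) {
        boolean sizeNotFullyRead = receive == null || receive.size.hasRemaining();
        return poolOutOfMemory && !sizeNotFullyRead && receive.buffer == null;
    }
}
{code}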

[VOTE] KIP-84: Support SASL SCRAM mechanisms

2016-11-14 Thread Rajini Sivaram
Hi all,

I would like to initiate the voting process for *KIP-84: Support SASL/SCRAM
mechanisms*:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-84%3A+Support+SASL+SCRAM+mechanisms

This KIP adds support for four SCRAM mechanisms (SHA-224, SHA-256, SHA-384
and SHA-512) for SASL authentication, giving more choice for users to
configure security. When delegation token support is added to Kafka, SCRAM
will also support secure authentication using delegation tokens.

Thank you...

Regards,

Rajini
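
For illustration, a sketch of the kind of client configuration this enables; the mechanism names follow the KIP, and the JAAS/credential setup described on the KIP wiki is assumed rather than shown here.

{code}
import java.util.Properties;

public class ScramClientConfigSketch {
    public static Properties scramProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        // Credentials themselves come from the client's JAAS configuration.
        return props;
    }
}
{code}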


[jira] [Created] (KAFKA-4406) Add support for custom Java Security Providers in configuration

2016-11-14 Thread Magnus Reftel (JIRA)
Magnus Reftel created KAFKA-4406:


 Summary: Add support for custom Java Security Providers in 
configuration
 Key: KAFKA-4406
 URL: https://issues.apache.org/jira/browse/KAFKA-4406
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.10.0.1
Reporter: Magnus Reftel
Priority: Minor


Currently, the only way to add a custom security provider is through adding a 
-Djava.security.properties= option to the command line, e.g. through 
KAFKA_OPTS. It would be more convenient if this could be done through the config 
file, like all the other SSL related options.
I propose adding a new configuration option, ssl.provider.classes, which holds 
a list of names of security provider classes that will be loaded, instantiated, 
and added before creating SSL contexts.
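
For illustration, a minimal sketch of what the proposed ssl.provider.classes handling could do; the config name and wiring are the proposal above, not an existing Kafka API.

{code}
import java.security.Provider;
import java.security.Security;
import java.util.List;

public class ProviderLoaderSketch {
    static void registerProviders(List<String> providerClassNames) throws Exception {
        for (String className : providerClassNames) {
            // Load, instantiate, and register each provider before SSL contexts are built.
            Provider provider = (Provider) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
            Security.addProvider(provider);
        }
    }
}
{code}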



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-4268) huge server.logs during the error frequently happen "Message format version for partition 200 not found"

2016-11-14 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-4268.

Resolution: Duplicate

Marking as a duplicate of KAFKA-4362 even though this was filed first, as 
KAFKA-4362 has more information and a PR has been submitted.

> huge server.logs during the error frequently happen "Message format version 
> for partition 200 not found" 
> -
>
> Key: KAFKA-4268
> URL: https://issues.apache.org/jira/browse/KAFKA-4268
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.1
> Environment: Ubuntu 14.04.2 LTS
>Reporter: Peyton Peng
>
> We have three brokers running on three different Ubuntu servers, serving more 
> than 200 topics with different types of data. Each topic has 3 partitions. I 
> added a new consumer with the group "wlas" on two different machines (CentOS), 
> running within Flume, subscribing to all of the above topics in one consumer. 
> After a while we got the exception below; it is thrown in server.log and loops 
> frequently, which leaks disk space.
> I suspect the issue is related to one consumer subscribing to so many topics 
> and running on two machines.
> (Even with this exception, the topic data consumption logic runs fine.)
> -- exception segment:
> [2016-10-07 08:49:27,982] INFO [GroupCoordinator 1]: Assignment received from 
> leader for group wlas for generation 10 (kafka.coordinator.GroupCoordinator)
> [2016-10-07 08:49:27,982] ERROR [KafkaApi-1] Error when handling request 
> {group_id=wlas,generation_id=10,member_id=flume-1011-6ec8e841-9fa8-4014-97b8-211b2ee2465e,group_assignment=[{member_id=flume-1011-6ec8e841-9fa8-4014-97b8-211b2ee2465e,member_assignment=java.nio.HeapByteBuffer[pos=0
>  lim=2301 
> cap=5023]},{member_id=flume-1009-7f6eacb5-8885-4131-8286-3848ed17b2a9,member_assignment=java.nio.HeapByteBuffer[pos=0
>  lim=2669 cap=2669]}]} (kafka.server.KafkaApis)
> java.lang.IllegalArgumentException: Message format version for partition 200 
> not found
>at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>at scala.Option.getOrElse(Option.scala:121)
>at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
>at 
> kafka.coordinator.GroupMetadataManager.prepareStoreGroup(GroupMetadataManager.scala:171)
>at 
> kafka.coordinator.GroupCoordinator.doSyncGroup(GroupCoordinator.scala:276)
>at 
> kafka.coordinator.GroupCoordinator.handleSyncGroup(GroupCoordinator.scala:240)
>at kafka.server.KafkaApis.handleSyncGroupRequest(KafkaApis.scala:933)
>at kafka.server.KafkaApis.handle(KafkaApis.scala:90)
>at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>at java.lang.Thread.run(Thread.java:745) 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4405) Kafka consumer improperly send prefetch request

2016-11-14 Thread ysysberserk (JIRA)
ysysberserk created KAFKA-4405:
--

 Summary: Kafka consumer improperly send prefetch request
 Key: KAFKA-4405
 URL: https://issues.apache.org/jira/browse/KAFKA-4405
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.0.1
Reporter: ysysberserk


The Kafka consumer now has max.poll.records to limit the number of messages 
returned by poll().

According to KIP-41, to implement max.poll.records the prefetch request should 
only be sent when the total number of retained records is less than 
max.poll.records.

But in the 0.10.0.1 code, the consumer will send a prefetch request if it has 
retained any records, and it never checks whether the total number of retained 
records is less than max.poll.records.

If max.poll.records is set to a count much less than the number of messages 
fetched, the poll() loop will send many more requests than expected.

So before sending a prefetch request, the consumer must check whether the total 
number of retained records is less than max.poll.records.
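
For illustration, a minimal sketch of the check being requested; the names are illustrative stand-ins for the consumer's internal fetcher, not actual Kafka APIs.

{code}
public class PrefetchSketch {
    private int retainedRecords;          // records fetched but not yet returned by poll()
    private final int maxPollRecords;

    PrefetchSketch(int maxPollRecords) { this.maxPollRecords = maxPollRecords; }

    void maybePrefetch() {
        // Only prefetch when the retained records cannot satisfy the next poll()
        // on their own; otherwise the poll() loop keeps firing redundant fetches.
        if (retainedRecords < maxPollRecords)
            sendFetchRequests();
    }

    void sendFetchRequests() { /* issue fetch requests to the brokers */ }
}
{code}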




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)