[jira] [Commented] (KAFKA-2376) Add Copycat metrics

2017-02-02 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15851186#comment-15851186
 ] 

Ewen Cheslack-Postava commented on KAFKA-2376:
--

Not quite a direct comparison, but some helpful ideas about framework-level 
metrics we could add:
{quote}
One thing we found super helpful in samza was giving a metric that rolls up the 
percent of time getting records, time processing records, and time sending 
records to be able to debug these bottleneck issues. Technically you can kind 
of get this from the clients, but might be nice to roll that up to make it easy.
{quote}

We still will need to expose connector-specific metrics, but these are generic 
ones that can add a lot of value.
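
To make the roll-up idea concrete, here is a minimal sketch of how a framework could accumulate time per phase and report each phase as a percentage. The class and method names (TimeBreakdown, recordPoll, and so on) are hypothetical and not part of Kafka Connect; a real implementation would presumably publish these values through the existing metrics classes rather than return an array.

```java
/**
 * Hypothetical helper (not a Kafka Connect class): accumulates the time a task
 * spends getting, processing, and sending records, and reports each phase as a
 * percentage of the total tracked time.
 */
final class TimeBreakdown {
    private long pollNanos;
    private long processNanos;
    private long sendNanos;

    void recordPoll(long nanos) { pollNanos += nanos; }
    void recordProcess(long nanos) { processNanos += nanos; }
    void recordSend(long nanos) { sendNanos += nanos; }

    /** Returns {poll %, process %, send %}; all zeros if nothing was recorded. */
    double[] percentages() {
        long total = pollNanos + processNanos + sendNanos;
        if (total == 0) {
            return new double[] {0.0, 0.0, 0.0};
        }
        return new double[] {
            100.0 * pollNanos / total,
            100.0 * processNanos / total,
            100.0 * sendNanos / total
        };
    }
}
```

A phase that dominates the breakdown (say, sending) would point at the likely bottleneck without having to correlate separate client metrics by hand.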

> Add Copycat metrics
> ---
>
> Key: KAFKA-2376
> URL: https://issues.apache.org/jira/browse/KAFKA-2376
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.0
>Reporter: Ewen Cheslack-Postava
>  Labels: needs-kip
>
> Copycat needs good metrics for monitoring since that will be the primary 
> insight into the health of connectors as they copy data.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Trying to understand design decision about producer ack and min.insync.replicas

2017-02-02 Thread Ewen Cheslack-Postava
On Thu, Feb 2, 2017 at 11:21 PM, James Cheng  wrote:

> Ewen,
>
> Ah right, that's a good point.
>
> My initial reaction to your examples was that "well, those should be in
> separate topics", but then I realized that people choose their topics for a
> variety of reasons. Sometimes they organize it based on their producers,
> sometimes they organize it based on the nature of the data, but sometimes
> (as you gave examples about), they may organize it based on the consuming
> application. And there are valid reasons to want different data types in a
> single topic:
>
> 1) You get global ordering
> 2) You get persistent ordering in the case of re-reads (whereas reading 2
> topics would cause different ordering upon re-reads.)
> 3) Logically-related data types all co-located.
>
> I do still think it'd be convenient to only have to set
> min.insync.replicas on a topic and not have to require producing
> applications to also set acks=all. It'd then be a single thing you have to
> configure, instead of the current 2 things. (since, as currently
> implemented, you have to set both things, in order to achieve high
> durability.)
>

I entirely agree. I think the default should be acks=all, and then this
would be true :) Similar to the unclean leader election setting, I think
being durable by default is the better choice. I understand
historically why a different choice was made (Kafka didn't start out as a
replicated, durable storage system), but given how it has evolved I think
durable by default would be a better choice on both the broker and producer.


>
> But I admit that it's hard to find the balance of 
> features/simplicity/complexity,
> to handle all the use cases.
>

Perhaps the KIP-106 adjustment to unclean leader election could benefit
from a sister KIP for adjusting the default producer acks setting?

Not sure how popular it would be, but I would be in favor.

-Ewen


>
> Thanks,
> -James
>
> > On Feb 2, 2017, at 9:42 PM, Ewen Cheslack-Postava 
> wrote:
> >
> > James,
> >
> > Great question, I probably should have been clearer. log data is an
> example
> > where the app (or even instance of the app) might know best what the
> right
> > tradeoff is. Depending on your strategy for managing logs, you may or may
> > not be mixing multiple logs (and logs from different deployments) into
> the
> > same topic. For example, if you key by application, then you have an easy
> > way to split logs up while still getting a global feed of log messages.
> > Maybe logs from one app are really critical and we want to retry, but
> from
> > another app are just a nice to have.
> >
> > There are other examples even within a single app. For example, a gaming
> > company might report data from a user of a game to the same topic but
> want
> > 2 producers with different reliability levels (and possibly where the
> > ordering constraints across the two sets that might otherwise cause you
> to
> > use a single consumer are not an issue). High frequency telemetry on a
> > player might be desirable to have, but not the end of the world if some
> is
> > lost. In contrast, they may want a stronger guarantee for, e.g., something
> > like chat messages, where they want to have a permanent record of them in
> > all circumstances.
> >
> > -Ewen
> >
> > On Fri, Jan 27, 2017 at 12:59 AM, James Cheng 
> wrote:
> >
> >>
> >>> On Jan 27, 2017, at 12:18 AM, Ewen Cheslack-Postava  >
> >> wrote:
> >>>
> >>> On Thu, Jan 26, 2017 at 4:23 PM, Luciano Afranllie <
> >> listas.luaf...@gmail.com
> >>>> wrote:
> >>>
> >>>> I was thinking about the situation where you have less brokers in the
> >> ISR
> >>>> list than the number set in min.insync.replicas.
> >>>>
> >>>> My idea was that if I, as an administrator, for a given topic, want to
> >>>> favor durability over availability, then if that topic has less ISR
> than
> >>>> the value set in min.insync.replicas I may want to stop producing to
> the
> >>>> topic. In the way min.insync.replicas and ack work, I need to
> coordinate
> >>>> with all producers in order to achieve this. There is no way (or I
> don't
> >>>> know it) to globally enforce stop producing to a topic if it is under
> >>>> replicated.
> >>>>
> >>>> I don't see why, for the same topic, some producers might want get an
> >> error
> >>>> when the number of ISR is below min.insync.replicas while other
> >> producers
> >>>> don't. I think it could be more useful to be able to set that ALL
> >> producers
> >>>> should get an error when a given topic is under replicated so they
> stop
> >>>> producing, than for a single producer to get an error when ANY topic
> is
> >>>> under replicated. I don't have a lot of experience with Kafka so I may
> >> be
> >>>> missing some use cases.
> >>>>
> >>>
> >>> It's also a matter of not having to do a ton of configuration on a
> >>> per-topic basis. Putting some control in the producer apps hands means
> >> you
> 

Re: [VOTE] KIP-54: Sticky Partition Assignment Strategy

2017-02-02 Thread Guozhang Wang
+1 (binding).

Cheers.

On Thu, Feb 2, 2017 at 10:35 PM, Ewen Cheslack-Postava 
wrote:

> +1
>
> I don't think this solves all the stickiness/incremental rebalancing
> problems we'll eventually want to address, but it's a nice improvement,
> would be a benefit for a fair number of applications, and as it's a clean
> extension to the existing options it doesn't come with any significant
> compatibility concerns.
>
> (Also, this should bump this thread, which Jeff Widman was wondering about.
> It's lacking at least 1 more binding vote before it could pass.)
>
> -Ewen
>
> On Thu, Sep 22, 2016 at 1:43 AM, Mickael Maison 
> wrote:
>
> > +1 (non-binding)
> >
> > On Thu, Sep 15, 2016 at 8:32 PM, Bill Bejeck  wrote:
> > > +1
> > >
> > > On Thu, Sep 15, 2016 at 5:16 AM, Rajini Sivaram <
> > > rajinisiva...@googlemail.com> wrote:
> > >
> > >> +1 (non-binding)
> > >>
> > >> On Wed, Sep 14, 2016 at 12:37 AM, Jason Gustafson  >
> > >> wrote:
> > >>
> > >> > Thanks for the KIP. +1 from me.
> > >> >
> > >> > On Tue, Sep 13, 2016 at 12:05 PM, Vahid S Hashemian <
> > >> > vahidhashem...@us.ibm.com> wrote:
> > >> >
> > >> > > Hi all,
> > >> > >
> > >> > > Thanks for providing feedback on this KIP so far.
> > >> > > The KIP was discussed during the KIP meeting today and there
> doesn't
> > >> seem
> > >> > > to be any unaddressed issue at this point.
> > >> > >
> > >> > > So I would like to initiate the voting process.
> > >> > >
> > >> > > The KIP can be found here:
> > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >> > > 54+-+Sticky+Partition+Assignment+Strategy
> > >> > > And the full discussion thread is here:
> > >> > > https://www.mail-archive.com/dev@kafka.apache.org/msg47607.html
> > >> > >
> > >> > > Thanks.
> > >> > > --Vahid
> > >> > >
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> Regards,
> > >>
> > >> Rajini
> > >>
> >
>



-- 
-- Guozhang


Re: Trying to understand design decision about producer ack and min.insync.replicas

2017-02-02 Thread James Cheng
Ewen,

Ah right, that's a good point.

My initial reaction to your examples was that "well, those should be in 
separate topics", but then I realized that people choose their topics for a 
variety of reasons. Sometimes they organize it based on their producers, 
sometimes they organize it based on the nature of the data, but sometimes (as 
you gave examples about), they may organize it based on the consuming 
application. And there are valid reasons to want different data types in a 
single topic:

1) You get global ordering
2) You get persistent ordering in the case of re-reads (whereas reading 2 
topics would cause different ordering upon re-reads.)
3) Logically-related data types all co-located.

I do still think it'd be convenient to only have to set min.insync.replicas on 
a topic and not have to require producing applications to also set acks=all. 
It'd then be a single thing you have to configure, instead of the current 2 
things. (since, as currently implemented, you have to set both things, in order 
to achieve high durability.) 
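
For reference, the "2 things" look roughly like the sketch below. The config keys (acks, min.insync.replicas, bootstrap.servers, and the serializer settings) are the standard Kafka names; the class name, method names, and the example values are only assumptions for illustration. The producer properties would be passed to a KafkaProducer, and the topic-level setting would be applied when creating or altering the topic.

```java
import java.util.Properties;

/** Illustrative only: the two settings that currently have to be combined. */
final class DurableProducerConfig {

    /** Producer side: acks=all makes the producer wait for the in-sync replicas. */
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("acks", "all");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    /** Topic side: writes sent with acks=all fail when the ISR is smaller than this. */
    static Properties topicProps(int minInsyncReplicas) {
        Properties props = new Properties();
        props.setProperty("min.insync.replicas", Integer.toString(minInsyncReplicas));
        return props;
    }
}
```

With only the topic setting in place, a producer using acks=1 or acks=0 is not rejected when the ISR shrinks, which is why both sides have to be configured today.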

But I admit that it's hard to find the balance of 
features/simplicity/complexity, to handle all the use cases.

Thanks,
-James

> On Feb 2, 2017, at 9:42 PM, Ewen Cheslack-Postava  wrote:
> 
> James,
> 
> Great question, I probably should have been clearer. log data is an example
> where the app (or even instance of the app) might know best what the right
> tradeoff is. Depending on your strategy for managing logs, you may or may
> not be mixing multiple logs (and logs from different deployments) into the
> same topic. For example, if you key by application, then you have an easy
> way to split logs up while still getting a global feed of log messages.
> Maybe logs from one app are really critical and we want to retry, but from
> another app are just a nice to have.
> 
> There are other examples even within a single app. For example, a gaming
> company might report data from a user of a game to the same topic but want
> 2 producers with different reliability levels (and possibly where the
> ordering constraints across the two sets that might otherwise cause you to
> use a single consumer are not an issue). High frequency telemetry on a
> player might be desirable to have, but not the end of the world if some is
> lost. In contrast, they may want a stronger guarantee for, e.g., something
> like chat messages, where they want to have a permanent record of them in
> all circumstances.
> 
> -Ewen
> 
> On Fri, Jan 27, 2017 at 12:59 AM, James Cheng  wrote:
> 
>> 
>>> On Jan 27, 2017, at 12:18 AM, Ewen Cheslack-Postava 
>> wrote:
>>> 
>>> On Thu, Jan 26, 2017 at 4:23 PM, Luciano Afranllie <
>> listas.luaf...@gmail.com
>>>> wrote:
>>>
>>>> I was thinking about the situation where you have less brokers in the
>> ISR
>>>> list than the number set in min.insync.replicas.
>>>>
>>>> My idea was that if I, as an administrator, for a given topic, want to
>>>> favor durability over availability, then if that topic has less ISR than
>>>> the value set in min.insync.replicas I may want to stop producing to the
>>>> topic. In the way min.insync.replicas and ack work, I need to coordinate
>>>> with all producers in order to achieve this. There is no way (or I don't
>>>> know it) to globally enforce stop producing to a topic if it is under
>>>> replicated.
>>>>
>>>> I don't see why, for the same topic, some producers might want get an
>> error
>>>> when the number of ISR is below min.insync.replicas while other
>> producers
>>>> don't. I think it could be more useful to be able to set that ALL
>> producers
>>>> should get an error when a given topic is under replicated so they stop
>>>> producing, than for a single producer to get an error when ANY topic is
>>>> under replicated. I don't have a lot of experience with Kafka so I may
>> be
>>>> missing some use cases.
>>>>
>>> 
>>> It's also a matter of not having to do a ton of configuration on a
>>> per-topic basis. Putting some control in the producer apps hands means
>> you
>>> can set reasonably global defaults which make sense for apps that require
>>> stronger durability while letting cases that have lower requirements
>> still
>>> benefit from the durability before consumers see data but not block
>>> producers because the producer chooses lower requirements. Without
>>> requiring the ability to make config changes on the Kafka brokers (which
>>> may be locked down and restricted only to Kafka admins), the producer
>>> application can choose to accept weaker guarantees based on the tradeoffs
>>> it needs to make.
>>> 
>> 
>> I'm not sure I follow, Ewen.
>> 
>> I do agree that if I set min.insync.replicas at a broker level, then of
>> course I would like individual producers to decide whether their topic
>> (which inherits from the global setting) should reject writes if that topic
>> has size(ISR)> 
>> But on a topic-level... are you saying that if a particular topic has

Build failed in Jenkins: kafka-trunk-jdk8 #1246

2017-02-02 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] MINOR: added upgrade and API changes to docs

--
[...truncated 1447 lines...]
kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures STARTED

kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures PASSED

kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures STARTED

kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures PASSED

kafka.api.SslConsumerTest > testCoordinatorFailover STARTED

kafka.api.SslConsumerTest > testCoordinatorFailover PASSED

kafka.api.SslConsumerTest > testSimpleConsumption STARTED

kafka.api.SslConsumerTest > testSimpleConsumption PASSED

kafka.api.AdminClientTest > testDescribeConsumerGroup STARTED

kafka.api.AdminClientTest > testDescribeConsumerGroup PASSED

kafka.api.AdminClientTest > testListGroups STARTED

kafka.api.AdminClientTest > testListGroups PASSED

kafka.api.AdminClientTest > testListAllBrokerVersionInfo STARTED

kafka.api.AdminClientTest > testListAllBrokerVersionInfo PASSED

kafka.api.AdminClientTest > testDescribeConsumerGroupForNonExistentGroup STARTED

kafka.api.AdminClientTest > testDescribeConsumerGroupForNonExistentGroup PASSED

kafka.api.AdminClientTest > testGetConsumerGroupSummary STARTED

kafka.api.AdminClientTest > testGetConsumerGroupSummary PASSED

kafka.api.UserQuotaTest > testProducerConsumerOverrideUnthrottled STARTED

kafka.api.UserQuotaTest > testProducerConsumerOverrideUnthrottled PASSED

kafka.api.UserQuotaTest > testThrottledProducerConsumer STARTED

kafka.api.UserQuotaTest > testThrottledProducerConsumer PASSED

kafka.api.UserQuotaTest > testQuotaOverrideDelete STARTED

kafka.api.UserQuotaTest > testQuotaOverrideDelete PASSED

kafka.api.SaslPlaintextConsumerTest > testCoordinatorFailover STARTED

kafka.api.SaslPlaintextConsumerTest > testCoordinatorFailover PASSED

kafka.api.SaslPlaintextConsumerTest > testSimpleConsumption STARTED

kafka.api.SaslPlaintextConsumerTest > testSimpleConsumption PASSED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > 
testTwoConsumersWithDifferentSaslCredentials STARTED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > 
testTwoConsumersWithDifferentSaslCredentials PASSED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > 
testNoConsumeWithoutDescribeAclViaSubscribe STARTED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > 
testNoConsumeWithoutDescribeAclViaSubscribe PASSED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > testProduceConsumeViaAssign 
STARTED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > testProduceConsumeViaAssign 
PASSED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > 
testNoConsumeWithDescribeAclViaAssign STARTED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > 
testNoConsumeWithDescribeAclViaAssign PASSED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > 
testNoConsumeWithDescribeAclViaSubscribe STARTED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > 
testNoConsumeWithDescribeAclViaSubscribe PASSED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > 
testNoConsumeWithoutDescribeAclViaAssign STARTED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > 
testNoConsumeWithoutDescribeAclViaAssign PASSED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > testNoGroupAcl STARTED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > testNoGroupAcl PASSED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > testNoProduceWithDescribeAcl 
STARTED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > testNoProduceWithDescribeAcl 
PASSED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > 
testProduceConsumeViaSubscribe STARTED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > 
testProduceConsumeViaSubscribe PASSED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > 
testNoProduceWithoutDescribeAcl STARTED

kafka.api.SaslGssapiSslEndToEndAuthorizationTest > 
testNoProduceWithoutDescribeAcl PASSED

kafka.api.ProducerBounceTest > testBrokerFailure STARTED

kafka.api.ProducerBounceTest > testBrokerFailure PASSED

kafka.api.AuthorizerIntegrationTest > testCommitWithTopicWrite STARTED

kafka.api.AuthorizerIntegrationTest > testCommitWithTopicWrite PASSED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionWithTopicAndGroupRead STARTED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionWithTopicAndGroupRead PASSED

kafka.api.AuthorizerIntegrationTest > testPatternSubscriptionWithNoTopicAccess 
STARTED

kafka.api.AuthorizerIntegrationTest > testPatternSubscriptionWithNoTopicAccess 
PASSED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionNeededToReadFromNonExistentTopic STARTED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionNeededToReadFromNonExistentTopic PASSED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead STARTED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead PASSED


Re: [DISCUSS] KIP-110: Add Codec for ZStandard Compression

2017-02-02 Thread Ewen Cheslack-Postava
I think attachments get filtered by the Apache mailing lists. You might
need to post it elsewhere. I think you could upload it to the corresponding
JIRA, for example.

-Ewen

On Tue, Jan 31, 2017 at 3:06 AM, Dongjin Lee  wrote:

> No problem. I will elaborate my KIP with the updated benchmark & error
> cases.
>
> And here, I resend you the screenshot:
>
>
>
> Best,
> Dongjin
>
> On Tue, Jan 31, 2017 at 7:42 PM, Ismael Juma  wrote:
>
>> Thanks Dongjin. That seems to make things simpler indeed. It would be good
>> to update the KIP with the relevant details. I suggest describing each
>> flow
>> and the error that the user would see in each case.
>>
>> Also, you mentioned an attached screenshot, but it seems like there was
>> nothing attached to the email.
>>
>> Ismael
>>
>> On Tue, Jan 31, 2017 at 9:28 AM, Dongjin Lee  wrote:
>>
>> > Ismael & All,
>> >
>> > After inspecting the related code & commits, I concluded the following:
>> >
>> > 1. We have to update the masking value which is used to retrieve the
>> used
>> > codec id from the messages, to enable the retrieval of the 3rd bit of
>> > compression type field of the message.
>> > 2. The above task is already done; so, we need nothing.
>> >
>> > Here is why.
>> >
>> > Let's start with the first one, with the scenario Juma proposed. In the
>> > case of receiving the message of unsupported compression type, the
>> receiver
>> > (= broker or consumer) raises IllegalArgumentException[^1][^2]. The key
>> > element in this operation is Record#COMPRESSION_CODEC_MASK, which is
>> used
>> > to extract the codec id. We have to update this value from 2-bits
>> extractor
>> > (=0x03) to 3-bits extractor (=0x07).
>> >
>> > But in fact, this task is already done, so its current value is 0x07. We
>> > don't have to update it.
>> >
>> > The reason why this task is already done has some story; From the first
>> > time Record.java file was added to the project[^3], the
>> > COMPRESSION_CODEC_MASK was already 2-bits extractor, that is, 0x03. At
>> that
>> > time, Kafka supported only two compression types - GZipCompression and
>> > SnappyCompression.[^4] After that, KAFKA-1456 introduced two additional
>> > codecs of LZ4 and LZ4C[^5]. This update modified COMPRESSION_CODEC_MASK
>> > into 3 bits extractor, 0x07, in the aim of supporting four compression
>> > codecs.
>> >
>> > Although its following work, KAFKA-1493, removed the support of LZ4C
>> > codec[^6], this mask was not reverted into 2-bits extractor - by this
>> > reason, we don't need to care about the message format.
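
A small sketch of why the mask width matters. The constants mirror the 0x03 and 0x07 values discussed above; the class name, the helper method, and the use of id 4 for the new codec are assumptions made for illustration, not taken from Record.java or the KIP.

```java
/** Illustrative only: extracting a codec id from a message attributes byte. */
final class CodecMaskDemo {
    static final int TWO_BIT_MASK = 0x03;   // can represent ids 0..3
    static final int THREE_BIT_MASK = 0x07; // can represent ids 0..7

    /** Keeps only the low bits of the attributes byte selected by the mask. */
    static int codecId(byte attributes, int mask) {
        return attributes & mask;
    }

    public static void main(String[] args) {
        byte attributes = 4; // hypothetical id for a fifth codec
        // The 2-bit mask drops the 3rd bit, so id 4 reads back as 0.
        System.out.println(codecId(attributes, TWO_BIT_MASK));
        // The 3-bit mask preserves it, so an unsupported id can be detected.
        System.out.println(codecId(attributes, THREE_BIT_MASK));
    }
}
```

With the 2-bit mask a receiver would silently misread the new codec as an existing id; with the 3-bit mask it sees an id it does not know and can raise an error instead.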
>> >
>> > Attached screenshot is my test on Juma's scenario. I created topic &
>> sent
>> > some messages using the snapshot version with ZStandard compression and
>> > received the message with the latest version. As you can see, it works
>> > perfectly as expected.
>> >
>> > If you have more opinions on this issue, don't hesitate to send me a
>> > message.
>> >
>> > Best,
>> > Dongjin
>> >
>> > [^1]: see: Record#compressionType.
>> > [^2]: There is similar routine in Message.scala. But after KAFKA-4390,
>> > that routine is not being used anymore - more precisely, Message class
>> is
>> > now used in ConsoleConsumer only. I think this class should be replaced
>> but
>> > since it is a separated topic, I will send another message for this
>> issue.
>> > [^3]: commit 642da2f (2011.8.2).
>> > [^4]: commit c51b940.
>> > [^5]: commit 547cced.
>> > [^6]: commit 37356bf.
>> >
>> > On Thu, Jan 26, 2017 at 12:35 AM, Ismael Juma 
>> wrote:
>> >
>> >> So far the discussion was around the performance characteristics of the
>> >> new
>> >> compression algorithm. Another area that is important and is not
>> covered
>> >> in
>> >> the KIP is the compatibility implications. For example, what happens
>> if a
>> >> consumer that doesn't support zstd tries to consume a topic compressed
>> >> with
>> >> it? Or if a broker that doesn't support receives data compressed with
>> it?
>> >> If we go through that exercise, then more changes may be required (like
>> >> bumping the version of produce/fetch protocols).
>> >>
>> >> Ismael
>> >>
>> >> On Wed, Jan 25, 2017 at 3:22 PM, Ben Stopford 
>> wrote:
>> >>
>> >> > Is there more discussion to be had on this KIP, or should it be taken
>> >> to a
>> >> > vote?
>> >> >
>> >> > On Mon, Jan 16, 2017 at 6:37 AM Dongjin Lee 
>> wrote:
>> >> >
>> >> > > I updated KIP-110 with JMH-measured benchmark results. Please have
>> a
>> >> > review
>> >> > > when you are free. (The overall result is not different yet.)
>> >> > >
>> >> > > Regards,
>> >> > > Dongjin
>> >> > >
>> >> > > +1. Could anyone assign KAFKA-4514 to me?
>> >> > >
>> >> > > On Thu, Jan 12, 2017 at 11:39 AM, Dongjin Lee 
>> >> > wrote:
>> >> > >
>> >> > > > Okay, I will have a try.
>> >> > > > Thanks Ewen for the guidance!!
>> >> > > >
>> >> > > > Best,
>> >> > > > Dongjin
>> >> > > >

Re: KIP-54 voting status?

2017-02-02 Thread Ewen Cheslack-Postava
I bumped the thread, including another binding vote for the KIP.

Nag messages are sometimes useful here and definitely acceptable -- much of
the time the reviews of KIPs (or JIRAs, mailing list questions, etc) simply
get lost in everyone's inboxes.

-Ewen

On Mon, Jan 30, 2017 at 5:57 PM, Jeff Widman  wrote:

> I joined the dev list after KIP-54 voting started, so unfortunately don't
> have the old thread to bump.
>
> But wanted to check if there was any news on this?
>
> From KAFKA-2273 sounds like there are no outstanding objections to the
> design, but there also aren't yet enough +1's, so is this just languishing
> in purgatory?
>


Re: [VOTE] KIP-54: Sticky Partition Assignment Strategy

2017-02-02 Thread Ewen Cheslack-Postava
+1

I don't think this solves all the stickiness/incremental rebalancing
problems we'll eventually want to address, but it's a nice improvement,
would be a benefit for a fair number of applications, and as it's a clean
extension to the existing options it doesn't come with any significant
compatibility concerns.

(Also, this should bump this thread, which Jeff Widman was wondering about.
It's lacking at least 1 more binding vote before it could pass.)

-Ewen

On Thu, Sep 22, 2016 at 1:43 AM, Mickael Maison 
wrote:

> +1 (non-binding)
>
> On Thu, Sep 15, 2016 at 8:32 PM, Bill Bejeck  wrote:
> > +1
> >
> > On Thu, Sep 15, 2016 at 5:16 AM, Rajini Sivaram <
> > rajinisiva...@googlemail.com> wrote:
> >
> >> +1 (non-binding)
> >>
> >> On Wed, Sep 14, 2016 at 12:37 AM, Jason Gustafson 
> >> wrote:
> >>
> >> > Thanks for the KIP. +1 from me.
> >> >
> >> > On Tue, Sep 13, 2016 at 12:05 PM, Vahid S Hashemian <
> >> > vahidhashem...@us.ibm.com> wrote:
> >> >
> >> > > Hi all,
> >> > >
> >> > > Thanks for providing feedback on this KIP so far.
> >> > > The KIP was discussed during the KIP meeting today and there doesn't
> >> seem
> >> > > to be any unaddressed issue at this point.
> >> > >
> >> > > So I would like to initiate the voting process.
> >> > >
> >> > > The KIP can be found here:
> >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > > 54+-+Sticky+Partition+Assignment+Strategy
> >> > > And the full discussion thread is here:
> >> > > https://www.mail-archive.com/dev@kafka.apache.org/msg47607.html
> >> > >
> >> > > Thanks.
> >> > > --Vahid
> >> > >
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> Regards,
> >>
> >> Rajini
> >>
>


[jira] [Commented] (KAFKA-4385) producer is sending too many unnecessary meta data request if the meta data for a topic is not available and "auto.create.topics.enable" =false

2017-02-02 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15851100#comment-15851100
 ] 

Ewen Cheslack-Postava commented on KAFKA-4385:
--

[~sslavic] Your analysis seems fine, but the client doesn't currently have a 
way to determine whether auto topic creation is enabled or not. Which means 
even if we jumped through hoops to provide both Retriable and non-Retriable 
versions of the exception, it wouldn't help since the client wouldn't know 
which one to throw.

In general, auto topic creation seems to cause more problems than it solves...

> producer is sending too many unnecessary meta data request if the meta data 
> for a topic is not available and "auto.create.topics.enable" =false
> ---
>
> Key: KAFKA-4385
> URL: https://issues.apache.org/jira/browse/KAFKA-4385
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Jun Yao
>
> In all current kafka-client producer implementations (<= 0.10.1.0), 
> when sending a message to a topic, the producer first checks whether metadata 
> for the topic is available. 
> If it is not, the producer calls "metadata.requestUpdate()" and waits for 
> metadata from the brokers. 
> However, "org.apache.kafka.clients.Metadata.awaitUpdate()" already runs 
> a "while (this.version <= lastVersion)" loop waiting for a new 
> version of the metadata, 
> so the loop inside 
> "org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata()" is not 
> needed. 
> When "auto.create.topics.enable" is false, sending messages to a nonexistent 
> topic triggers too many metadata requests: every time a metadata response is 
> returned without the metadata for the topic, the producer tries 
> again until a TimeoutException is thrown. 
> This is wasteful and sometimes causes too much overhead when unexpected 
> messages arrive. 
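
For readers unfamiliar with the pattern, the version-based wait described in the report can be sketched as below. This is a simplified, hypothetical class (VersionedMetadata), not Kafka's actual Metadata implementation; it only illustrates how a "while (version <= lastVersion)" loop with a deadline blocks until either an update arrives or the wait times out.

```java
import java.util.concurrent.TimeoutException;

/** Simplified stand-in for a versioned metadata holder; not Kafka's Metadata class. */
final class VersionedMetadata {
    private int version = 0;

    synchronized int version() {
        return version;
    }

    /** Called when a metadata response arrives: bump the version and wake waiters. */
    synchronized void update() {
        version++;
        notifyAll();
    }

    /** Blocks until the version moves past lastVersion or maxWaitMs elapses. */
    synchronized void awaitUpdate(int lastVersion, long maxWaitMs)
            throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new TimeoutException("no metadata update within " + maxWaitMs + " ms");
            }
            wait(remaining);
        }
    }
}
```

Note that a new version only means that some response arrived, not that it contains the requested topic, which is why a caller that keeps re-requesting on every response can generate many requests for a topic that will never exist.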





Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-02 Thread Becket Qin
Hi Colin,

Thanks for the KIP. An admin client is probably a must after we block
direct access to ZK. Some comments and thoughts below:

1. Do we have a clear scope for the admin client? It might be worth
thinking about the entire user experience of the admin client. Ideally we
may want to have a single client to do all the administrative work instead
of having multiple ways to do different things. For example, do we want to
add topic configurations change API in the admin client? What about
partition movements and preferred leader election? Those are also
administrative tasks which seem reasonable to be integrated into the admin
client.

2. Regarding the Future based async Ops v.s. batching of Ops, I would
prefer supporting batching if possible. That usually introduces much less
overhead when doing some big operations, e.g. in the controller we have been
putting quite some effort into batching the requests. For admin client, my
understanding is that the operations are:
a. rare and potentially big
b. likely OK to block (it would be good to see some use cases where a
nonblocking behavior is desired)
c. the efficiency of the operation matters.
Given the above three requirements, it seems a batching blocking API is
fine?
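
To compare the two API shapes side by side, here is a rough sketch. Everything in it is hypothetical (the class name, the method names, and the trivial name check standing in for a real broker round trip); it is not the API proposed in the KIP, only an illustration of a blocking batch call versus a per-item call that returns a future.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical API shapes only; no requests are actually sent. */
final class AdminApiShapes {

    /** Batch, blocking style: one call for many topics, per-topic success in the result. */
    static Map<String, Boolean> createTopicsBlocking(List<String> topics) {
        Map<String, Boolean> results = new LinkedHashMap<>();
        for (String topic : topics) {
            results.put(topic, isValid(topic));
        }
        return results;
    }

    /** Future-based style: one call per topic, failure surfaces through the future. */
    static CompletableFuture<String> createTopicAsync(String topic) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (isValid(topic)) {
            future.complete(topic);
        } else {
            future.completeExceptionally(
                    new IllegalArgumentException("invalid topic name: '" + topic + "'"));
        }
        return future;
    }

    // Stand-in for whatever validation and broker round trip a real client would do.
    private static boolean isValid(String topic) {
        return topic != null && !topic.isEmpty();
    }
}
```

The two are not mutually exclusive: a batch method could also return a future of the per-topic results, which would keep the batching efficiency while leaving blocking up to the caller.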

Thanks,

Jiangjie (Becket) Qin



On Thu, Feb 2, 2017 at 5:54 PM, Dong Lin  wrote:

> Hey Colin,
>
> Thanks for the KIP. I have a few comments below:
>
> - I share Ismael's view that a Future-based API is better.
> PurgeDataFrom() is an example of an API that a user may want to call
> asynchronously even though there is only one request in flight at a time.
> In the future we may also have some admin operation that allows a user to
> move a replica from one broker to another, which also needs to work in both
> sync and async style. It seems more generic to return a Future for any API
> that requires both modes.
>
> - I am not sure if it is the cleanest way to have enum classes
> CreateTopicsFlags and DeleteTopicsFlags. Are we going to create such class
> for every future API that requires configuration? It may be more generic to
> provide Map to any admin API that operates on multiple entries.
> For example, deleteTopic(Map). And it can be Map Properties> for those API that requires multiple configs per entry. And we
> can provide default value, doc, config name for those API as we do
> with AbstractConfig.
>
> - I am not sure if "Try" is very intuitive to Java developers. Is there any
> counterpart of Scala's "Try" in Java? We actually have quite a few existing
> classes in Kafka that address the same problem, such as
> ProduceRequestResult, LogAppendResult etc. Maybe we can follow the same
> convention and use *Result as this class name.
>
> - How are we going to allow user to specify timeout for blocking APIs such
> as deleteTopic? Is this configured per AdminClient, or should it be
> specified in the API parameter?
>
> - Are we going to have this class initiate its own thread, as we do with
> Sender class, to send/receive requests? If yes, it will be useful to have
> a class that extends AbstractConfig and specifies configs and their
> default values.
>
> Thanks,
> Dong
>
>
>
> On Thu, Feb 2, 2017 at 3:02 PM, Ismael Juma  wrote:
>
> > Hi Colin,
> >
> > Thanks for the KIP, great to make progress on this. I have some initial
> > comments, will post more later:
> >
> > 1. We have KafkaProducer that implements the Producer interface and
> > KafkaConsumer that implements the Consumer interface. Maybe we could have
> > KafkaAdminClient that implements the AdminClient interface? Or maybe just
> > KafkaAdmin. Not sure, some ideas for consideration. Also, I don't think
> we
> > should worry about a name clash with the internal AdminClient written in
> > Scala. That will go away soon enough and choosing a good name for the
> > public class is what matters.
> >
> > 2. We should include the proposed package name in the KIP
> > (probably org.apache.kafka.clients.admin?).
> >
> > 3. It would be good to list the supported configs.
> >
> > 4. KIP-107, which passed the vote, specifies the introduction of a method
> > to AdminClient with the following signature. We should figure out how it
> > would look given this proposal.
> >
> > Future<Map<TopicPartition, PurgeDataResult>>
> > purgeDataBefore(Map<TopicPartition, Long> offsetForPartition)
> >
> > 5. I am not sure about rejecting the Futures-based API. I think I would
> > prefer that, personally. Grant had an interesting idea of not exposing the
> > batch methods in the AdminClient to start with to keep it simple and
> > relying on a Future based API to make it easy for users to run things
> > concurrently. This is consistent with the producer and Java 8 makes things
> > a lot nicer with CompletableFuture (similar to Scala Futures). I will think
> > more about this and other details of the proposal and send a follow-up.
> >
> > Ismael
> >
> > On Thu, Feb 2, 2017 at 6:54 PM, Colin 

[jira] [Commented] (KAFKA-4614) Long GC pause harming broker performance which is caused by mmap objects created for OffsetIndex

2017-02-02 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15851097#comment-15851097
 ] 

Guozhang Wang commented on KAFKA-4614:
--

[~kawamuray] Thanks for sharing!!

> Long GC pause harming broker performance which is caused by mmap objects 
> created for OffsetIndex
> 
>
> Key: KAFKA-4614
> URL: https://issues.apache.org/jira/browse/KAFKA-4614
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.0.1
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
>  Labels: latency, performance
> Fix For: 0.10.2.0
>
> Attachments: kafka-produce-99th.png
>
>
> First let me clarify our system environment information as I think it's 
> important to understand this issue:
> OS: CentOS6
> Kernel version: 2.6.32-XX
> Filesystem used for data volume: XFS
> Java version: 1.8.0_66
> GC option: Kafka default(G1GC)
> Kafka version: 0.10.0.1
> h2. Phenomenon
> In our Kafka cluster, the usual response time for a Produce request is about 1ms
> at the 50th percentile and 10ms at the 99th percentile. All topics are configured to
> have 3 replicas and all producers are configured with {{acks=all}}, so this time
> includes replication latency.
> Sometimes we observe the 99th percentile latency increase to 100ms ~ 500ms,
> but in most cases the time-consuming part is "Remote", which means it is
> caused by slow replication, which is known to happen for various reasons (which
> is also an issue that we're trying to improve, but it is out of scope for
> this issue).
> However, we found a different pattern which happens rarely
> but steadily, 3 ~ 5 times a day on each server. The interesting part is
> that "RequestQueue" also increased, as well as "Total" and "Remote".
> At the same time, we observed that the disk read metrics (in terms of both
> read bytes and read time) spike at exactly the same moment. Currently we
> have only caught-up consumers, so this metric stays at zero while all
> requests are served by the page cache.
> In order to investigate what Kafka is "read"ing, I employed SystemTap and
> wrote the following script. It traces all disk reads (only actual reads by
> the physical device) made for the data volume by the broker process.
> {code}
> global target_pid = KAFKA_PID
> global target_dev = DATA_VOLUME
> probe ioblock.request {
>   if (rw == BIO_READ && pid() == target_pid && devname == target_dev) {
>     t_ms = gettimeofday_ms() + 9 * 3600 * 1000 // timezone adjustment
>     printf("%s,%03d:  tid = %d, device = %s, inode = %d, size = %d\n",
>            ctime(t_ms / 1000), t_ms % 1000, tid(), devname, ino, size)
>     print_backtrace()
>     print_ubacktrace()
>   }
> }
> {code}
> As a result, we observed many log entries like the one below:
> {code}
> Thu Dec 22 17:21:39 2016,209:  tid = 126123, device = sdb1, inode = -1, size 
> = 4096
>  0x81275050 : generic_make_request+0x0/0x5a0 [kernel]
>  0x81275660 : submit_bio+0x70/0x120 [kernel]
>  0xa036bcaa : _xfs_buf_ioapply+0x16a/0x200 [xfs]
>  0xa036d95f : xfs_buf_iorequest+0x4f/0xe0 [xfs]
>  0xa036db46 : _xfs_buf_read+0x36/0x60 [xfs]
>  0xa036dc1b : xfs_buf_read+0xab/0x100 [xfs]
>  0xa0363477 : xfs_trans_read_buf+0x1f7/0x410 [xfs]
>  0xa033014e : xfs_btree_read_buf_block+0x5e/0xd0 [xfs]
>  0xa0330854 : xfs_btree_lookup_get_block+0x84/0xf0 [xfs]
>  0xa0330edf : xfs_btree_lookup+0xbf/0x470 [xfs]
>  0xa032456f : xfs_bmbt_lookup_eq+0x1f/0x30 [xfs]
>  0xa032628b : xfs_bmap_del_extent+0x12b/0xac0 [xfs]
>  0xa0326f34 : xfs_bunmapi+0x314/0x850 [xfs]
>  0xa034ad79 : xfs_itruncate_extents+0xe9/0x280 [xfs]
>  0xa0366de5 : xfs_inactive+0x2f5/0x450 [xfs]
>  0xa0374620 : xfs_fs_clear_inode+0xa0/0xd0 [xfs]
>  0x811affbc : clear_inode+0xac/0x140 [kernel]
>  0x811b0776 : generic_delete_inode+0x196/0x1d0 [kernel]
>  0x811b0815 : generic_drop_inode+0x65/0x80 [kernel]
>  0x811af662 : iput+0x62/0x70 [kernel]
>  0x37ff2e5347 : munmap+0x7/0x30 [/lib64/libc-2.12.so]
>  0x7ff169ba5d47 : Java_sun_nio_ch_FileChannelImpl_unmap0+0x17/0x50 
> [/usr/jdk1.8.0_66/jre/lib/amd64/libnio.so]
>  0x7ff269a1307e
> {code}
> We took a jstack dump of the broker process and found that tid = 126123
> corresponds to a GC-related thread (hex(tid) == nid, the native thread ID):
> {code}
> $ grep 0x1ecab /tmp/jstack.dump
> "Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x7ff278d0c800 
> nid=0x1ecab in Object.wait() [0x7ff17da11000]
> {code}
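
For anyone repeating this lookup: SystemTap prints the thread id in decimal, while jstack prints the native thread id (nid) in hexadecimal, so the value has to be converted before grepping the dump. A minimal sketch of that conversion follows; the class and method names are made up for illustration and are not part of any tool mentioned above.

```java
// Sketch: convert the decimal thread id printed by SystemTap into the
// hexadecimal "nid" form that appears in a jstack dump.
// TidToNid and toNid are illustrative names, not an existing utility.
class TidToNid {
    static String toNid(long tid) {
        // Long.toHexString produces lowercase hex without a prefix.
        return "0x" + Long.toHexString(tid);
    }

    public static void main(String[] args) {
        // tid taken from the SystemTap log line in the issue description.
        System.out.println(toNid(126123)); // prints 0x1ecab
    }
}
```

The printed value is the string used in the grep against the jstack dump.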
> In order to confirm, we enabled {{PrintGCApplicationStoppedTime}} switch and 
> confirmed that in total the time the broker paused is longer than usual, when 

[jira] [Comment Edited] (KAFKA-4614) Long GC pause harming broker performance which is caused by mmap objects created for OffsetIndex

2017-02-02 Thread Yuto Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15851093#comment-15851093
 ] 

Yuto Kawamura edited comment on KAFKA-4614 at 2/3/17 5:42 AM:
--

As a follow-up, the picture below shows the 99th percentile response time of
Produce requests in our production cluster before and after applying the
patch to all brokers.

!kafka-produce-99th.png|width=100%!

Thank you all :)


was (Author: kawamuray):
Just as a sequel, the below picture shows before and after of the 99th 
percentile response time of Produce request of our production cluster by 
applying the patch to all brokers.

!kafka-produce-99th.png!

Thank you all :)


Re: Trying to understand design decision about producer ack and min.insync.replicas

2017-02-02 Thread Ewen Cheslack-Postava
James,

Great question, I probably should have been clearer. Log data is an example
where the app (or even an instance of the app) might know best what the right
tradeoff is. Depending on your strategy for managing logs, you may or may
not be mixing multiple logs (and logs from different deployments) into the
same topic. For example, if you key by application, then you have an easy
way to split logs up while still getting a global feed of log messages.
Maybe logs from one app are really critical and we want to retry, but logs
from another app are just nice to have.

There are other examples even within a single app. For example, a gaming
company might report data from a user of a game to the same topic but want
2 producers with different reliability levels (possibly where the
ordering constraints across the two sets, which might otherwise cause you to
use a single consumer, are not an issue). High-frequency telemetry on a
player might be desirable to have, but it is not the end of the world if some
is lost. In contrast, they may want a stronger guarantee for, e.g., something
like chat messages, where they want to have a permanent record of them in
all circumstances.
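
To make the two-producer case concrete, here is a rough sketch of what the two configurations might look like. It only builds java.util.Properties objects, so it runs without a Kafka dependency. The config keys (bootstrap.servers, acks, retries) are standard producer settings, but the broker address, the retry count, and the class and method names are assumptions for illustration.

```java
import java.util.Properties;

// Sketch only: two producer configurations that write to the same topic but
// accept different delivery guarantees. Names and values are illustrative.
class ProducerProfiles {
    static Properties base() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "broker1:9092"); // hypothetical address
        return p;
    }

    // Chat messages: wait for the in-sync replicas and retry on failure.
    static Properties durable() {
        Properties p = base();
        p.setProperty("acks", "all");
        p.setProperty("retries", "10"); // arbitrary example value
        return p;
    }

    // High-frequency telemetry: leader-only acknowledgement, no retries.
    static Properties bestEffort() {
        Properties p = base();
        p.setProperty("acks", "1");
        p.setProperty("retries", "0");
        return p;
    }

    public static void main(String[] args) {
        System.out.println("chat acks:      " + durable().getProperty("acks"));
        System.out.println("telemetry acks: " + bestEffort().getProperty("acks"));
    }
}
```

In a real application each Properties object would also need key and value serializers before being handed to a producer. The point is only that both producers can share a topic while choosing their own acks and retry behavior.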

-Ewen

On Fri, Jan 27, 2017 at 12:59 AM, James Cheng  wrote:

>
> > On Jan 27, 2017, at 12:18 AM, Ewen Cheslack-Postava 
> wrote:
> >
> > On Thu, Jan 26, 2017 at 4:23 PM, Luciano Afranllie <
> listas.luaf...@gmail.com
> >> wrote:
> >
> >> I was thinking about the situation where you have fewer brokers in the ISR
> >> list than the number set in min.insync.replicas.
> >>
> >> My idea was that if I, as an administrator, for a given topic, want to
> >> favor durability over availability, then if that topic has fewer ISRs than
> >> the value set in min.insync.replicas I may want to stop producing to the
> >> topic. In the way min.insync.replicas and acks work, I need to coordinate
> >> with all producers in order to achieve this. There is no way (or I don't
> >> know it) to globally enforce a stop on producing to a topic if it is
> >> under-replicated.
> >>
> >> I don't see why, for the same topic, some producers might want to get an
> >> error when the number of ISRs is below min.insync.replicas while other
> >> producers don't. I think it could be more useful to be able to set that ALL
> >> producers should get an error when a given topic is under-replicated so
> >> they stop producing, than for a single producer to get an error when ANY
> >> topic is under-replicated. I don't have a lot of experience with Kafka so
> >> I may be missing some use cases.
> >>
> >
> > It's also a matter of not having to do a ton of configuration on a
> > per-topic basis. Putting some control in the producer app's hands means you
> > can set reasonably global defaults which make sense for apps that require
> > stronger durability while letting cases that have lower requirements still
> > benefit from the durability before consumers see data but not block
> > producers because the producer chooses lower requirements. Without
> > requiring the ability to make config changes on the Kafka brokers (which
> > may be locked down and restricted only to Kafka admins), the producer
> > application can choose to accept weaker guarantees based on the tradeoffs
> > it needs to make.
> >
>
> I'm not sure I follow, Ewen.
>
> I do agree that if I set min.insync.replicas at a broker level, then of
> course I would like individual producers to decide whether their topic
> (which inherits from the global setting) should reject writes if that topic
> has size(ISR) < min.insync.replicas.
>
> But on a topic-level... are you saying that if a particular topic has
> min.insync.replicas set, that you want producers to have the flexibility to
> decide on whether they want durability vs availability?
>
> Often times (but not always), a particular topic is used only by a small
> set of producers with a specific set of data. The durability settings would
> usually be chosen due to the nature of the data, rather than based on who
> produced the data, and so it makes sense to me that the durability should
> be on the entire topic, not by the producer.
>
> What is a use case where you have multiple producers writing to the same
> topic but would want different durability?
>
> -James
>
> > The ability to make this tradeoff in different places can seem more complex
> > (and really by definition *is* more complex), but it also offers more
> > flexibility.
> >
> > -Ewen
> >
> >
> >> But I understand your point, min.insync.replicas setting should be
> >> understood as "if a producer wants to get an error when topics are under
> >> replicated, then how many replicas are enough for not raising an error?"
> >>
> >>
> >> On Thu, Jan 26, 2017 at 4:16 PM, Ewen Cheslack-Postava <
> e...@confluent.io>
> >> wrote:
> >>
> >>> The acks setting for the producer doesn't affect the final durability
> >>> guarantees. These are still enforced by the replication and min ISR
> >>> settings. Instead, the ack 

[jira] [Comment Edited] (KAFKA-4614) Long GC pause harming broker performance which is caused by mmap objects created for OffsetIndex

2017-02-02 Thread Yuto Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15851093#comment-15851093
 ] 

Yuto Kawamura edited comment on KAFKA-4614 at 2/3/17 5:41 AM:
--

As a follow-up, the picture below shows the 99th percentile response time of
Produce requests in our production cluster before and after applying the
patch to all brokers.

!kafka-produce-99th.png!

Thank you all :)


was (Author: kawamuray):
Just as a sequel, the below picture shows before and after of the 99th 
percentile response time of Produce request of our production cluster by 
applying the patch to all brokers.

https://issues.apache.org/jira/secure/attachment/12850770/kafka-produce-99th.png

Thank you all :)


[jira] [Comment Edited] (KAFKA-4614) Long GC pause harming broker performance which is caused by mmap objects created for OffsetIndex

2017-02-02 Thread Yuto Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15851093#comment-15851093
 ] 

Yuto Kawamura edited comment on KAFKA-4614 at 2/3/17 5:37 AM:
--

As a follow-up, the picture below shows the 99th percentile response time of
Produce requests in our production cluster before and after applying the
patch to all brokers.

https://issues.apache.org/jira/secure/attachment/12850770/kafka-produce-99th.png

Thank you all :)


was (Author: kawamuray):
Just as a sequel, this is what happened to the 99th percentile response time of 
Produce request of our production cluster after applied the patch to all 
brokers.

Thank you all :)


[GitHub] kafka pull request #2461: MINOR: added upgrade and API changes to docs

2017-02-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2461




[jira] [Updated] (KAFKA-4614) Long GC pause harming broker performance which is caused by mmap objects created for OffsetIndex

2017-02-02 Thread Yuto Kawamura (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuto Kawamura updated KAFKA-4614:
-
Attachment: kafka-produce-99th.png

As a follow-up, this is what happened to the 99th percentile response time of
Produce requests in our production cluster after applying the patch to all
brokers.

Thank you all :)


Jenkins build is back to normal : kafka-trunk-jdk8 #1245

2017-02-02 Thread Apache Jenkins Server
See 



Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-02 Thread Dong Lin
Hey Colin,

Thanks for the KIP. I have a few comments below:

- I share Ismael's view that a Future-based API is better.
PurgeDataFrom() is an example of an API that users may want to call
asynchronously even though there is only one request in flight at a time.
In the future we may also have an admin operation that allows users to
move a replica from one broker to another, which also needs to work in both
sync and async style. It seems more generic to return a Future for any API
that requires both modes.

- I am not sure that having the enum classes CreateTopicsFlags and
DeleteTopicsFlags is the cleanest way. Are we going to create such a class
for every future API that requires configuration? It may be more generic to
provide Map to any admin API that operates on multiple entries.
For example, deleteTopic(Map). And it can be a Map of Properties for those
APIs that require multiple configs per entry. And we
can provide default value, doc, config name for those APIs as we do
with AbstractConfig.

- I am not sure if "Try" is very intuitive to Java developers. Is there any
counterpart of Scala's "Try" in Java? We actually have quite a few existing
classes in Kafka that address the same problem, such as
ProduceRequestResult, LogAppendResult etc. Maybe we can follow the same
convention and use *Result as the class name.

- How are we going to allow users to specify a timeout for blocking APIs such
as deleteTopic? Is this configured per AdminClient, or should it be
specified in the API parameters?

- Are we going to have this class start its own thread, as we do with
the Sender class, to send/receive requests? If yes, it will be useful to
have a class that extends AbstractConfig and specifies configs and their
default values.
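
To illustrate the first, third, and fourth points together, below is a rough sketch of how a Future-returning call with a *Result-style return type could serve both sync and async callers. Every name in it (FutureAdminSketch, Admin, FakeAdmin, DeleteTopicResult) is hypothetical and is not the API proposed in the KIP. The in-memory FakeAdmin merely stands in for a client that would talk to brokers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical names throughout; this is not the KIP-117 API.
class FutureAdminSketch {
    // A *Result-style value: carries either success or the error.
    static final class DeleteTopicResult {
        final String topic;
        final Exception error; // null on success

        DeleteTopicResult(String topic, Exception error) {
            this.topic = topic;
            this.error = error;
        }

        boolean succeeded() {
            return error == null;
        }
    }

    interface Admin {
        CompletableFuture<DeleteTopicResult> deleteTopic(String topic);
    }

    // Stand-in implementation that completes on a background thread.
    static final class FakeAdmin implements Admin {
        @Override
        public CompletableFuture<DeleteTopicResult> deleteTopic(String topic) {
            return CompletableFuture.supplyAsync(() -> {
                if (topic.isEmpty()) {
                    return new DeleteTopicResult(topic,
                            new IllegalArgumentException("empty topic name"));
                }
                return new DeleteTopicResult(topic, null);
            });
        }
    }

    public static void main(String[] args) throws Exception {
        Admin admin = new FakeAdmin();

        // Async style: register a callback and carry on.
        CompletableFuture<Void> logged = admin.deleteTopic("logs")
                .thenAccept(r -> System.out.println(r.topic + " deleted: " + r.succeeded()));

        // Sync style: block on the same API, with the timeout chosen per call.
        DeleteTopicResult failed = admin.deleteTopic("").get(5, TimeUnit.SECONDS);
        System.out.println("error: " + failed.error.getMessage());

        // Wait for the async callback before exiting.
        logged.get(5, TimeUnit.SECONDS);
    }
}
```

With this shape the timeout question becomes a caller-side decision via get(timeout, unit), although a client-wide default could still be offered through configuration.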

Thanks,
Dong



On Thu, Feb 2, 2017 at 3:02 PM, Ismael Juma  wrote:

> Hi Colin,
>
> Thanks for the KIP, great to make progress on this. I have some initial
> comments, will post more later:
>
> 1. We have KafkaProducer that implements the Producer interface and
> KafkaConsumer that implements the Consumer interface. Maybe we could have
> KafkaAdminClient that implements the AdminClient interface? Or maybe just
> KafkaAdmin. Not sure, some ideas for consideration. Also, I don't think we
> should worry about a name clash with the internal AdminClient written in
> Scala. That will go away soon enough and choosing a good name for the
> public class is what matters.
>
> 2. We should include the proposed package name in the KIP
> (probably org.apache.kafka.clients.admin?).
>
> 3. It would be good to list the supported configs.
>
> 4. KIP-107, which passed the vote, specifies the introduction of a method
> to AdminClient with the following signature. We should figure out how it
> would look given this proposal.
>
> Future>
> purgeDataBefore(Map offsetForPartition)
>
> 5. I am not sure about rejecting the Futures-based API. I think I would
> prefer that, personally. Grant had an interesting idea of not exposing the
> batch methods in the AdminClient to start with to keep it simple and
> relying on a Future based API to make it easy for users to run things
> concurrently. This is consistent with the producer and Java 8 makes things
> a lot nicer with CompletableFuture (similar to Scala Futures). I will think
> more about this and other details of the proposal and send a follow-up.
>
> Ismael
>
> On Thu, Feb 2, 2017 at 6:54 PM, Colin McCabe  wrote:
>
> > Hi all,
> >
> > I wrote up a Kafka improvement proposal for adding an
> > AdministrativeClient interface.  This is a continuation of the work on
> > KIP-4 towards centralized administrative operations.  Please check out
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdministrativeClient+API+for+Kafka+admin+operations
> >
> > regards,
> > Colin
> >
>


Re: [DISCUSS] KIP-116 - Add State Store Checkpoint Interval Configuration

2017-02-02 Thread Matthias J. Sax
Thanks Damian.

One more question: "Checkpointing is disabled if the checkpoint interval
is set to a value <=0."


Does it make sense to disable checkpointing? What's the tradeoff here?
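
For reference, a small sketch of the semantics as I read them from the KIP text
quoted in this thread: a value <= 0 disables checkpointing, and otherwise the
effective interval is a multiple of commit.interval.ms. The class and method
names are hypothetical, and rounding up to the next commit is my assumption
about how the "multiple of the commit interval" comes about, not something the
KIP states.

```java
import java.util.OptionalLong;

// Hypothetical helper; the names and the round-up rule are assumptions.
final class CheckpointIntervalSketch {

    // Returns the effective checkpoint interval in ms, or empty if disabled.
    static OptionalLong effectiveIntervalMs(long checkpointIntervalMs, long commitIntervalMs) {
        if (commitIntervalMs <= 0) {
            throw new IllegalArgumentException("commit interval must be positive");
        }
        if (checkpointIntervalMs <= 0) {
            return OptionalLong.empty(); // checkpointing disabled
        }
        // A checkpoint can only happen on a commit, so round up to whole commits.
        long commits = (checkpointIntervalMs + commitIntervalMs - 1) / commitIntervalMs;
        return OptionalLong.of(commits * commitIntervalMs);
    }

    public static void main(String[] args) {
        System.out.println(effectiveIntervalMs(45_000, 30_000));
        System.out.println(effectiveIntervalMs(10_000, 30_000));
        System.out.println(effectiveIntervalMs(0, 30_000));
    }
}
```

Under these assumptions a configured 45 s interval with a 30 s commit interval
checkpoints every 60 s, and anything below the commit interval checkpoints on
every commit.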


-Matthias


On 2/2/17 1:51 AM, Damian Guy wrote:
> Hi Matthias,
> 
> Thanks for the comments.
> 
> 1. TBD - I need to do some performance tests and try to work out a
> sensible default.
> 2. Yes, you are correct. It could be a multiple of the commit.interval.ms.
> But that would also mean that if you change the commit interval - say you
> lower it - then you might also need to change the checkpoint setting (i.e.,
> you still only want to checkpoint every n minutes).
> 
> On Wed, 1 Feb 2017 at 23:46 Matthias J. Sax  wrote:
> 
>> Thanks for the KIP Damian.
>>
>> I am wondering about two things:
>>
>> 1. what should be the default value for the new parameter?
>> 2. why is the new parameter provided in ms?
>>
>> About (2): because
>>
>> "the minimum checkpoint interval will be the value of
>> commit.interval.ms. In effect the actual checkpoint interval will be a
>> multiple of the commit interval"
>>
>> it might be easier to just use a parameter that is "number-of-commit
>> intervals".
>>
>>
>> -Matthias
>>
>>
>> On 2/1/17 7:29 AM, Damian Guy wrote:
>>> Thanks for the comments Eno.
>>> As for exactly once, I don't believe this matters as we are just restoring
>>> the change-log, i.e., the result of the aggregations that previously ran
>>> etc. So once initialized the state store will be in the same state as it
>>> was before.
>>> Having the checkpoint in a Kafka topic is not ideal as the state is per
>>> Kafka Streams instance. So each instance would need to start with a unique
>>> id that is persistent.
>>>
>>> Cheers,
>>> Damian
>>>
>>> On Wed, 1 Feb 2017 at 13:20 Eno Thereska  wrote:
>>>
>>>> As a follow up to my previous comment, have you thought about writing the
>>>> checkpoint to a topic instead of a local file? That would have the
>>>> advantage that all metadata continues to be managed by Kafka, as well as
>>>> fit with EoS. The potential disadvantage would be a slower latency, however
>>>> if it is periodic as you mention, I'm not sure that would be a show stopper.
>>>>
>>>> Thanks
>>>> Eno
>>>>> On 1 Feb 2017, at 12:58, Eno Thereska  wrote:
>>>>>
>>>>> Thanks Damian, this is a good idea and will reduce the restore time.
>>>>> Looking forward, with exactly once and support for transactions in Kafka, I
>>>>> believe we'll have to add some support for rolling back checkpoints, e.g.,
>>>>> when a transaction is aborted. We need to be aware of that and ideally
>>>>> anticipate a bit those needs in the KIP.
>>>>>
>>>>> Thanks
>>>>> Eno
>>>>>
>>>>>
>>>>>> On 1 Feb 2017, at 10:18, Damian Guy  wrote:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I would like to start the discussion on KIP-116:
>>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-116+-+Add+State+Store+Checkpoint+Interval+Configuration
>>>>>>
>>>>>> Thanks,
>>>>>> Damian
>>>
>>
>>
>




[jira] [Updated] (KAFKA-4317) RocksDB checkpoint files lost on kill -9

2017-02-02 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4317:
---
Description: 
Right now, the checkpoint files for logged RocksDB stores are written during a 
graceful shutdown, and removed upon restoration. Unfortunately this means that 
in a scenario where the process is forcibly killed, the checkpoint files are 
not there, so all RocksDB stores are rematerialized from scratch on the next 
launch.

In a way, this is good, because it simulates bootstrapping a new node (for 
example, it's a good way to see how much I/O is used to rematerialize the 
stores); however, it leads to longer recovery times when a non-graceful shutdown 
occurs and we want to get the job up and running again.

There seem to be two possible things to consider:

- Simply do not remove checkpoint files on restoring. This way a kill -9 will 
result in only repeating the restoration of all the data generated in the 
source topics since the last graceful shutdown.

- Continually update the checkpoint files (perhaps on commit) -- this would 
result in the least amount of overhead/latency in restarting, but the 
additional complexity may not be worth it.
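
As an illustration of the second option, here is a minimal sketch of rewriting
a checkpoint file on every commit in a way that survives a kill -9: write to a
temporary file, then atomically rename it over the old one, so a crash leaves
either the previous or the new checkpoint in place. The class name, the file
name and the "key offset" line format are all assumptions for this sketch and
are not the format Kafka Streams uses; a real implementation would also need
to fsync before the rename.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical checkpoint file helper; the format is invented for this sketch.
final class CheckpointFileSketch {

    // Writes all offsets to "<file>.tmp" and renames it over <file>.
    static void write(Path file, Map<String, Long> offsets) {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Long> entry : new TreeMap<>(offsets).entrySet()) {
            lines.add(entry.getKey() + " " + entry.getValue());
        }
        try {
            Files.write(tmp, lines, StandardCharsets.UTF_8);
            // Whether an existing target is replaced by an atomic move is
            // implementation specific; on POSIX file systems rename replaces it.
            Files.move(tmp, file, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the offsets back; a missing file means "no checkpoint yet".
    static Map<String, Long> read(Path file) {
        Map<String, Long> offsets = new TreeMap<>();
        if (!Files.exists(file)) {
            return offsets;
        }
        try {
            for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                if (line.isEmpty()) {
                    continue;
                }
                int sep = line.lastIndexOf(' ');
                offsets.put(line.substring(0, sep), Long.parseLong(line.substring(sep + 1)));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return offsets;
    }

    public static void main(String[] args) {
        Path file = Paths.get(System.getProperty("java.io.tmpdir"), "sketch.checkpoint");
        Map<String, Long> offsets = new TreeMap<>();
        offsets.put("store-changelog-0", 42L);
        write(file, offsets);
        System.out.println(read(file));
    }
}
```

On restart, restoration would then only need to replay the changelog from the
offsets found in the file rather than from the beginning.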


https://cwiki.apache.org/confluence/display/KAFKA/KIP-116%3A+Add+State+Store+Checkpoint+Interval+Configuration

  was:
Right now, the checkpoint files for logged RocksDB stores are written during a 
graceful shutdown, and removed upon restoration. Unfortunately this means that 
in a scenario where the process is forcibly killed, the checkpoint files are 
not there, so all RocksDB stores are rematerialized from scratch on the next 
launch.

In a way, this is good, because it simulates bootstrapping a new node (for 
example, it's a good way to see how much I/O is used to rematerialize the 
stores); however, it leads to longer recovery times when a non-graceful shutdown 
occurs and we want to get the job up and running again.

There seem to be two possible things to consider:

- Simply do not remove checkpoint files on restoring. This way a kill -9 will 
result in only repeating the restoration of all the data generated in the 
source topics since the last graceful shutdown.

- Continually update the checkpoint files (perhaps on commit) -- this would 
result in the least amount of overhead/latency in restarting, but the 
additional complexity may not be worth it.


> RocksDB checkpoint files lost on kill -9
> 
>
> Key: KAFKA-4317
> URL: https://issues.apache.org/jira/browse/KAFKA-4317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Greg Fodor
>Assignee: Damian Guy
>Priority: Critical
>  Labels: architecture, needs-kip, user-experience
>
> Right now, the checkpoint files for logged RocksDB stores are written during 
> a graceful shutdown, and removed upon restoration. Unfortunately this means 
> that in a scenario where the process is forcibly killed, the checkpoint files 
> are not there, so all RocksDB stores are rematerialized from scratch on the 
> next launch.
> In a way, this is good, because it simulates bootstrapping a new node (for 
> example, it's a good way to see how much I/O is used to rematerialize the 
> stores); however, it leads to longer recovery times when a non-graceful 
> shutdown occurs and we want to get the job up and running again.
> There seem to be two possible things to consider:
> - Simply do not remove checkpoint files on restoring. This way a kill -9 will 
> result in only repeating the restoration of all the data generated in the 
> source topics since the last graceful shutdown.
> - Continually update the checkpoint files (perhaps on commit) -- this would 
> result in the least amount of overhead/latency in restarting, but the 
> additional complexity may not be worth it.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-116%3A+Add+State+Store+Checkpoint+Interval+Configuration





[jira] [Updated] (KAFKA-4423) Drop support for Java 7

2017-02-02 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4423:
---
Description: 
Java 7 was released in July 2011, it hasn't received public updates since April 
2015, Java 8 was released in March 2014 and Java 9 is scheduled to be released 
in July 2017.

The last public release of JDK 7 by Oracle contains a large number of known 
security vulnerabilities and Java 8 introduces a number of
compelling features and we will soon have to support Java 9 so it would be good 
to drop support for Java 7 in 2017. The actual timing would depend on when we 
release the next major release of Kafka.

More details can be found in the KIP:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-118%3A+Drop+Support+for+Java+7+in+Kafka+0.11

  was:
Java 7 was released in July 2011, it hasn't received public updates since April 
2015, Java 8 was released in March 2014 and Java 9 is scheduled to be released 
in July 2017.

The last public release of JDK 7 by Oracle contains a large number of known 
security vulnerabilities and Java 8 introduces a number of
compelling features and we will soon have to support Java 9 so it would be good 
to drop support for Java 7 in 2017. The actual timing would depend on when we 
release the next major release of Kafka.

More details on pros and cons are captured in the following discussion thread 
in the mailing list:

http://search-hadoop.com/m/Kafka/uyzND1oIhV61GS5Sf2

Before we can do this, we need to discuss and vote on a concrete proposal in 
the mailing list.


> Drop support for Java 7
> ---
>
> Key: KAFKA-4423
> URL: https://issues.apache.org/jira/browse/KAFKA-4423
> Project: Kafka
>  Issue Type: Task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> Java 7 was released in July 2011, it hasn't received public updates since 
> April 2015, Java 8 was released in March 2014 and Java 9 is scheduled to be 
> released in July 2017.
> The last public release of JDK 7 by Oracle contains a large number of known 
> security vulnerabilities and Java 8 introduces a number of
> compelling features and we will soon have to support Java 9 so it would be 
> good to drop support for Java 7 in 2017. The actual timing would depend on 
> when we release the next major release of Kafka.
> More details can be found in the KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-118%3A+Drop+Support+for+Java+7+in+Kafka+0.11





Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-02 Thread Apurva Mehta
The wiki has been updated with a section on authorization, as well as a
summary of the message format changes.

On Thu, Feb 2, 2017 at 9:38 AM, Jason Gustafson  wrote:

> Thanks Tom, we'll update the wiki to reflect all the movement on the design
> document. Did you have a specific concern with the new ACLs?
>
> -Jason
>
> On Thu, Feb 2, 2017 at 6:49 AM, Ismael Juma  wrote:
>
> > Hi Tom,
> >
> > That is a good point. During the discussion, it was agreed that changes to
> > public interfaces (message format, protocol, ACLs, etc.) would be copied to
> > the wiki once the things had settled down, but it looks like that hasn't
> > been done yet. I agree that it makes sense to do it before people vote on
> > it.
> >
> > Ismael
> >
> > On Thu, Feb 2, 2017 at 2:42 PM, Tom Crayford  wrote:
> >
> > > -1 (non-binding)
> > >
> > > I've been slow at keeping up with the KIP and the discussion thread. This
> > > is an exciting and quite complex new feature, which provides great new
> > > functionality.
> > >
> > > There's a thing I noticed missing from the KIP that's present in the google
> > > doc - the doc talks about ACLs for TransactionalId. If those are going to
> > > land with the KIP, I think they should be included in the KIP itself, as
> > > new ACLs are significant security changes.
> > >
> > > On Thu, Feb 2, 2017 at 10:04 AM, Eno Thereska  wrote:
> > >
> > > > +1 (non-binding).
> > > >
> > > > Excellent work and discussions!
> > > >
> > > > Eno
> > > > > On 2 Feb 2017, at 04:13, Guozhang Wang  wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > We would like to start the voting process for KIP-98. The KIP can be found
> > > > > at
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > >
> > > > > Discussion thread can be found here:
> > > > >
> > > > > http://search-hadoop.com/m/Kafka/uyzND1jwZrr7HRHf?subj=+DISCUSS+KIP+98+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > >
> > > > > Thanks,
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > >
> > > >
> > >
> >
>


[jira] [Commented] (KAFKA-4708) Fix Transient failure in BrokerApiVersionsCommandTest.checkBrokerApiVersionCommandOutput

2017-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15850857#comment-15850857
 ] 

ASF GitHub Bot commented on KAFKA-4708:
---

GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/2489

KAFKA-4708: Fix Transient Failure in BrokerApiVersionsCommandTest.che…

…ckBrokerApiVersionCommandOutput

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-4708

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2489.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2489


commit 40301f128e748e690d8939ffc61f83617d61f8f1
Author: Colin P. Mccabe 
Date:   2017-02-03T00:51:03Z

KAFKA-4708: Fix Transient Failure in 
BrokerApiVersionsCommandTest.checkBrokerApiVersionCommandOutput




> Fix Transient failure in 
> BrokerApiVersionsCommandTest.checkBrokerApiVersionCommandOutput
> 
>
> Key: KAFKA-4708
> URL: https://issues.apache.org/jira/browse/KAFKA-4708
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Jason Gustafson
>Assignee: Colin P. McCabe
> Fix For: 0.10.3.0
>
>
> {code}
> kafka.admin.BrokerApiVersionsCommandTest > checkBrokerApiVersionCommandOutput 
> FAILED
> org.junit.ComparisonFailure: expected:<[localhost:34091 (id: 0 rack: 
> null) -> (]> but was:<[]>
> at org.junit.Assert.assertEquals(Assert.java:115)
> at org.junit.Assert.assertEquals(Assert.java:144)
> at 
> kafka.admin.BrokerApiVersionsCommandTest.checkBrokerApiVersionCommandOutput(BrokerApiVersionsCommandTest.scala:44)
> {code}





[jira] [Updated] (KAFKA-4708) Fix Transient failure in BrokerApiVersionsCommandTest.checkBrokerApiVersionCommandOutput

2017-02-02 Thread Colin P. McCabe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe updated KAFKA-4708:
---
Status: Patch Available  (was: Open)

> Fix Transient failure in 
> BrokerApiVersionsCommandTest.checkBrokerApiVersionCommandOutput
> 
>
> Key: KAFKA-4708
> URL: https://issues.apache.org/jira/browse/KAFKA-4708
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Jason Gustafson
>Assignee: Colin P. McCabe
> Fix For: 0.10.3.0
>
>
> {code}
> kafka.admin.BrokerApiVersionsCommandTest > checkBrokerApiVersionCommandOutput 
> FAILED
> org.junit.ComparisonFailure: expected:<[localhost:34091 (id: 0 rack: 
> null) -> (]> but was:<[]>
> at org.junit.Assert.assertEquals(Assert.java:115)
> at org.junit.Assert.assertEquals(Assert.java:144)
> at 
> kafka.admin.BrokerApiVersionsCommandTest.checkBrokerApiVersionCommandOutput(BrokerApiVersionsCommandTest.scala:44)
> {code}





[GitHub] kafka pull request #2490: kafka-4727: A Production server configuration need...

2017-02-02 Thread amethystic
GitHub user amethystic opened a pull request:

https://github.com/apache/kafka/pull/2490

kafka-4727: A Production server configuration needs to be updated

1. Update value for queued.max.requests to 500
2. Removed invalid config 'controller.message.queue.size'
3. Removed flush configs including 'log.flush.interval.messages', 
'log.flush.interval.ms' and 'log.flush.scheduler.interval.ms'

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amethystic/kafka 
kafka4727_server_config_doc_update

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2490.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2490


commit 349605c9c4e5f3964d05ba34fb15fde979401348
Author: huxi 
Date:   2017-02-03T01:04:38Z

kafka-4727: A Production server configuration needs to be updated

1. Update value for queued.max.requests to 500
2. Removed invalid config 'controller.message.queue.size'
3. Removed flush configs including 'log.flush.interval.messages', 
'log.flush.interval.ms' and 'log.flush.scheduler.interval.ms'




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2489: KAFKA-4708: Fix Transient Failure in BrokerApiVers...

2017-02-02 Thread cmccabe
GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/2489

KAFKA-4708: Fix Transient Failure in BrokerApiVersionsCommandTest.che…

…ckBrokerApiVersionCommandOutput

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-4708

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2489.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2489


commit 40301f128e748e690d8939ffc61f83617d61f8f1
Author: Colin P. Mccabe 
Date:   2017-02-03T00:51:03Z

KAFKA-4708: Fix Transient Failure in 
BrokerApiVersionsCommandTest.checkBrokerApiVersionCommandOutput






[jira] [Assigned] (KAFKA-4727) A Production server configuration needs to be updated

2017-02-02 Thread huxi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxi reassigned KAFKA-4727:
---

Assignee: huxi

> A Production server configuration needs to be updated
> -
>
> Key: KAFKA-4727
> URL: https://issues.apache.org/jira/browse/KAFKA-4727
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: huxi
>  Labels: newbie
>
> In docs/ops.html, we have a section on "A Production server configuration" 
> with queued.max.requests=16. This is often too low. We should change it to 
> the default value, which is 500. It will also be useful to see if other 
> configurations need to be changed accordingly.





[jira] [Updated] (KAFKA-4708) Fix Transient failure in BrokerApiVersionsCommandTest.checkBrokerApiVersionCommandOutput

2017-02-02 Thread Colin P. McCabe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe updated KAFKA-4708:
---
Summary: Fix Transient failure in 
BrokerApiVersionsCommandTest.checkBrokerApiVersionCommandOutput  (was: 
Transient failure in 
BrokerApiVersionsCommandTest.checkBrokerApiVersionCommandOutput)

> Fix Transient failure in 
> BrokerApiVersionsCommandTest.checkBrokerApiVersionCommandOutput
> 
>
> Key: KAFKA-4708
> URL: https://issues.apache.org/jira/browse/KAFKA-4708
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Jason Gustafson
>Assignee: Colin P. McCabe
> Fix For: 0.10.3.0
>
>
> {code}
> kafka.admin.BrokerApiVersionsCommandTest > checkBrokerApiVersionCommandOutput 
> FAILED
> org.junit.ComparisonFailure: expected:<[localhost:34091 (id: 0 rack: 
> null) -> (]> but was:<[]>
> at org.junit.Assert.assertEquals(Assert.java:115)
> at org.junit.Assert.assertEquals(Assert.java:144)
> at 
> kafka.admin.BrokerApiVersionsCommandTest.checkBrokerApiVersionCommandOutput(BrokerApiVersionsCommandTest.scala:44)
> {code}





[GitHub] kafka pull request #2488: MINOR: add architecture section and configure / ex...

2017-02-02 Thread guozhangwang
GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/2488

MINOR: add architecture section and configure / execution for streams



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka 
KMinor-streams-docs-second-pass

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2488.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2488


commit 7949d05da30ac9f0875855dd2bae7ab0ae6137f6
Author: Guozhang Wang 
Date:   2017-02-02T01:06:21Z

quickstart, concepts, misc fixes

commit 78e687c345dde75f68b7b4cb3e3dc95c4055b169
Author: Guozhang Wang 
Date:   2017-02-02T18:23:13Z

github comments

commit b468ac0a3dc899ec17b30828b051b2c58bdcc859
Author: Guozhang Wang 
Date:   2017-02-02T18:44:03Z

Merge branch 'trunk' of https://git-wip-us.apache.org/repos/asf/kafka into 
KMinor-streams-docs-first-pass

commit e00bb85237c1e71c97fd9cfcb5c95719564253e2
Author: Guozhang Wang 
Date:   2017-02-03T00:16:06Z

configuration and execution; architecture sections

commit 84e2585ac7ef3bc52dbd024efef0575f700f45fb
Author: Guozhang Wang 
Date:   2017-02-03T00:16:33Z

added figures






[jira] [Updated] (KAFKA-4423) Drop support for Java 7

2017-02-02 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4423:
---
Labels: kip  (was: )

> Drop support for Java 7
> ---
>
> Key: KAFKA-4423
> URL: https://issues.apache.org/jira/browse/KAFKA-4423
> Project: Kafka
>  Issue Type: Task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> Java 7 was released in July 2011, it hasn't received public updates since 
> April 2015, Java 8 was released in March 2014 and Java 9 is scheduled to be 
> released in July 2017.
> The last public release of JDK 7 by Oracle contains a large number of known 
> security vulnerabilities and Java 8 introduces a number of
> compelling features and we will soon have to support Java 9 so it would be 
> good to drop support for Java 7 in 2017. The actual timing would depend on 
> when we release the next major release of Kafka.
> More details on pros and cons are captured in the following discussion thread 
> in the mailing list:
> http://search-hadoop.com/m/Kafka/uyzND1oIhV61GS5Sf2
> Before we can do this, we need to discuss and vote on a concrete proposal in 
> the mailing list.





[jira] [Updated] (KAFKA-4423) Drop support for Java 7

2017-02-02 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4423:
---
Fix Version/s: 0.11.0.0

> Drop support for Java 7
> ---
>
> Key: KAFKA-4423
> URL: https://issues.apache.org/jira/browse/KAFKA-4423
> Project: Kafka
>  Issue Type: Task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> Java 7 was released in July 2011, it hasn't received public updates since 
> April 2015, Java 8 was released in March 2014 and Java 9 is scheduled to be 
> released in July 2017.
> The last public release of JDK 7 by Oracle contains a large number of known 
> security vulnerabilities and Java 8 introduces a number of
> compelling features and we will soon have to support Java 9 so it would be 
> good to drop support for Java 7 in 2017. The actual timing would depend on 
> when we release the next major release of Kafka.
> More details on pros and cons are captured in the following discussion thread 
> in the mailing list:
> http://search-hadoop.com/m/Kafka/uyzND1oIhV61GS5Sf2
> Before we can do this, we need to discuss and vote on a concrete proposal in 
> the mailing list.





[GitHub] kafka pull request #2487: Kafka-4722 : Add application.id to StreamThread na...

2017-02-02 Thread sharad-develop
GitHub user sharad-develop opened a pull request:

https://github.com/apache/kafka/pull/2487

Kafka-4722 : Add application.id to StreamThread name

Kafka-4722 : Add application.id to StreamThread name

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sharad-develop/kafka KAFKA-4722

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2487


commit 7946b2f35c0e3811ba7d706fc9c761d73ece47bb
Author: Damian Guy 
Date:   2017-01-16T19:40:47Z

MINOR: Remove unused constructor param from ProcessorStateManager

Remove applicationId parameter as it is no longer used.

Author: Damian Guy 

Reviewers: Guozhang Wang 

Closes #2385 from dguy/minor-remove-unused-param

commit 621dff22e79dc64b9a8748186dd985774044f91a
Author: Rajini Sivaram 
Date:   2017-01-17T11:16:29Z

KAFKA-4363; Documentation for sasl.jaas.config property

Author: Rajini Sivaram 

Reviewers: Ismael Juma 

Closes #2316 from rajinisivaram/KAFKA-4363

commit e3f4cdd0e249f78a7f4e8f064533bcd15eb11cbf
Author: Rajini Sivaram 
Date:   2017-01-17T12:55:07Z

KAFKA-4590; SASL/SCRAM system tests

Runs sanity test and one replication test using SASL/SCRAM.

Author: Rajini Sivaram 

Reviewers: Ewen Cheslack-Postava , Ismael Juma 


Closes #2355 from rajinisivaram/KAFKA-4590

commit 2b19ad9d8c47fb0f78a6e90d2f5711df6110bf1f
Author: Rajini Sivaram 
Date:   2017-01-17T18:42:55Z

KAFKA-4580; Use sasl.jaas.config for some system tests

Switched console_consumer, verifiable_consumer and verifiable_producer to 
use new sasl.jaas_config property instead of static JAAS configuration file 
when used with SASL_PLAINTEXT.

Author: Rajini Sivaram 

Reviewers: Ewen Cheslack-Postava , Ismael Juma 


Closes #2323 from rajinisivaram/KAFKA-4580

(cherry picked from commit 3f6c4f63c9c17424cf717ca76c74554bcf3b2e9a)
Signed-off-by: Ismael Juma 

commit 60d759a227087079e6fd270c68fd9e38441cb34a
Author: Jason Gustafson 
Date:   2017-01-17T18:42:05Z

MINOR: Some cleanups and additional testing for KIP-88

Author: Jason Gustafson 

Reviewers: Vahid Hashemian , Ismael Juma 


Closes #2383 from hachikuji/minor-cleanup-kip-88

commit c9b9acf6a8b542c2d0d825c17a4a20cf3fa5
Author: Damian Guy 
Date:   2017-01-17T20:33:11Z

KAFKA-4588: Wait for topics to be created in 
QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable

After debugging this, I can see that when it fails there is a race 
between when the topic is actually created/ready on the broker and when the 
assignment happens. When it fails, `StreamPartitionAssignor.assign(..)` gets 
called with a `Cluster` with no topics. Hence the test hangs as no tasks get 
assigned. To fix this I added a `waitForTopics` method to 
`EmbeddedKafkaCluster`. This will wait until the topics have been created.

Author: Damian Guy 

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2371 from dguy/integration-test-fix

(cherry picked from commit 825f225bc5706b16af8ec44ca47ee1452c11e6f3)
Signed-off-by: Guozhang Wang 
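
The "wait until the topics have been created" fix in the KAFKA-4588 commit
above follows a common test pattern: poll a condition until it holds or a
timeout expires, instead of assuming the broker is ready. The sketch below
shows only that pattern. WaitSketch and waitFor are hypothetical names and
this is not the `waitForTopics` implementation from the commit; in a real test
the condition would be a metadata lookup that checks the topic exists.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical polling helper; not the code added to EmbeddedKafkaCluster.
final class WaitSketch {

    // Polls the condition every pollMs until it is true or timeoutMs has passed.
    // Returns true if the condition became true, false on timeout or interrupt.
    static boolean waitFor(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long readyAt = System.currentTimeMillis() + 100;
        // Stand-in for "topic metadata is visible on the broker".
        boolean ready = waitFor(() -> System.currentTimeMillis() >= readyAt, 5_000, 10);
        System.out.println("ready: " + ready);
    }
}
```

The condition is always checked at least once, so a zero timeout degrades to a
single check rather than an immediate failure.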

commit 6f72a5a53c444278187fa6be58031168bcaffb26
Author: Damian Guy 
Date:   2017-01-17T22:13:46Z

KAFKA-3452 Follow-up: Refactoring StateStore hierarchies

This is a follow up of https://github.com/apache/kafka/pull/2166 - 
refactoring the store hierarchies as requested

Author: Damian Guy 

Reviewers: Guozhang Wang 

Closes #2360 from dguy/state-store-refactor

(cherry picked from commit 73b7ae0019d387407375f3865e263225c986a6ce)
Signed-off-by: Guozhang Wang 

commit eb62e5695506ae13bd37102c3c08e8a067eca0c8
Author: Ismael Juma 
Date:   2017-01-18T02:43:10Z

KAFKA-4591; Create Topic Policy follow-up

1. Added javadoc to public classes
2. Removed `s` from config name for consistency with interface name
3. The policy interface now implements Configurable and AutoCloseable as 
per the KIP
4. Use `null` instead of `-1` in `RequestMetadata`
5. 

[jira] [Assigned] (KAFKA-3924) Data loss due to halting when LEO is larger than leader's LEO

2017-02-02 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-3924:
--

Assignee: Maysam Yabandeh

> Data loss due to halting when LEO is larger than leader's LEO
> -
>
> Key: KAFKA-3924
> URL: https://issues.apache.org/jira/browse/KAFKA-3924
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: Maysam Yabandeh
>Assignee: Maysam Yabandeh
>  Labels: reliability
> Fix For: 0.10.0.1
>
> Attachments: deadlock-stack
>
>
> Currently the follower broker panics when its LEO is larger than its leader's 
> LEO,  and assuming that this is an impossible state to reach, halts the 
> process to prevent any further damage.
> {code}
> if (leaderEndOffset < replica.logEndOffset.messageOffset) {
>   // Prior to truncating the follower's log, ensure that doing so is not
>   // disallowed by the configuration for unclean leader election.
>   // This situation could only happen if the unclean election configuration
>   // for a topic changes while a replica is down. Otherwise, we should never
>   // encounter this situation since a non-ISR leader cannot be elected if
>   // disallowed by the broker configuration.
>   if (!LogConfig.fromProps(brokerConfig.originals,
>       AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
>         ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
>     // Log a fatal error and shutdown the broker to ensure that data loss
>     // does not unexpectedly occur.
>     fatal("...")
>     Runtime.getRuntime.halt(1)
>   }
> {code}
> Firstly, this assumption is invalid: there are legitimate cases (examples 
> below) in which this state can actually occur. Secondly, halt results in the 
> broker losing its un-flushed data, and if multiple brokers halt 
> simultaneously there is a chance that both the leader and the followers of a 
> partition are among the halted brokers, which would result in permanent 
> data loss.
> Given that this is a legitimate case, we suggest replacing the halt with a 
> graceful shutdown to avoid propagating data loss to the entire cluster.
> Details:
> One legitimate case in which this can occur is when a troubled broker 
> shrinks its partitions right before crashing (KAFKA-3410 and KAFKA-3861). In 
> this case the broker has lost some data, but the controller still cannot 
> elect any of the others as leader. If the crashed broker comes back up, the 
> controller elects it as the leader, and as a result all other brokers that 
> are now following it halt, since their LEOs are larger than those of the 
> shrunk topics on the restarted broker. We actually had a case in which 
> bringing up a crashed broker took down the entire cluster at once and, as 
> explained above, this could result in data loss.
> The other legitimate case is when multiple brokers shut down ungracefully at 
> the same time. In this case both the leader and the followers lose their 
> un-flushed data, but one of them has a larger HW than the other. The 
> controller elects whichever comes back up sooner as the leader, and if its 
> LEO is less than that of its future follower, the follower will halt (and 
> probably lose more data). Simultaneous ungraceful shutdown could happen due 
> to a hardware issue (e.g., rack power failure), operator error, or a 
> software issue (e.g., the case above, which is further explained in 
> KAFKA-3410 and KAFKA-3861 and causes simultaneous halts in multiple brokers).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4727) A Production server configuration needs to be updated

2017-02-02 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4727:
---
Labels: newbie  (was: newbies)

> A Production server configuration needs to be updated
> -
>
> Key: KAFKA-4727
> URL: https://issues.apache.org/jira/browse/KAFKA-4727
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>  Labels: newbie
>
> In docs/ops.html, we have a section on "A Production server configuration" 
> with queued.max.requests=16. This is often too low. We should change it to 
> the default value, which is 500. It will also be useful to see if other 
> configurations need to be changed accordingly.





[jira] [Updated] (KAFKA-4727) A Production server configuration needs to be updated

2017-02-02 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4727:
---
Labels: newbies  (was: )






[jira] [Created] (KAFKA-4727) A Production server configuration needs to be updated

2017-02-02 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-4727:
--

 Summary: A Production server configuration needs to be updated
 Key: KAFKA-4727
 URL: https://issues.apache.org/jira/browse/KAFKA-4727
 Project: Kafka
  Issue Type: Improvement
Reporter: Jun Rao


In docs/ops.html, we have a section on "A Production server configuration" with 
queued.max.requests=16. This is often too low. We should change it to the 
default value, which is 500. It will also be useful to see if other 
configurations need to be changed accordingly.





[GitHub] kafka pull request #2483: MINOR: Ensure timestamp type is provided when up-c...

2017-02-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2483


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-02 Thread Ismael Juma
Hi Colin,

Thanks for the KIP, great to make progress on this. I have some initial
comments, will post more later:

1. We have KafkaProducer that implements the Producer interface and
KafkaConsumer that implements the Consumer interface. Maybe we could have
KafkaAdminClient that implements the AdminClient interface? Or maybe just
KafkaAdmin. Not sure, some ideas for consideration. Also, I don't think we
should worry about a name clash with the internal AdminClient written in
Scala. That will go away soon enough and choosing a good name for the
public class is what matters.

2. We should include the proposed package name in the KIP
(probably org.apache.kafka.clients.admin?).

3. It would be good to list the supported configs.

4. KIP-107, which passed the vote, specifies the introduction of a method
to AdminClient with the following signature. We should figure out how it
would look given this proposal.

Future<Map<TopicPartition, PurgeDataResult>>
purgeDataBefore(Map<TopicPartition, Long> offsetForPartition)

5. I am not sure about rejecting the Futures-based API. I think I would
prefer that, personally. Grant had an interesting idea of not exposing the
batch methods in the AdminClient to start with to keep it simple and
relying on a Future based API to make it easy for users to run things
concurrently. This is consistent with the producer and Java 8 makes things
a lot nicer with CompletableFuture (similar to Scala Futures). I will think
more about this and other details of the proposal and send a follow-up.
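
For illustration only, point 5 could look roughly like the sketch below:
each call handles one item and returns a future, and the caller gets
concurrency by composing futures instead of calling a batch method. The
names SketchAdmin, InMemoryAdmin, createTopic and createAll are
hypothetical and are not taken from KIP-117 or from any Kafka API; the
in-memory class only stands in for a client that would talk to brokers.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical interface: one topic per call, each call returns a future.
interface SketchAdmin {
    CompletableFuture<String> createTopic(String name);
}

// Hypothetical in-memory stand-in for a real client.
final class InMemoryAdmin implements SketchAdmin {
    final Set<String> topics = ConcurrentHashMap.newKeySet();

    @Override
    public CompletableFuture<String> createTopic(String name) {
        // supplyAsync runs on the common pool, so separate calls can overlap.
        return CompletableFuture.supplyAsync(() -> {
            if (!topics.add(name)) {
                throw new IllegalStateException("topic exists: " + name);
            }
            return name;
        });
    }
}

final class FuturesAdminSketch {
    // No batch method: the caller fans out, then waits for all futures.
    static List<String> createAll(SketchAdmin admin, List<String> names) {
        List<CompletableFuture<String>> pending = names.stream()
            .map(admin::createTopic)
            .collect(Collectors.toList());
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        // Every future is complete here, so join() returns immediately.
        return pending.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        InMemoryAdmin admin = new InMemoryAdmin();
        System.out.println(createAll(admin, Arrays.asList("a", "b", "c")));
    }
}
```

A failure in any single call surfaces as a CompletionException from
join(), so per-item error handling stays with the caller.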

Ismael

On Thu, Feb 2, 2017 at 6:54 PM, Colin McCabe  wrote:

> Hi all,
>
> I wrote up a Kafka improvement proposal for adding an
> AdministrativeClient interface.  This is a continuation of the work on
> KIP-4 towards centralized administrative operations.  Please check out
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+
> AdministrativeClient+API+for+Kafka+admin+operations
>
> regards,
> Colin
>


[GitHub] kafka pull request #2461: MINOR: added upgrade and API changes to docs

2017-02-02 Thread mjsax
GitHub user mjsax reopened a pull request:

https://github.com/apache/kafka/pull/2461

MINOR: added upgrade and API changes to docs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka addStreamsUpdateSecton

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2461.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2461


commit ba922d6f0b50caf331d992c037149fb30f54f15d
Author: Matthias J. Sax 
Date:   2017-01-27T23:37:52Z

MINOR: added upgrade and API changes to docs

commit 38e0ea16406a119dc9a1f21a4f71dbd6072fdcf5
Author: Matthias J. Sax 
Date:   2017-01-30T07:26:20Z

GitHub comments

commit 000fa1c9705463fe728f1e5c0a591bc5b35bfa45
Author: Matthias J. Sax 
Date:   2017-01-31T00:15:22Z

GitHub comments
added change to null-key semantics for KTable joins

commit ab0e76c33b036e721210decdc8cea7e0c5a087aa
Author: Matthias J. Sax 
Date:   2017-01-31T18:00:38Z

GitHub comments

commit 0024c18c67b2dd198747541cd088fb0255c046d2
Author: Matthias J. Sax 
Date:   2017-02-01T21:57:56Z

Michael's comments

commit c4e0a5e38ed70a1abc52a83cfd561a5bba57bcc3
Author: Matthias J. Sax 
Date:   2017-02-02T18:38:56Z

Github comments

commit 1b400760417d2195f0cdc665180e66899caa95b4
Author: Matthias J. Sax 
Date:   2017-02-02T18:44:03Z

reorder bullet point list






[jira] [Assigned] (KAFKA-4039) Exit Strategy: using exceptions instead of inline invocation of exit/halt

2017-02-02 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-4039:
--

Assignee: Maysam Yabandeh

> Exit Strategy: using exceptions instead of inline invocation of exit/halt
> -
>
> Key: KAFKA-4039
> URL: https://issues.apache.org/jira/browse/KAFKA-4039
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: Maysam Yabandeh
>Assignee: Maysam Yabandeh
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.3.0
>
> Attachments: deadlock-stack2
>
>
> The current practice is to directly invoke halt/exit right after the line 
> that intends to terminate the execution. In the case of System.exit this 
> could cause deadlocks if the thread invoking System.exit is holding a lock 
> that will be requested by the shutdown hook threads that will be started by 
> System.exit. An example is reported by [~aozeritsky] in KAFKA-3924. This 
> also makes testing more difficult, as it requires mocking static 
> methods of the System and Runtime classes, which is not natively supported in 
> Java.
> One alternative suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-3924?focusedCommentId=15420269=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15420269]
>  would be to throw some dedicated exceptions that will eventually invoke 
> exit/halt:
> {quote} it would be great to move away from executing `System.exit` inline in 
> favour of throwing an exception (two examples, but maybe we can find better 
> names: FatalExitException and FatalHaltException) that is caught by some 
> central code that then does the `System.exit` or `Runtime.getRuntime.halt`. 
> This helps in a couple of ways:
> (1) Avoids issues with locks being held as in this issue
> (2) It makes it possible to abstract the action, which is very useful in 
> tests. At the moment, we can't easily test for these conditions as they cause 
> the whole test harness to exit. Worse, these conditions are sometimes 
> triggered in the tests and it's unclear why.
> (3) We can have more consistent logging around these actions and possibly 
> extended logging for tests
> {quote}
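
A minimal sketch of the idea quoted above, under stated assumptions: the
exception names come from the quote, while Terminator, ExitSketch and
runGuarded are hypothetical names made up for this example. This is not
the code from pull request 2474.

```java
import java.util.ArrayList;
import java.util.List;

// Exception names follow the suggestion in the quote.
final class FatalExitException extends RuntimeException {
    final int status;
    FatalExitException(int status, String message) {
        super(message);
        this.status = status;
    }
}

final class FatalHaltException extends RuntimeException {
    final int status;
    FatalHaltException(int status, String message) {
        super(message);
        this.status = status;
    }
}

// Hypothetical abstraction of the termination action, so tests can swap it.
interface Terminator {
    void exit(int status);
    void halt(int status);
}

final class ExitSketch {
    // Production terminator that delegates to the real JVM calls.
    static final Terminator JVM = new Terminator() {
        public void exit(int status) { System.exit(status); }
        public void halt(int status) { Runtime.getRuntime().halt(status); }
    };

    // Central handler: the only place that terminates the process.
    static void runGuarded(Runnable body, Terminator terminator) {
        try {
            body.run();
        } catch (FatalExitException e) {
            terminator.exit(e.status);
        } catch (FatalHaltException e) {
            terminator.halt(e.status);
        }
    }

    public static void main(String[] args) {
        final List<String> calls = new ArrayList<>();
        Terminator recording = new Terminator() {
            public void exit(int status) { calls.add("exit:" + status); }
            public void halt(int status) { calls.add("halt:" + status); }
        };
        final Object lock = new Object();
        runGuarded(() -> {
            synchronized (lock) {
                // Throwing unwinds the stack and releases the monitor
                // before the handler runs the termination action.
                throw new FatalExitException(1, "unrecoverable state");
            }
        }, recording);
        System.out.println(calls);
    }
}
```

Because the exception leaves the synchronized block before the handler
runs, a shutdown hook that needs the same lock is not blocked by the
exiting thread, and a test can pass a recording Terminator instead of
ending the JVM.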





[jira] [Commented] (KAFKA-4039) Exit Strategy: using exceptions instead of inline invocation of exit/halt

2017-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15850618#comment-15850618
 ] 

ASF GitHub Bot commented on KAFKA-4039:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2474







[GitHub] kafka pull request #2474: KAFKA-4039: Fix deadlock during shutdown due to lo...

2017-02-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2474




[jira] [Updated] (KAFKA-4039) Exit Strategy: using exceptions instead of inline invocation of exit/halt

2017-02-02 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4039:
---
   Resolution: Fixed
Fix Version/s: 0.10.3.0
   Status: Resolved  (was: Patch Available)

Issue resolved by pull request 2474
[https://github.com/apache/kafka/pull/2474]






[GitHub] kafka pull request #2461: MINOR: added upgrade and API changes to docs

2017-02-02 Thread mjsax
Github user mjsax closed the pull request at:

https://github.com/apache/kafka/pull/2461




[jira] [Updated] (KAFKA-4726) ValueMapper should have (read) access to key

2017-02-02 Thread Steven Schlansker (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Schlansker updated KAFKA-4726:
-
Description: 
{{ValueMapper}} should have read-only access to the key for the value it is 
mapping.  Sometimes the value transformation will depend on the key.

It is possible to do this with a full blown {{KeyValueMapper}} but that loses 
the promise that you won't change the key -- so you might introduce a re-keying 
phase that is totally unnecessary.  It also requires you to return an identity 
KeyValue object which costs something to construct (unless we are lucky and the 
optimizer elides it).

[ If mapValues() is guaranteed to be no less efficient than map() the issue may 
be moot, but I presume there are some optimizations that are valid with the 
former but not latter. ]

  was:
{{ValueMapper}} should have read-only access to the key for the value it is 
mapping.  Sometimes the value transformation will depend on the key.

It is possible to do this with a full blown {{KeyValueMapper}} but that loses 
the promise that you won't change the key -- so you might introduce a re-keying 
phase that is totally unnecessary.

[ If mapValues() is guaranteed to be no less efficient than map() the issue may 
be moot, but I presume there are some optimizations that are valid with the 
former but not latter. ]


> ValueMapper should have (read) access to key
> 
>
> Key: KAFKA-4726
> URL: https://issues.apache.org/jira/browse/KAFKA-4726
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>
> {{ValueMapper}} should have read-only access to the key for the value it is 
> mapping.  Sometimes the value transformation will depend on the key.
> It is possible to do this with a full blown {{KeyValueMapper}} but that loses 
> the promise that you won't change the key -- so you might introduce a 
> re-keying phase that is totally unnecessary.  It also requires you to return 
> an identity KeyValue object which costs something to construct (unless we are 
> lucky and the optimizer elides it).
> [ If mapValues() is guaranteed to be no less efficient than map() the issue 
> may be moot, but I presume there are some optimizations that are valid with 
> the former but not latter. ]
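
To make the request concrete, here is a rough sketch of what a
key-aware value mapper might look like. ValueMapperWithKeySketch and
MapValuesSketch are hypothetical names invented for this example, and a
plain java.util.Map stands in for a stream; none of this is the Kafka
Streams API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical interface: the key is visible but only a value is returned,
// so the mapper has no way to change the key.
interface ValueMapperWithKeySketch<K, V, VR> {
    VR apply(K readOnlyKey, V value);
}

final class MapValuesSketch {
    // Stand-in for mapValues(): keys pass through untouched, so no
    // re-keying step could ever be needed, and no KeyValue pair object
    // is allocated per record.
    static <K, V, VR> Map<K, VR> mapValues(Map<K, V> input,
                                           ValueMapperWithKeySketch<K, V, VR> mapper) {
        Map<K, VR> out = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : input.entrySet()) {
            out.put(e.getKey(), mapper.apply(e.getKey(), e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("eu", 2);
        counts.put("us", 5);
        // The new value depends on the key, yet the key set is unchanged.
        Map<String, String> labeled = mapValues(counts, (k, v) -> k + ":" + v);
        System.out.println(labeled);
    }
}
```

The guarantee that keys are preserved comes from the interface shape
itself rather than from a convention the caller has to follow.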





[jira] [Created] (KAFKA-4726) ValueMapper should have (read) access to key

2017-02-02 Thread Steven Schlansker (JIRA)
Steven Schlansker created KAFKA-4726:


 Summary: ValueMapper should have (read) access to key
 Key: KAFKA-4726
 URL: https://issues.apache.org/jira/browse/KAFKA-4726
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.1.1
Reporter: Steven Schlansker


{{ValueMapper}} should have read-only access to the key for the value it is 
mapping.  Sometimes the value transformation will depend on the key.

It is possible to do this with a full blown {{KeyValueMapper}} but that loses 
the promise that you won't change the key -- so you might introduce a re-keying 
phase that is totally unnecessary.

[ If mapValues() is guaranteed to be no less efficient than map() the issue may 
be moot, but I presume there are some optimizations that are valid with the 
former but not latter. ]





Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-02 Thread Ismael Juma
Yes, I'd also prefer the option where we only have a checksum at the
message set level. I didn't suggest it due to the mentioned auditing use
cases, but if they can be satisfied in some other way, then that would be
great.

Ismael

On 2 Feb 2017 7:03 pm, "Jason Gustafson"  wrote:

One more:

> 1. I did some benchmarking and CRC32C seems to be a massive win when using
> the hardware instruction (particularly for messages larger than 65k), so
> I'm keen on taking advantage of the message format version bump to add
> support for it. I can write a separate KIP for this as it's not tied to
> Exactly-once, but it would be good to include the code change in the same
> PR that bumps the message format version. The benchmark and results can be
> found in the following link:
> https://gist.github.com/ijuma/e58ad79d489cb831b290e83b46a7d7bb.


Yeah, makes sense. We can add this to this KIP or do it separately,
whichever you prefer. I have also been very interested in removing the
individual message CRCs. The main reason we haven't done so is because some
auditing applications depend on them, but there are cases where it's
already unsafe to depend on the message CRCs not changing on the broker
(message conversion and the use of log append time can both result in new
message-level crcs). So I'm wondering a bit about the use cases that
require the message CRCs and how they handle this. Perhaps if it is not
dependable anyway, we can remove it and save some space and computation.

-Jason
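
As a small illustration of the two options discussed here (a CRC per
message versus one checksum at the message set level), the sketch below
uses the JDK's java.util.zip.CRC32C, which requires Java 9 or later.
ChecksumSketch and its checksum helper are hypothetical names, and the
byte arrays are placeholders rather than real Kafka messages.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;
import java.util.zip.CRC32C;
import java.util.zip.Checksum;

final class ChecksumSketch {
    // Feeds every chunk into one checksum, as a set-level CRC would.
    static long checksum(Checksum algo, byte[]... chunks) {
        algo.reset();
        for (byte[] chunk : chunks) {
            algo.update(chunk, 0, chunk.length);
        }
        return algo.getValue();
    }

    public static void main(String[] args) {
        byte[] m1 = "1234".getBytes(StandardCharsets.US_ASCII);
        byte[] m2 = "56789".getBytes(StandardCharsets.US_ASCII);

        // Per-message: one checksum stored (and verified) for each message.
        long c1 = checksum(new CRC32C(), m1);
        long c2 = checksum(new CRC32C(), m2);

        // Per-set: a single checksum covering all messages in the set.
        long set = checksum(new CRC32C(), m1, m2);

        System.out.printf("per-message: %08x %08x%n", c1, c2);
        System.out.printf("per-set CRC32C: %08x%n", set);
        System.out.printf("per-set CRC32:  %08x%n", checksum(new CRC32(), m1, m2));
    }
}
```

Both classes implement java.util.zip.Checksum, so switching algorithms is
a one-line change in code written against that interface.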


On Thu, Feb 2, 2017 at 10:28 AM, Jason Gustafson  wrote:

> Hey Ismael,
>
>> 2. The message timestamp field is 8 bytes. Did we consider storing the
>> first timestamp in the message set and then storing deltas using varints in
>> the messages like we do for offsets (the difference would be the usage of
>> signed varints)? It seems like the deltas would be quite a bit smaller in
>> the common case (potentially 0 for log append time, so we could even not
>> store them at all using attributes like we do for key/value lengths). An
>> alternative is using MaxTimestamp that is already present in the message
>> set and computing deltas from that, but that seems more complicated. In any
>> case, details aside, was this idea considered and rejected or is it worth
>> exploring further?
>
>
> Took me a while to remember why we didn't do this. The timestamp that is
> included at the message set level is the max timestamp of all messages in
> the message set as is the case in the current message format (I will update
> the document to make this explicit). We could make the message timestamps
> relative to the max timestamp, but that makes serialization a bit awkward
> since the timestamps are not assumed to be increasing sequentially or
> monotonically. Once the messages in the message set had been determined, we
> would need to go back and adjust the relative timestamps.
>
> Here's one idea. We let the timestamps in the messages be varints, but we
> make their values be relative to the timestamp of the previous message,
> with the timestamp of the first message being absolute. For example, if we
> had timestamps 500, 501, 499, then we would write 500 for the first
> message, 1 for the next, and -2 for the final message. Would that work? Let
> me think a bit about it and see if there are any problems.
>
> -Jason
>
> On Thu, Feb 2, 2017 at 9:04 AM, Apurva Mehta  wrote:
>
>> Good point Tom. We will update the KIP with the ACLs section and also the
>> message format changes.
>>
>> > On Feb 2, 2017, at 06:45, Tom Crayford  wrote:
>> >
>> > I said this in the voting thread, but can the authors include a section
>> > about new ACLs if there are going to be ACLs for TransactionalId. It's
>> > mentioned in the google doc, but I think new ACLs should be in a KIP
>> > directly.
>> >
>> >> On Thu, Feb 2, 2017 at 2:42 PM, Ismael Juma  wrote:
>> >>
>> >> Thanks for the responses and updates to the document, Guozhang and Jason.
>> >> They look good. One follow-up and one additional comment:
>> >>
>> >> 1. I did some benchmarking and CRC32C seems to be a massive win when using
>> >> the hardware instruction (particularly for messages larger than 65k), so
>> >> I'm keen on taking advantage of the message format version bump to add
>> >> support for it. I can write a separate KIP for this as it's not tied to
>> >> Exactly-once, but it would be good to include the code change in the same
>> >> PR that bumps the message format version. The benchmark and results can be
>> >> found in the following link:
>> >> https://gist.github.com/ijuma/e58ad79d489cb831b290e83b46a7d7bb.
>> >>
>> >> 2. The message timestamp field is 8 bytes. Did we consider storing the
>> >> first timestamp in the message set and then storing deltas using varints in
>> >> the messages like we do for offsets (the difference would be the usage of
>> >> signed varints)? It seems 
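
The previous-message-relative idea quoted earlier in this thread
(timestamps 500, 501, 499 written as 500, 1, -2) can be sketched as
follows. This is only an illustration: TimestampDeltaSketch and its
methods are hypothetical names, and ZigZag encoding is assumed here as
one common way to implement "signed varints"; the thread does not say
which encoding the final message format would use.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

final class TimestampDeltaSketch {
    // First entry is absolute; each later entry is relative to its predecessor.
    static long[] toDeltas(long[] timestamps) {
        long[] deltas = new long[timestamps.length];
        long previous = 0L;
        for (int i = 0; i < timestamps.length; i++) {
            deltas[i] = timestamps[i] - previous;
            previous = timestamps[i];
        }
        return deltas;
    }

    // Inverse of toDeltas: a running sum restores the absolute timestamps.
    static long[] fromDeltas(long[] deltas) {
        long[] timestamps = new long[deltas.length];
        long running = 0L;
        for (int i = 0; i < deltas.length; i++) {
            running += deltas[i];
            timestamps[i] = running;
        }
        return timestamps;
    }

    // ZigZag maps small negative values to small unsigned ones, which are
    // then written 7 bits per byte with a continuation bit.
    static byte[] writeSignedVarint(long value) {
        long v = (value << 1) ^ (value >> 63);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80));
            v >>>= 7;
        }
        out.write((int) v);
        return out.toByteArray();
    }

    // Decodes exactly one varint occupying the whole array.
    static long readSignedVarint(byte[] bytes) {
        long v = 0L;
        int shift = 0;
        for (byte b : bytes) {
            v |= (long) (b & 0x7F) << shift;
            shift += 7;
        }
        return (v >>> 1) ^ -(v & 1);
    }

    public static void main(String[] args) {
        long[] deltas = toDeltas(new long[] {500L, 501L, 499L});
        System.out.println(Arrays.toString(deltas));
        for (long d : deltas) {
            System.out.println(d + " -> " + writeSignedVarint(d).length + " byte(s)");
        }
    }
}
```

Unlike deltas taken against the max timestamp, each delta here depends
only on the message before it, so messages can be serialized in a single
pass as they are appended.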

Re: [VOTE] 0.10.2.0 RC0

2017-02-02 Thread Mathieu Fenniak
+1 (non-binding)

Upgraded a KS app, custom KC connectors, and brokers, ran an end-to-end
test suite.  Looks like a great release to me. :-)

Mathieu


On Wed, Feb 1, 2017 at 4:44 PM, Ewen Cheslack-Postava 
wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the first candidate for release of Apache Kafka 0.10.2.0.
>
> This is a minor version release of Apache Kafka. It includes 19 new KIPs.
> See the release notes and release plan (
> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.2.0)
> for more details. A few feature highlights: SASL-SCRAM support, improved
> client compatibility to allow use of clients newer than the broker, session
> windows and global tables in the Kafka Streams API, single message
> transforms in the Kafka Connect framework.
>
> Release notes for the 0.10.2.0 release:
> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc0/RELEASE_NOTES.html
>
> *** Please download, test and vote by Monday February 6th 5pm PST ***
> (Note the longer window to vote to account for the normal 7 days ending in
> the middle of the weekend.)
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc0/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/
>
> * Javadoc:
> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc0/javadoc/
>
> * Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.0 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=
> 33ebac1f138f17b86002df05e55a9f5cff47f48a
>
>
> * Documentation:
> http://kafka.apache.org/0102/documentation.html
>
> * Protocol:
> http://kafka.apache.org/0102/protocol.html
>
> * Successful Jenkins builds for the 0.10.2 branch:
> Unit/integration tests: https://builds.apache.org/job/
> kafka-0.10.2-jdk7/60/
> System tests:
> https://jenkins.confluent.io/job/system-test-kafka-branch-builder/697
>
> Thanks,
> Ewen Cheslack-Postava
>


Build failed in Jenkins: kafka-trunk-jdk8 #1244

2017-02-02 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-3896: Fix KStream-KStream leftJoin in 
RepartitionIntegrationTest

[wangguoz] MINOR: Update Streams docs: quickstart and concepts

[me] KAFKA-4719: Consumption timeout should take into account producer

--
[...truncated 65 lines...]
:kafka-trunk-jdk8:core:compileJava UP-TO-DATE
:kafka-trunk-jdk8:core:compileScala
:79:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.

org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
 ^
:36:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 commitTimestamp: Long = 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,

  ^
:37:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 expireTimestamp: Long = 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) {

  ^
:501:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
  if (offsetAndMetadata.expireTimestamp == 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)

^
:323:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
  if (partitionData.timestamp == 
OffsetCommitRequest.DEFAULT_TIMESTAMP)
 ^
:94:
 class ProducerConfig in package producer is deprecated: This class has been 
deprecated and will be removed in a future release. Please use 
org.apache.kafka.clients.producer.ProducerConfig instead.
val producerConfig = new ProducerConfig(props)
 ^
:95:
 method fetchTopicMetadata in object ClientUtils is deprecated: This method has 
been deprecated and will be removed in a future release.
fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
^
:187:
 object ProducerRequestStatsRegistry in package producer is deprecated: This 
object has been deprecated and will be removed in a future release.
ProducerRequestStatsRegistry.removeProducerRequestStats(clientId)
^
:129:
 method readFromReadableChannel in class NetworkReceive is deprecated: see 
corresponding Javadoc for more information.
  response.readFromReadableChannel(channel)
   ^
:323:
 value timestamp in class PartitionData is deprecated: see corresponding 
Javadoc for more information.
  if (partitionData.timestamp == 
OffsetCommitRequest.DEFAULT_TIMESTAMP)
^
:326:
 value timestamp in class PartitionData is deprecated: see corresponding 
Javadoc for more information.
offsetRetention + partitionData.timestamp
^
:578:
 method offsetData in class ListOffsetRequest is deprecated: see corresponding 
Javadoc for more information.
val (authorizedRequestInfo, unauthorizedRequestInfo) = 
offsetRequest.offsetData.asScala.partition {
   

[jira] [Commented] (KAFKA-4689) OffsetValidationTest fails validation with "Current position greater than the total number of consumed records"

2017-02-02 Thread Apurva Mehta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15850342#comment-15850342
 ] 

Apurva Mehta commented on KAFKA-4689:
-

This happened again in the system test run of 02/02/2017:

{noformat}

[INFO  - 2017-02-02 04:58:49,171 - runner_client - log - lineno:221]: RunnerClient: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=False.bounce_mode=rolling: Summary: Current position greater than the total number of consumed records
Traceback (most recent call last):
  File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 123, in run
    data = self.run_test()
  File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 176, in run_test
    return self.test_context.function(self.test)
  File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py", line 321, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/client/consumer_test.py", line 158, in test_consumer_bounce
    "Current position greater than the total number of consumed records"
AssertionError: Current position greater than the total number of consumed records

[INFO  - 2017-02-02 04:58:49,172 - runner_client - log - lineno:221]: RunnerClient: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=False.bounce_mode=rolling: Data: None
{noformat}

Logs for this run are at 
http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-02-02--001.1486027182--apache--trunk--3d9f34d/OffsetValidationTest/test_consumer_bounce/clean_shutdown=False.bounce_mode=rolling/46.tgz

> OffsetValidationTest fails validation with "Current position greater than the 
> total number of consumed records"
> ---
>
> Key: KAFKA-4689
> URL: https://issues.apache.org/jira/browse/KAFKA-4689
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Reporter: Ewen Cheslack-Postava
>Assignee: Jason Gustafson
>  Labels: system-test-failure
>
> {quote}
> 
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=False.bounce_mode=all
> status: FAIL
> run time:   1 minute 49.834 seconds
> Current position greater than the total number of consumed records
> Traceback (most recent call last):
>   File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 123, in run
>     data = self.run_test()
>   File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 176, in run_test
>     return self.test_context.function(self.test)
>   File "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py", line 321, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/client/consumer_test.py", line 157, in test_consumer_bounce
>     "Current position greater than the total number of consumed records"
> AssertionError: Current position greater than the total number of consumed records
> {quote}
> See also 
> https://issues.apache.org/jira/browse/KAFKA-3513?focusedCommentId=15791790=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15791790
>  which is another instance of this bug, which indicates the issue goes back 
> at least as far as 1/17/2017. Note that I don't think we've seen this in 
> 0.10.1 yet.





Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-02 Thread Jason Gustafson
One more:

> 1. I did some benchmarking and CRC32C seems to be a massive win when using
> the hardware instruction (particularly for messages larger than 65k), so
> I'm keen on taking advantage of the message format version bump to add
> support for it. I can write a separate KIP for this as it's not tied to
> Exactly-once, but it would be good to include the code change in the same
> PR that bumps the message format version. The benchmark and results can be
> found in the following link:
> https://gist.github.com/ijuma/e58ad79d489cb831b290e83b46a7d7bb.


Yeah, makes sense. We can add this to this KIP or do it separately,
whichever you prefer. I have also been very interested in removing the
individual message CRCs. The main reason we haven't done so is because some
auditing applications depend on them, but there are cases where it's
already unsafe to depend on the message CRCs not changing on the broker
(message conversion and the use of log append time can both result in new
message-level CRCs). So I'm wondering a bit about the use cases that
require the message CRCs and how they handle this. Perhaps if they are not
dependable anyway, we can remove them and save some space and computation.
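
For anyone comparing the two checksums: CRC32C uses the Castagnoli polynomial instead of the IEEE polynomial used by plain CRC32, and it is the Castagnoli variant that some CPUs can compute with a dedicated instruction. Below is a minimal table-driven software sketch in Python, for illustration only. It shows that the two checksums differ for the same input; it says nothing about the hardware speedup from the benchmark, and the function names are made up here.

```python
import zlib

# Castagnoli polynomial in bit-reflected form (plain CRC32 uses 0xEDB88320).
CRC32C_POLY = 0x82F63B78


def _make_table(poly):
    """Precompute the CRC of every possible byte value for a reflected polynomial."""
    table = []
    for byte in range(256):
        crc = byte
        for _ in range(8):
            crc = (crc >> 1) ^ poly if crc & 1 else crc >> 1
        table.append(crc)
    return table


_TABLE = _make_table(CRC32C_POLY)


def crc32c(data, crc=0):
    """Software CRC32C over a bytes-like object, one table lookup per byte."""
    crc ^= 0xFFFFFFFF
    for b in data:
        crc = _TABLE[(crc ^ b) & 0xFF] ^ (crc >> 8)
    return crc ^ 0xFFFFFFFF


if __name__ == "__main__":
    payload = b"123456789"
    print(hex(crc32c(payload)))      # Castagnoli checksum
    print(hex(zlib.crc32(payload)))  # IEEE checksum, for comparison
```

Because the two polynomials give different values for the same bytes, a reader has to know which one a given message format version uses, which is one reason to tie the switch to the format version bump.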

-Jason


On Thu, Feb 2, 2017 at 10:28 AM, Jason Gustafson  wrote:

> Hey Ismael,
>
>> 2. The message timestamp field is 8 bytes. Did we consider storing the
>> first timestamp in the message set and then storing deltas using varints
>> in the messages like we do for offsets (the difference would be the usage
>> of signed varints)? It seems like the deltas would be quite a bit smaller
>> in the common case (potentially 0 for log append time, so we could even
>> not store them at all using attributes like we do for key/value lengths).
>> An alternative is using MaxTimestamp that is already present in the
>> message set and computing deltas from that, but that seems more
>> complicated. In any case, details aside, was this idea considered and
>> rejected or is it worth exploring further?
>
>
> Took me a while to remember why we didn't do this. The timestamp that is
> included at the message set level is the max timestamp of all messages in
> the message set as is the case in the current message format (I will update
> the document to make this explicit). We could make the message timestamps
> relative to the max timestamp, but that makes serialization a bit awkward
> since the timestamps are not assumed to be increasing sequentially or
> monotonically. Once the messages in the message set had been determined, we
> would need to go back and adjust the relative timestamps.
>
> Here's one idea. We let the timestamps in the messages be varints, but we
> make their values be relative to the timestamp of the previous message,
> with the timestamp of the first message being absolute. For example, if we
> had timestamps 500, 501, 499, then we would write 500 for the first
> message, 1 for the next, and -2 for the final message. Would that work? Let
> me think a bit about it and see if there are any problems.
>
> -Jason

[jira] [Commented] (KAFKA-4722) Add application.id to StreamThread name

2017-02-02 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15850331#comment-15850331
 ] 

Matthias J. Sax commented on KAFKA-4722:


Thanks for the hint. Removing the duplicate app-Id from the {{threadClientId}} 
in the {{StreamThread}} constructor should be easy.

> Add application.id to StreamThread name
> ---
>
> Key: KAFKA-4722
> URL: https://issues.apache.org/jira/browse/KAFKA-4722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>Assignee: Sharad
>Priority: Minor
>  Labels: beginner, easyfix, newbie
>
> StreamThread currently sets its name thusly:
> {code}
> super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
> {code}
> If you have multiple {{KafkaStreams}} instance within a single application, 
> it would help to add the application ID to {{StreamThread}} name to identify 
> which thread belong to what {{KafkaStreams}} instance.





[jira] [Created] (KAFKA-4725) Kafka broker fails due to OOM when producer exceeds throttling quota for extended periods of time

2017-02-02 Thread Jeff Chao (JIRA)
Jeff Chao created KAFKA-4725:


 Summary: Kafka broker fails due to OOM when producer exceeds 
throttling quota for extended periods of time
 Key: KAFKA-4725
 URL: https://issues.apache.org/jira/browse/KAFKA-4725
 Project: Kafka
  Issue Type: Bug
  Components: core, producer 
Affects Versions: 0.10.1.1
 Environment: Ubuntu Trusty (14.04.5), Oracle JDK 8
Reporter: Jeff Chao
 Attachments: oom-references.png

Steps to Reproduce:

1. Create a non-compacted topic with 1 partition
2. Set a produce quota of 512 KB/s
3. Send messages at 20 MB/s
4. Observe heap memory growth as time progresses
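
For scale, the rates in the steps above put the producer at 40 times its quota. A quick back-of-the-envelope check in Python, assuming binary units (1 MB = 1024 KB), which the report does not specify. The helper names are illustrative only.

```python
KB = 1024
MB = 1024 * KB

quota_bytes_per_sec = 512 * KB    # produce quota from step 2
produce_bytes_per_sec = 20 * MB   # send rate from step 3


def overshoot_factor(rate, quota):
    """How many times over its quota the producer is sending."""
    return rate / quota


def excess_bytes(rate, quota, seconds):
    """Bytes sent beyond what the quota allows over the given interval."""
    return (rate - quota) * seconds


if __name__ == "__main__":
    print(overshoot_factor(produce_bytes_per_sec, quota_bytes_per_sec))
    print(excess_bytes(produce_bytes_per_sec, quota_bytes_per_sec, 60))
```

How much of that excess ends up retained on the broker heap depends on how long throttled responses are held, which this arithmetic does not model.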

Investigation:

While running performance tests with a user configured with a produce quota, we 
found that the lead broker serving the requests would exhaust heap memory if 
the producer sustained an inbound request throughput greater than the produce 
quota. 

Upon further investigation, we took a heap dump from that broker process and 
discovered the ThrottledResponse object has an indirect reference to the byte[] 
holding the messages associated with the ProduceRequest. 

We're happy to contribute a patch, but in the meantime wanted to first raise the 
issue and get feedback from the community.





[DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-02 Thread Colin McCabe
Hi all,

I wrote up a Kafka improvement proposal for adding an
AdministrativeClient interface.  This is a continuation of the work on
KIP-4 towards centralized administrative operations.  Please check out
https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdministrativeClient+API+for+Kafka+admin+operations

regards,
Colin


[jira] [Commented] (KAFKA-4719) Timed out waiting for consumption in OffsetValidationTest.test_broker_failure

2017-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15850307#comment-15850307
 ] 

ASF GitHub Bot commented on KAFKA-4719:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2479


> Timed out waiting for consumption in OffsetValidationTest.test_broker_failure
> -
>
> Key: KAFKA-4719
> URL: https://issues.apache.org/jira/browse/KAFKA-4719
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
> Fix For: 0.10.2.0, 0.10.3.0
>
>
> {code}
> 
> test_id: 
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_failure.clean_shutdown=True.enable_autocommit=False
> status: FAIL
> run time: 1 minute 17.051 seconds
> Timed out waiting for consumption
> Traceback (most recent call last):
>   File "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 123, in run
>     data = self.run_test()
>   File "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 176, in run_test
>     return self.test_context.function(self.test)
>   File "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py", line 321, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/client/consumer_test.py", line 221, in test_broker_failure
>     self.await_consumed_messages(consumer, min_messages=1000)
>   File "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/verifiable_consumer_test.py", line 74, in await_consumed_messages
>     err_msg="Timed out waiting for consumption")
>   File "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py", line 36, in wait_until
>     raise TimeoutError(err_msg)
> TimeoutError: Timed out waiting for consumption
> {code}





[jira] [Resolved] (KAFKA-4719) Timed out waiting for consumption in OffsetValidationTest.test_broker_failure

2017-02-02 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-4719.
--
   Resolution: Fixed
Fix Version/s: 0.10.2.0

Issue resolved by pull request 2479
[https://github.com/apache/kafka/pull/2479]

> Timed out waiting for consumption in OffsetValidationTest.test_broker_failure
> -
>
> Key: KAFKA-4719
> URL: https://issues.apache.org/jira/browse/KAFKA-4719
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
> Fix For: 0.10.3.0, 0.10.2.0
>
>
> {code}
> 
> test_id: 
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_failure.clean_shutdown=True.enable_autocommit=False
> status: FAIL
> run time: 1 minute 17.051 seconds
> Timed out waiting for consumption
> Traceback (most recent call last):
>   File "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 123, in run
>     data = self.run_test()
>   File "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 176, in run_test
>     return self.test_context.function(self.test)
>   File "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py", line 321, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/client/consumer_test.py", line 221, in test_broker_failure
>     self.await_consumed_messages(consumer, min_messages=1000)
>   File "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/verifiable_consumer_test.py", line 74, in await_consumed_messages
>     err_msg="Timed out waiting for consumption")
>   File "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py", line 36, in wait_until
>     raise TimeoutError(err_msg)
> TimeoutError: Timed out waiting for consumption
> {code}





[GitHub] kafka pull request #2479: KAFKA-4719: Consumption timeout should take into a...

2017-02-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2479


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3896) Unstable test KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperations

2017-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15850287#comment-15850287
 ] 

ASF GitHub Bot commented on KAFKA-3896:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2485


> Unstable test 
> KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperations
> ---
>
> Key: KAFKA-3896
> URL: https://issues.apache.org/jira/browse/KAFKA-3896
> Project: Kafka
>  Issue Type: Sub-task
>  Components: unit tests
>Reporter: Ashish Singh
>Assignee: Guozhang Wang
> Fix For: 0.10.2.0, 0.10.3.0
>
>
> {{KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperations}} 
> seems to be unstable. A failure can be found 
> [here|https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/4363/]. Could not 
> reproduce the test failure locally though.





[GitHub] kafka pull request #2482: MINOR: Update Streams docs: quickstart and concept...

2017-02-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2482




[GitHub] kafka pull request #2485: KAFKA-3896: Fix KStream-KStream leftJoin in Repart...

2017-02-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2485




[jira] [Commented] (KAFKA-4722) Add application.id to StreamThread name

2017-02-02 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15850282#comment-15850282
 ] 

Guozhang Wang commented on KAFKA-4722:
--

One side note: we have seen scenarios where a metric name exceeds some 
framework's limit (e.g. 256 bytes) because of the long client-id, which 
includes the thread-id:

{code}
threadClientId = clientId + "-" + threadName;
{code}

Adding the application id to the thread name will likely leave the 
threadClientId with a duplicated app.id, since clientId may contain the app.id 
as well, and the app.id itself can be long if it includes a UUID.

So I'd suggest we think through those scenarios and see if there is a better 
general solution that maintains uniqueness without making the threadClientId 
itself extremely long.
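
To illustrate the concern, here is a small Python sketch of the string concatenation quoted above. Everything concrete in it is hypothetical: the application id, the "-1" client-id suffix, and the proposed thread-name format are stand-ins chosen for the example, not what the Streams code actually produces.

```python
import uuid


def thread_client_id(client_id, thread_name):
    """Mirrors the concatenation quoted above: clientId + "-" + threadName."""
    return client_id + "-" + thread_name


# Hypothetical application.id that embeds a UUID (an all-zero one, for a stable example).
app_id = "my-streams-app-" + str(uuid.UUID(int=0))

# Assumption for illustration: the client id is derived from the application id.
client_id = app_id + "-1"

# Thread name as it is today vs. a name prefixed with the application id.
current = thread_client_id(client_id, "StreamThread-1")
proposed = thread_client_id(client_id, app_id + "-StreamThread-1")

if __name__ == "__main__":
    print(len(current), current)
    print(len(proposed), proposed)
```

Under these assumptions the application id appears twice in the proposed threadClientId, and every metric name that embeds it grows by the length of the application id plus one separator.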

> Add application.id to StreamThread name
> ---
>
> Key: KAFKA-4722
> URL: https://issues.apache.org/jira/browse/KAFKA-4722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>Assignee: Sharad
>Priority: Minor
>  Labels: beginner, easyfix, newbie
>
> StreamThread currently sets its name thusly:
> {code}
> super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
> {code}
> If you have multiple {{KafkaStreams}} instance within a single application, 
> it would help to add the application ID to {{StreamThread}} name to identify 
> which thread belong to what {{KafkaStreams}} instance.





Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-02 Thread Jason Gustafson
Hey Ismael,

> 2. The message timestamp field is 8 bytes. Did we consider storing the
> first timestamp in the message set and then storing deltas using varints in
> the messages like we do for offsets (the difference would be the usage of
> signed varints)? It seems like the deltas would be quite a bit smaller in
> the common case (potentially 0 for log append time, so we could even not
> store them at all using attributes like we do for key/value lengths). An
> alternative is using MaxTimestamp that is already present in the message
> set and computing deltas from that, but that seems more complicated. In any
> case, details aside, was this idea considered and rejected or is it worth
> exploring further?


Took me a while to remember why we didn't do this. The timestamp that is
included at the message set level is the max timestamp of all messages in
the message set as is the case in the current message format (I will update
the document to make this explicit). We could make the message timestamps
relative to the max timestamp, but that makes serialization a bit awkward
since the timestamps are not assumed to be increasing sequentially or
monotonically. Once the messages in the message set had been determined, we
would need to go back and adjust the relative timestamps.

Here's one idea. We let the timestamps in the messages be varints, but we
make their values be relative to the timestamp of the previous message,
with the timestamp of the first message being absolute. For example, if we
had timestamps 500, 501, 499, then we would write 500 for the first
message, 1 for the next, and -2 for the final message. Would that work? Let
me think a bit about it and see if there are any problems.
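
To make the idea concrete, here is a minimal Python sketch of the delta scheme. It assumes zigzag encoding for the signed varints (the mapping Protocol Buffers uses for signed values); the actual wire encoding has not been decided in this thread, and all function names are made up for illustration. The first timestamp is written as a delta from zero, which is the same as writing it absolutely.

```python
def zigzag(n):
    """Map a signed 64-bit integer to an unsigned one so small magnitudes stay small."""
    return (n << 1) ^ (n >> 63)


def unzigzag(z):
    """Inverse of zigzag()."""
    return (z >> 1) ^ -(z & 1)


def write_varint(value, out):
    """Append an unsigned integer to `out` as a base-128 varint (7 bits per byte)."""
    while value >= 0x80:
        out.append((value & 0x7F) | 0x80)
        value >>= 7
    out.append(value)


def read_varint(buf, pos):
    """Read one varint starting at `pos`; return (value, next position)."""
    result = shift = 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            return result, pos
        shift += 7


def deltas(timestamps):
    """First value absolute, every later value relative to its predecessor."""
    prev, out = 0, []
    for ts in timestamps:
        out.append(ts - prev)
        prev = ts
    return out


def encode_timestamps(timestamps):
    out = bytearray()
    for d in deltas(timestamps):
        write_varint(zigzag(d), out)
    return bytes(out)


def decode_timestamps(data, count):
    timestamps, pos, prev = [], 0, 0
    for _ in range(count):
        z, pos = read_varint(data, pos)
        prev += unzigzag(z)
        timestamps.append(prev)
    return timestamps


if __name__ == "__main__":
    ts = [500, 501, 499]
    print(deltas(ts))
    encoded = encode_timestamps(ts)
    print(len(encoded), decode_timestamps(encoded, len(ts)))
```

Since each delta depends only on the previous message, the writer never has to go back and adjust earlier values once the message set is complete, which was the awkward part of making timestamps relative to the max timestamp.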

-Jason

On Thu, Feb 2, 2017 at 9:04 AM, Apurva Mehta  wrote:

> Good point Tom. We will update the KIP with the ACLs section and also the
> message format changes.
>
> > On Feb 2, 2017, at 06:45, Tom Crayford  wrote:
> >
> > I said this in the voting thread, but can the authors include a section
> > about new ACLs if there are going to be ACLs for TransactionalId. It's
> > mentioned in the google doc, but I think new ACLs should be in a KIP
> > directly.
> >
> >> On Thu, Feb 2, 2017 at 2:42 PM, Ismael Juma  wrote:
> >>
> >> Thanks for the responses and updates to the document, Guozhang and
> >> Jason. They look good. One follow-up and one additional comment:
> >>
> >> 1. I did some benchmarking and CRC32C seems to be a massive win when
> >> using the hardware instruction (particularly for messages larger than
> >> 65k), so I'm keen on taking advantage of the message format version
> >> bump to add support for it. I can write a separate KIP for this as
> >> it's not tied to Exactly-once, but it would be good to include the
> >> code change in the same PR that bumps the message format version. The
> >> benchmark and results can be found in the following link:
> >> https://gist.github.com/ijuma/e58ad79d489cb831b290e83b46a7d7bb.
> >>
> >> 2. The message timestamp field is 8 bytes. Did we consider storing the
> >> first timestamp in the message set and then storing deltas using
> >> varints in the messages like we do for offsets (the difference would
> >> be the usage of signed varints)? It seems like the deltas would be
> >> quite a bit smaller in the common case (potentially 0 for log append
> >> time, so we could even not store them at all using attributes like we
> >> do for key/value lengths). An alternative is using MaxTimestamp that
> >> is already present in the message set and computing deltas from that,
> >> but that seems more complicated. In any case, details aside, was this
> >> idea considered and rejected or is it worth exploring further?
> >>
> >> Ismael
> >>
> >> On Thu, Feb 2, 2017 at 1:02 AM, Jason Gustafson 
> >> wrote:
> >>
> >>> Ismael,
> >>>
> >>> Thanks for the comments. A few responses below:
> >>>
> >>>
> >>>> 2. `ProducerAppId` is a new authorization resource type. This
> >>>> introduces a compatibility issue with regards to existing
> >>>> third-party authorizers. It would be good to highlight this in the
> >>>> migration/compatibility section.
> >>>
> >>>
> >>> Ack. I added a note in the migration section.
> >>>
> >>>> 4. The Migration plan is relatively brief at the moment. Have we
> >>>> considered if there's any additional work required due to KIP-97
> >>>> (introduced in 0.10.2.0)?
> >>>
> >>>
> >>> Thanks, I added a few notes about client compatibility to the
> >>> migration section. I covered the main issues that come to mind, but
> >>> let me know if you think of others.
> >>>
> >>>> 7. It seems like there is a bit of inconsistency when it comes to
> >>>> naming convention. For example, we have `InitPIDRequest`,
> >>>> `PidSnapshot` and `InvalidPidMapping`. The latter two match Kafka's
> >>>> naming conventions. There are a few other examples like that and it
> >>>> would be

Kafka KIP meeting Feb 7 at 11:00am PST

2017-02-02 Thread Jun Rao
Hi, Everyone,

We plan to have a Kafka KIP meeting this coming Tuesday at 11:00am PST. If
you plan to attend but haven't received an invite, please let me know. The
following is the tentative agenda.

Agenda:
KIP-112: Handle disk failure for JBOD
KIP-113: Support replicas movement between log directories

Thanks,

Jun


[jira] [Assigned] (KAFKA-4722) Add application.id to StreamThread name

2017-02-02 Thread Sharad (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sharad reassigned KAFKA-4722:
-

Assignee: Sharad

> Add application.id to StreamThread name
> ---
>
> Key: KAFKA-4722
> URL: https://issues.apache.org/jira/browse/KAFKA-4722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>Assignee: Sharad
>Priority: Minor
>  Labels: beginner, easyfix, newbie
>
> StreamThread currently sets its name thusly:
> {code}
> super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
> {code}
> If you have multiple {{KafkaStreams}} instance within a single application, 
> it would help to add the application ID to {{StreamThread}} name to identify 
> which thread belong to what {{KafkaStreams}} instance.





[jira] [Updated] (KAFKA-4722) Add application.id to StreamThread name

2017-02-02 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4722:
---
Issue Type: Improvement  (was: New Feature)

> Add application.id to StreamThread name
> ---
>
> Key: KAFKA-4722
> URL: https://issues.apache.org/jira/browse/KAFKA-4722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>Priority: Minor
>  Labels: beginner, easyfix, newbie
>
> StreamThread currently sets its name thusly:
> {code}
> super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
> {code}
> If you have multiple {{KafkaStreams}} instance within a single application, 
> it would help to add the application ID to StreamThread thread name to 
> identify which thread belong to what KafkaStreams instance.





[jira] [Updated] (KAFKA-4722) Add application.id to StreamThread name

2017-02-02 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4722:
---
Priority: Minor  (was: Major)

> Add application.id to StreamThread name
> ---
>
> Key: KAFKA-4722
> URL: https://issues.apache.org/jira/browse/KAFKA-4722
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>Priority: Minor
>  Labels: beginner, easyfix, newbie
>
> StreamThread currently sets its name thusly:
> {code}
> super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
> {code}
> If you have multiple {{KafkaStreams}} instance within a single application, 
> it would help to add the application ID to StreamThread thread name to 
> identify which thread belong to what KafkaStreams instance.





[jira] [Updated] (KAFKA-4722) Add application.id to StreamThread name

2017-02-02 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4722:
---
Description: 
StreamThread currently sets its name thusly:

{code}
super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
{code}

If you have multiple {{KafkaStreams}} instance within a single application, it 
would help to add the application ID to {{StreamThread}'s name to identify 
which thread belong to what {{KafkaStreams}} instance.

  was:
StreamThread currently sets its name thusly:

{code}
super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
{code}

If you have multiple {{KafkaStreams}} instance within a single application, it 
would help to add the application ID to StreamThread thread name to identify 
which thread belong to what KafkaStreams instance.


> Add application.id to StreamThread name
> ---
>
> Key: KAFKA-4722
> URL: https://issues.apache.org/jira/browse/KAFKA-4722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>Priority: Minor
>  Labels: beginner, easyfix, newbie
>
> StreamThread currently sets its name thusly:
> {code}
> super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
> {code}
> If you have multiple {{KafkaStreams}} instance within a single application, 
> it would help to add the application ID to {{StreamThread}'s name to identify 
> which thread belong to what {{KafkaStreams}} instance.





[jira] [Updated] (KAFKA-4722) Add application.id to StreamThread name

2017-02-02 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4722:
---
Description: 
StreamThread currently sets its name thusly:

{code}
super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
{code}

If you have multiple {{KafkaStreams}} instance within a single application, it 
would help to add the application ID to StreamThread thread name to identify 
which thread belong to what KafkaStreams instance.

  was:
StreamThread currently sets its name thusly:

{code}
super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
{code}

When you have multiple KStream / KTables in an application, it would be nice to 
customize the "StreamThread" prefix.  The id is a good start but a 
human-recognizable name would make logs much easier to read.


> Add application.id to StreamThread name
> ---
>
> Key: KAFKA-4722
> URL: https://issues.apache.org/jira/browse/KAFKA-4722
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>  Labels: beginner, easyfix, newbie
>
> StreamThread currently sets its name thusly:
> {code}
> super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
> {code}
> If you have multiple {{KafkaStreams}} instance within a single application, 
> it would help to add the application ID to StreamThread thread name to 
> identify which thread belong to what KafkaStreams instance.





Re: [VOTE] KIP-48 Support for delegation tokens as an authentication mechanism

2017-02-02 Thread Jun Rao
Hi, Mani,

Sorry for the late response. A couple more comments below.

> 107.4 How is token deletion handled? Does every broker delete the same
> > token independently or only one broker does the deletion?
> >
>
> Only one broker does the deletion. Broker updates the expiration in its
> local cache
> and on zookeeper so other brokers also get notified and their cache
> statuses are updated as well.
>
>
Which broker does the deletion?


110. The diagrams in the wiki still show MD5 digest. Could you change it to
SCRAM?

Thanks,

Jun



>
> Thanks.
> Manikumar
>
>
> >
> > On Fri, Dec 23, 2016 at 9:26 AM, Manikumar 
> > wrote:
> >
> > > Hi,
> > >
> > > I would like to initiate the vote on KIP-48:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+
> > > Delegation+token+support+for+Kafka
> > >
> > >
> > > Thanks,
> > > Manikumar
> > >
> >
>


[jira] [Updated] (KAFKA-4722) Add application.id to StreamThread name

2017-02-02 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4722:
---
Summary: Add application.id to StreamThread name  (was: StreamThread should 
allow customization of thread prefix)

> Add application.id to StreamThread name
> ---
>
> Key: KAFKA-4722
> URL: https://issues.apache.org/jira/browse/KAFKA-4722
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>
> StreamThread currently sets its name thusly:
> {code}
> super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
> {code}
> When you have multiple KStream / KTables in an application, it would be nice 
> to customize the "StreamThread" prefix.  The id is a good start but a 
> human-recognizable name would make logs much easier to read.





[jira] [Updated] (KAFKA-4722) Add application.id to StreamThread name

2017-02-02 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4722:
---
Description: 
StreamThread currently sets its name thusly:

{code}
super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
{code}

If you have multiple {{KafkaStreams}} instances within a single application, it 
would help to add the application ID to the {{StreamThread}} name to identify which 
thread belongs to which {{KafkaStreams}} instance.

  was:
StreamThread currently sets its name thusly:

{code}
super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
{code}

If you have multiple {{KafkaStreams}} instance within a single application, it 
would help to add the application ID to {{StreamThread}'s name to identify 
which thread belong to what {{KafkaStreams}} instance.


> Add application.id to StreamThread name
> ---
>
> Key: KAFKA-4722
> URL: https://issues.apache.org/jira/browse/KAFKA-4722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>Priority: Minor
>  Labels: beginner, easyfix, newbie
>
> StreamThread currently sets its name thusly:
> {code}
> super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
> {code}
> If you have multiple {{KafkaStreams}} instances within a single application, 
> it would help to add the application ID to the {{StreamThread}} name to identify 
> which thread belongs to which {{KafkaStreams}} instance.
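
For illustration only, a minimal sketch of what such a thread name could look like. This is not the actual StreamThread code: the class name `NamedStreamThread`, the `applicationId` constructor argument, and the `<application.id>-StreamThread-<n>` format are assumptions made up for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for StreamThread; only the naming logic is shown.
public class NamedStreamThread extends Thread {
    private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);

    public NamedStreamThread(String applicationId) {
        // Assumed format: <application.id>-StreamThread-<sequence number>
        super(applicationId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
    }

    public static void main(String[] args) {
        // Two threads that belong to different (hypothetical) applications.
        System.out.println(new NamedStreamThread("orders-app").getName());
        System.out.println(new NamedStreamThread("billing-app").getName());
    }
}
```

With a prefix like this, a log line or thread dump shows directly which application a thread belongs to, while the sequence number still keeps names unique within the JVM.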





[jira] [Updated] (KAFKA-4722) Add application.id to StreamThread name

2017-02-02 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4722:
---
Labels: beginner easyfix newbie  (was: )

> Add application.id to StreamThread name
> ---
>
> Key: KAFKA-4722
> URL: https://issues.apache.org/jira/browse/KAFKA-4722
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>  Labels: beginner, easyfix, newbie
>
> StreamThread currently sets its name thusly:
> {code}
> super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
> {code}
> If you have multiple {{KafkaStreams}} instance within a single application, 
> it would help to add the application ID to StreamThread thread name to 
> identify which thread belong to what KafkaStreams instance.





[jira] [Commented] (KAFKA-4722) StreamThread should allow customization of thread prefix

2017-02-02 Thread Steven Schlansker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15850243#comment-15850243
 ] 

Steven Schlansker commented on KAFKA-4722:
--

That would be perfect, thanks :)

> StreamThread should allow customization of thread prefix
> 
>
> Key: KAFKA-4722
> URL: https://issues.apache.org/jira/browse/KAFKA-4722
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>
> StreamThread currently sets its name thusly:
> {code}
> super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
> {code}
> When you have multiple KStream / KTables in an application, it would be nice 
> to customize the "StreamThread" prefix.  The id is a good start but a 
> human-recognizable name would make logs much easier to read.





Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-02 Thread Jason Gustafson
Thanks Tom, we'll update the wiki to reflect all the movement on the design
document. Did you have a specific concern with the new ACLs?

-Jason

On Thu, Feb 2, 2017 at 6:49 AM, Ismael Juma  wrote:

> Hi Tom,
>
> That is a good point. During the discussion, it was agreed that changes to
> public interfaces (message format, protocol, ACLs, etc.) would be copied to
> the wiki once the things had settled down, but it looks like that hasn't
> been done yet. I agree that it makes sense to do it before people vote on
> it.
>
> Ismael
>
> On Thu, Feb 2, 2017 at 2:42 PM, Tom Crayford  wrote:
>
> > -1 (non-binding)
> >
> > I've been slow at keeping up with the KIP and the discussion thread. This
> > is an exciting and quite complex new feature, which provides great new
> > functionality.
> >
> > There's a thing I noticed missing from the KIP that's present in the
> google
> > doc - the doc talks about ACLs for TransactionalId. If those are going to
> > land with the KIP, I think they should be included in the KIP itself, as
> > new ACLs are significant security changes.
> >
> > On Thu, Feb 2, 2017 at 10:04 AM, Eno Thereska 
> > wrote:
> >
> > > +1 (non-binding).
> > >
> > > Excellent work and discussions!
> > >
> > > Eno
> > > > On 2 Feb 2017, at 04:13, Guozhang Wang  wrote:
> > > >
> > > > Hi all,
> > > >
> > > > We would like to start the voting process for KIP-98. The KIP can be
> > > found
> > > > at
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > >
> > > > Discussion thread can be found here:
> > > >
> > > > http://search-hadoop.com/m/Kafka/uyzND1jwZrr7HRHf?subj=+
> > > DISCUSS+KIP+98+Exactly+Once+Delivery+and+Transactional+Messaging
> > > >
> > > > Thanks,
> > > >
> > > > --
> > > > -- Guozhang
> > >
> > >
> >
>


[jira] [Resolved] (KAFKA-4182) Move the change logger out of RocksDB stores

2017-02-02 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-4182.
---
   Resolution: Fixed
Fix Version/s: 0.10.2.0

Fixed by: https://github.com/apache/kafka/pull/2360

> Move the change logger out of RocksDB stores
> 
>
> Key: KAFKA-4182
> URL: https://issues.apache.org/jira/browse/KAFKA-4182
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>  Labels: performance
> Fix For: 0.10.2.0
>
>
> We currently have the change logger embedded within the RocksDB store 
> implementations; however, this results in multiple implementations of the same 
> thing and poor separation of concerns. We should create a new LoggedStore that 
> wraps the outermost store when logging is enabled, for example:
> loggedStore -> cachingStore -> meteredStore -> innerStore
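
A rough sketch of the wrapping idea, not the code from the pull request: every type here (`SimpleStore`, `InMemoryStore`, `LoggedStore`) is hypothetical, and the in-memory list merely stands in for writing to a changelog topic. The caching and metered layers of the chain are left out to keep it short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LoggedStoreDemo {
    // Hypothetical minimal store interface, not the Kafka Streams API.
    interface SimpleStore {
        void put(String key, String value);
        String get(String key);
    }

    // Innermost store: holds the data and knows nothing about logging.
    static class InMemoryStore implements SimpleStore {
        private final Map<String, String> data = new HashMap<>();
        public void put(String key, String value) { data.put(key, value); }
        public String get(String key) { return data.get(key); }
    }

    // Outermost wrapper: records every write, then delegates to the wrapped store.
    static class LoggedStore implements SimpleStore {
        private final SimpleStore inner;
        private final List<String> changeLog = new ArrayList<>();

        LoggedStore(SimpleStore inner) { this.inner = inner; }

        public void put(String key, String value) {
            changeLog.add(key + "=" + value); // stand-in for a changelog write
            inner.put(key, value);
        }

        public String get(String key) { return inner.get(key); }

        List<String> changeLog() { return changeLog; }
    }

    public static void main(String[] args) {
        LoggedStore store = new LoggedStore(new InMemoryStore());
        store.put("k", "v");
        System.out.println(store.get("k") + " " + store.changeLog());
    }
}
```

Because the logging lives in one wrapper, any inner store implementation gets it without duplicating the change-logging code.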





Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-02 Thread Apurva Mehta
Good point Tom. We will update the KIP with the ACLs section and also the 
message format changes. 

> On Feb 2, 2017, at 06:45, Tom Crayford  wrote:
> 
> I said this in the voting thread, but can the authors include a section
> about new ACLs if there are going to be ACLs for TransactionalId. It's
> mentioned in the google doc, but I think new ACLs should be in a KIP
> directly.
> 
>> On Thu, Feb 2, 2017 at 2:42 PM, Ismael Juma  wrote:
>> 
>> Thanks for the responses and updates to the document, Guozhang and Jason.
>> They look good. One follow-up and one additional comment:
>> 
>> 1. I did some benchmarking and CRC32C seems to be a massive win when using
>> the hardware instruction (particularly for messages larger than 65k), so
>> I'm keen on taking advantage of the message format version bump to add
>> support for it. I can write a separate KIP for this as it's not tied to
>> Exactly-once, but it would be good to include the code change in the same
>> PR that bumps the message format version. The benchmark and results can be
>> found in the following link:
>> https://gist.github.com/ijuma/e58ad79d489cb831b290e83b46a7d7bb.
>> 
>> 2. The message timestamp field is 8 bytes. Did we consider storing the
>> first timestamp in the message set and then storing deltas using varints in
>> the messages like we do for offsets (the difference would be the usage of
>> signed varints)? It seems like the deltas would be quite a bit smaller in
>> the common case (potentially 0 for log append time, so we could even not
>> store them at all using attributes like we do for key/value lengths). An
>> alternative is using MaxTimestamp that is already present in the message
>> set and computing deltas from that, but that seems more complicated. In any
>> case, details aside, was this idea considered and rejected or is it worth
>> exploring further?
>> 
>> Ismael
>> 
>> On Thu, Feb 2, 2017 at 1:02 AM, Jason Gustafson 
>> wrote:
>> 
>>> Ismael,
>>> 
>>> Thanks for the comments. A few responses below:
>>> 
>>> 
>>>> 2. `ProducerAppId` is a new authorization resource type. This introduces a
>>>> compatibility issue with regards to existing third-party authorizers. It
>>>> would be good to highlight this in the migration/compatibility section.
>>> 
>>> 
>>> Ack. I added a note in the migration section.
>>> 
>>>> 4. The Migration plan is relatively brief at the moment. Have we considered
>>>> if there's any additional work required due to KIP-97 (introduced in
>>>> 0.10.2.0)?
>>> 
>>> 
>>> Thanks, I added a few notes about client compatibility to the migration
>>> section. I covered the main issues that come to mind, but let me know if
>>> you think of others.
>>> 
>>>> 7. It seems like there is a bit of inconsistency when it comes to naming
>>>> convention. For example, we have `InitPIDRequest`, `PidSnapshot`
>>>> and `InvalidPidMapping`. The latter two match Kafka's naming conventions.
>>>> There are a few other examples like that and it would be good to clean them
>>>> up.
>>> 
>>> 
>>> Let's go with InitPidRequest for consistency.  Haha, "InitPIdRequest" seems
>>> like a compromise which satisfies no one.
>>> 
>>> 
>>> -Jason
>>> 
>>> On Wed, Feb 1, 2017 at 4:14 PM, Guozhang Wang  wrote:
>>> 
>>>> Ismael, thanks for your feedbacks. Replied inline.
>>>> 
>>>>> On Wed, Feb 1, 2017 at 8:03 AM, Ismael Juma  wrote:
>>>>> 
>>>>> Hi all,
>>>>> 
>>>>> A few comments follow:
>>>>> 
>>>>> 1. The document states "inter-broker communications will be increased by M
>>>>> * N * P round trips per sec. We need to conduct some system performance
>>>>> test to make sure this additional inter-broker traffic would not largely
>>>>> impact the broker cluster". Has this testing been done? And if not, are we
>>>>> planning to do it soon? It seems important to validate this sooner rather
>>>>> than later. This applies more generally too, it would be great to
>>>>> understand how the new message format affects the producer with small
>>>>> messages, for example.
>>>>> 
>>>>> 
>>>> Yes we are conducting the perf tests with the message format changes in the
>>>> first stage; then the inter-broker communication with minimal transaction
>>>> coordinator implementations in the second stage.
>>>> 
>>>> 
>>>>> 2. `ProducerAppId` is a new authorization resource type. This introduces a
>>>>> compatibility issue with regards to existing third-party authorizers. It
>>>>> would be good to highlight this in the migration/compatibility section.
>>>>> 
>>>>> 3. I was happy to see that default values for the new configs have been
>>>>> added to the document since I last checked it. It would be good to explain
>>>>> the motivation for the choices.
>>>>> 
>>>>> 
>>>> Updated doc.
>>>> 
>>>> 
>>>>> 4. The Migration plan is relatively brief at the moment. Have we considered
>>>>> if 

[jira] [Commented] (KAFKA-4723) offsets.storage=kafka - groups stuck in rebalancing with committed offsets

2017-02-02 Thread Jason Bew (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15850173#comment-15850173
 ] 

Jason Bew commented on KAFKA-4723:
--

On further examination it does indeed look as though the brokers are caught in 
a confused state. Some offsets are correctly stored in Kafka, but some are 
still stored in ZooKeeper, despite a rolling restart to store offsets in both 
ZooKeeper and Kafka, followed by another to store them in Kafka only.  

I am not sure whether this is a bug or whether some other problem has arisen.

> offsets.storage=kafka - groups stuck in rebalancing with committed offsets
> --
>
> Key: KAFKA-4723
> URL: https://issues.apache.org/jira/browse/KAFKA-4723
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Jason Bew
>Priority: Minor
>
> Hi, I have moved the offset store to Kafka only. When I now run:
>  bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9094 --describe  
> --new-consumer --group my_consumer_group
> I get the message:
> Consumer group `my_consumer_group` does not exist or is rebalancing.
> I have found the issue KAFKA-3144; however, that refers to consumer groups 
> that have no committed offsets, whereas the groups I am looking at do have 
> them and are constantly in use.
> Using --list I get all my consumer groups returned. Although some are 
> inactive, I have around 6 very active ones (millions of messages a day, 
> constantly). Looking at the MBean data, kafka tool, etc., I can see the lags 
> and offsets changing every second. Therefore I would expect the 
> kafka-consumer-groups.sh script to return the lags and offsets for all 6 
> active consumer groups.
> I think what has happened is that when I moved offset storage from 
> ZooKeeper to Kafka (and then disabled sending to both), something got confused.  
> Querying ZooKeeper I get the offsets for the allegedly missing consumer groups, 
> but they should be stored and committed to Kafka.
> Many thanks.





Re: [DISCUS] consuming messages is polling - is there a reason? new KIP for poll?

2017-02-02 Thread Ismael Juma
On Thu, Feb 2, 2017 at 4:41 PM, radai  wrote:

> also - i don't think you need to shorten fetch.max.wait.ms to get lower
> delays - you could still configure a relatively long fetch.max.wait.ms and
> have the broker answer your poll the minute _any_ messages are available.
>

Yes, `fetch.min.bytes` is 1 by default, so the broker will return data as
soon as it's available if that is not changed.
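
To make the two settings concrete, here is a small sketch of consumer properties along those lines. The 5000 ms value is an arbitrary number picked for this example, the class name is made up, and the other settings a real consumer needs (bootstrap servers, deserializers, group id) are left out.

```java
import java.util.Properties;

public class LowLatencyFetchConfig {
    // Builds only the two fetch-related consumer settings discussed in this thread.
    static Properties build() {
        Properties props = new Properties();
        // Default value: the broker answers as soon as at least one byte is available.
        props.put("fetch.min.bytes", "1");
        // Upper bound on how long the broker may hold the fetch when no data arrives
        // (example value, deliberately long).
        props.put("fetch.max.wait.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

With this combination the long wait only applies while the partition is idle; it should not add latency once data shows up.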

Ismael


Re: [DISCUS] consuming messages is polling - is there a reason? new KIP for poll?

2017-02-02 Thread radai
kafka relies on the underlying OS' page cache for serving "popular" data.
so "pre-assembling" push batches would move from page cache to heap
storage, which is not as appealing.
also, for trivial cases a lot of consumers read the same thing, which would
make the heap caching even worse.

also - i don't think you need to shorten fetch.max.wait.ms to get lower
delays - you could still configure a relatively long fetch.max.wait.ms and
have the broker answer your poll the minute _any_ messages are available.

On Wed, Feb 1, 2017 at 2:46 AM, Alexander Binzberger <
alexander.binzber...@wingcon.com> wrote:

> ave very few. I don't see how push would cost more CPU time or resources
> on the broker than polling with a lot of consumers very frequently.


Live-lock between consumer thread and heartbeat thread

2017-02-02 Thread Lukavský Jan
Hi all,
I have a question about some very suspicious behavior I see while
consuming messages using manual synchronous commit with Kafka 0.10.1.0.
The code looks something like this:

try (KafkaConsumer<...> consumer = ...) {
  Map<TopicPartition, OffsetAndMetadata> commitMap =
      Collections.synchronizedMap(...);
  while (!Thread.currentThread().isInterrupted()) {
    ConsumerRecords<...> records = consumer.poll(..);
    for (...) {
      // queue records for asynchronous processing in a different thread.
      // when the asynchronous processing finishes, it updates the
      // `commitMap', so it has to be synchronized somehow
    }
    synchronized (commitMap) {
      // commit if we have anything to commit
      if (!commitMap.isEmpty()) {
        consumer.commitSync(commitMap);
        commitMap.clear();
      }
    }
  }
}


Now, what happens from time to time in my case is that the consumer thread is
stuck in the call to `commitSync`. By stracing the PID I found out that
it periodically epolls on an *empty* list of file descriptors. On
further investigation I found out that the response to the `commitSync` is
being handled by the kafka-coordinator-heartbeat-thread, which during
handling of the response needs to access the `commitMap`, and therefore
blocks, because the lock is being held by the application main thread.
Therefore, the whole consumption stops and ends in live-lock. The
solution in my case was to clone the map and unsynchronize the call to
`commitSync` like this:

  final Map<TopicPartition, OffsetAndMetadata> clone;
  synchronized (commitMap) {
    if (!commitMap.isEmpty()) {
      clone = new HashMap<>(commitMap);
      commitMap.clear();
    } else {
      clone = null;
    }
  }
  if (clone != null) {
    consumer.commitSync(clone);
  }

which seems to work fine. My question is whether my interpretation of
the problem is correct and, if so, should anything be done to avoid this?
I see two possibilities - either the call to `commitSync` should clone
the map itself, or it should somehow be guaranteed that the same
thread that issues synchronous requests receives the response. Am I right?
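
For anyone who wants to try the workaround without a broker, the snapshot step can be sketched as a small helper that has no Kafka dependency. The class name `CommitSnapshot`, the `drain` helper, and the `String`/`Long` types in `main` are made up for this example; in the real code the map would hold `TopicPartition` keys and `OffsetAndMetadata` values.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CommitSnapshot {
    // Copies and clears the shared map while holding its lock.
    // Returns null when there is nothing to commit.
    static <K, V> Map<K, V> drain(Map<K, V> shared) {
        synchronized (shared) {
            if (shared.isEmpty()) {
                return null;
            }
            Map<K, V> copy = new HashMap<>(shared);
            shared.clear();
            return copy;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> commitMap = Collections.synchronizedMap(new HashMap<>());
        commitMap.put("topic-0", 42L);

        Map<String, Long> snapshot = drain(commitMap);
        // The lock is released at this point, so a blocking call such as
        // consumer.commitSync(snapshot) would no longer keep other threads
        // from reading or updating commitMap.
        System.out.println(snapshot + " " + commitMap);
    }
}
```

The point of the pattern is simply that no lock is held across the blocking call, whichever thread ends up handling the response.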

Thanks for comments,
  best,
   Jan




Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-02 Thread Ismael Juma
Hi Tom,

That is a good point. During the discussion, it was agreed that changes to
public interfaces (message format, protocol, ACLs, etc.) would be copied to
the wiki once the things had settled down, but it looks like that hasn't
been done yet. I agree that it makes sense to do it before people vote on
it.

Ismael

On Thu, Feb 2, 2017 at 2:42 PM, Tom Crayford  wrote:

> -1 (non-binding)
>
> I've been slow at keeping up with the KIP and the discussion thread. This
> is an exciting and quite complex new feature, which provides great new
> functionality.
>
> There's a thing I noticed missing from the KIP that's present in the google
> doc - the doc talks about ACLs for TransactionalId. If those are going to
> land with the KIP, I think they should be included in the KIP itself, as
> new ACLs are significant security changes.
>
> On Thu, Feb 2, 2017 at 10:04 AM, Eno Thereska 
> wrote:
>
> > +1 (non-binding).
> >
> > Excellent work and discussions!
> >
> > Eno
> > > On 2 Feb 2017, at 04:13, Guozhang Wang  wrote:
> > >
> > > Hi all,
> > >
> > > We would like to start the voting process for KIP-98. The KIP can be
> > found
> > > at
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > >
> > > Discussion thread can be found here:
> > >
> > > http://search-hadoop.com/m/Kafka/uyzND1jwZrr7HRHf?subj=+
> > DISCUSS+KIP+98+Exactly+Once+Delivery+and+Transactional+Messaging
> > >
> > > Thanks,
> > >
> > > --
> > > -- Guozhang
> >
> >
>


Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-02 Thread Tom Crayford
I said this in the voting thread, but can the authors include a section
about new ACLs if there are going to be ACLs for TransactionalId? It's
mentioned in the google doc, but I think new ACLs should be in a KIP
directly.

On Thu, Feb 2, 2017 at 2:42 PM, Ismael Juma  wrote:

> Thanks for the responses and updates to the document, Guozhang and Jason.
> They look good. One follow-up and one additional comment:
>
> 1. I did some benchmarking and CRC32C seems to be a massive win when using
> the hardware instruction (particularly for messages larger than 65k), so
> I'm keen on taking advantage of the message format version bump to add
> support for it. I can write a separate KIP for this as it's not tied to
> Exactly-once, but it would be good to include the code change in the same
> PR that bumps the message format version. The benchmark and results can be
> found in the following link:
> https://gist.github.com/ijuma/e58ad79d489cb831b290e83b46a7d7bb.
>
> 2. The message timestamp field is 8 bytes. Did we consider storing the
> first timestamp in the message set and then storing deltas using varints in
> the messages like we do for offsets (the difference would be the usage of
> signed varints)? It seems like the deltas would be quite a bit smaller in
> the common case (potentially 0 for log append time, so we could even not
> store them at all using attributes like we do for key/value lengths). An
> alternative is using MaxTimestamp that is already present in the message
> set and computing deltas from that, but that seems more complicated. In any
> case, details aside, was this idea considered and rejected or is it worth
> exploring further?
>
> Ismael
>
> On Thu, Feb 2, 2017 at 1:02 AM, Jason Gustafson 
> wrote:
>
> > Ismael,
> >
> > Thanks for the comments. A few responses below:
> >
> >
> > > 2. `ProducerAppId` is a new authorization resource type. This
> introduces
> > a
> > > compatibility issue with regards to existing third-party authorizers.
> It
> > > would be good to highlight this in the migration/compatibility section.
> >
> >
> > Ack. I added a note in the migration section.
> >
> >  4. The Migration plan is relatively brief at the moment. Have we
> > considered
> > > if there's any additional work required due to KIP-97 (introduced in
> > > 0.10.2.0)?
> >
> >
> > Thanks, I added a few notes about client compatibility to the migration
> > section. I covered the main issues that come to mind, but let me know if
> > you think of others.
> >
> > 7. It seems like there is a bit of inconsistency when it comes to naming
> > > convention. For example, we have `InitPIDRequest`, `PidSnapshot`
> > > and `InvalidPidMapping`. The latter two match Kafka's naming
> conventions.
> > > There are a few other examples like that and it would be good to clean
> > them
> > > up.
> >
> >
> > Let's go with InitPidRequest for consistency.  Haha, "InitPIdRequest"
> seems
> > like a compromise which satisfies no one.
> >
> >
> > -Jason
> >
> > On Wed, Feb 1, 2017 at 4:14 PM, Guozhang Wang 
> wrote:
> >
> > > Ismael, thanks for your feedbacks. Replied inline.
> > >
> > > On Wed, Feb 1, 2017 at 8:03 AM, Ismael Juma  wrote:
> > >
> > > > Hi all,
> > > >
> > > > A few comments follow:
> > > >
> > > > 1. The document states "inter-broker communications will be increased
> > by
> > > M
> > > > * N * P round trips per sec. We need to conduct some system
> performance
> > > > test to make sure this additional inter-broker traffic would not
> > largely
> > > > impact the broker cluster". Has this testing been done? And if not,
> are
> > > we
> > > > planning to do it soon? It seems important to validate this sooner
> > rather
> > > > than later. This applies more generally too, it would be great to
> > > > understand how the new message format affects the producer with small
> > > > messages, for example.
> > > >
> > > >
> > > Yes we are conducting the perf tests with the message format changes in
> > the
> > > first stage; then the inter-broker communication with minimal
> transaction
> > > coordinator implementations in the second stage.
> > >
> > >
> > > > 2. `ProducerAppId` is a new authorization resource type. This
> > introduces
> > > a
> > > > compatibility issue with regards to existing third-party authorizers.
> > It
> > > > would be good to highlight this in the migration/compatibility
> section.
> > > >
> > > > 3. I was happy to see that default values for the new configs have
> been
> > > > added to the document since I last checked it. It would be good to
> > > explain
> > > > the motivation for the choices.
> > > >
> > > >
> > > Updated doc.
> > >
> > >
> > > > 4. The Migration plan is relatively brief at the moment. Have we
> > > considered
> > > > if there's any additional work required due to KIP-97 (introduced in
> > > > 0.10.2.0)?
> > > >
> > > > 5. transactional.id sounds good
> > > >
> > > > 6. Since we are keeping per message CRCs 

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-02 Thread Ismael Juma
Thanks for the responses and updates to the document, Guozhang and Jason.
They look good. One follow-up and one additional comment:

1. I did some benchmarking and CRC32C seems to be a massive win when using
the hardware instruction (particularly for messages larger than 65k), so
I'm keen on taking advantage of the message format version bump to add
support for it. I can write a separate KIP for this as it's not tied to
Exactly-once, but it would be good to include the code change in the same
PR that bumps the message format version. The benchmark and results can be
found in the following link:
https://gist.github.com/ijuma/e58ad79d489cb831b290e83b46a7d7bb.
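
As a side note for readers, the JDK API difference between the two checksums is small. The sketch below assumes Java 9 or later, where `java.util.zip.CRC32C` is available; the class name `ChecksumDemo` is made up, and the snippet only shows the swap. It says nothing about speed and is not the benchmark from the gist.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;
import java.util.zip.CRC32C;
import java.util.zip.Checksum;

public class ChecksumDemo {
    // Feeds the whole array into the given checksum and returns its value.
    static long checksum(Checksum c, byte[] data) {
        c.update(data, 0, data.length);
        return c.getValue();
    }

    public static void main(String[] args) {
        byte[] data = "123456789".getBytes(StandardCharsets.US_ASCII);
        // Both classes implement Checksum, so callers only swap the constructor.
        System.out.printf("CRC32  = %08x%n", checksum(new CRC32(), data));
        System.out.printf("CRC32C = %08x%n", checksum(new CRC32C(), data));
    }
}
```

The two algorithms use different polynomials, so their values are not interchangeable. That is why a change like this would need to go together with a message format version bump.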

2. The message timestamp field is 8 bytes. Did we consider storing the
first timestamp in the message set and then storing deltas using varints in
the messages like we do for offsets (the difference would be the usage of
signed varints)? It seems like the deltas would be quite a bit smaller in
the common case (potentially 0 for log append time, so we could even not
store them at all using attributes like we do for key/value lengths). An
alternative is using MaxTimestamp that is already present in the message
set and computing deltas from that, but that seems more complicated. In any
case, details aside, was this idea considered and rejected or is it worth
exploring further?
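
To give a feel for the sizes involved, here is a rough sketch of storing a timestamp as a signed (zigzag) varint delta against the first timestamp of the message set. It is an illustration of the general technique only, not the proposed wire format: the class and method names are hypothetical and the byte layout is the common protobuf-style varint.

```java
import java.io.ByteArrayOutputStream;

public class TimestampDeltaDemo {
    // ZigZag maps small negative and positive values to small unsigned values.
    static long zigZag(long v) { return (v << 1) ^ (v >> 63); }
    static long unZigZag(long v) { return (v >>> 1) ^ -(v & 1); }

    // Encodes (timestamp - firstTimestamp) as a zigzag varint, 7 bits per byte.
    static byte[] encodeDelta(long firstTimestamp, long timestamp) {
        long value = zigZag(timestamp - firstTimestamp);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80)); // continuation bit set
            value >>>= 7;
        }
        out.write((int) value);
        return out.toByteArray();
    }

    // Reverses encodeDelta and returns the absolute timestamp.
    static long decodeTimestamp(long firstTimestamp, byte[] bytes) {
        long value = 0;
        int shift = 0;
        for (byte b : bytes) {
            value |= (long) (b & 0x7F) << shift;
            shift += 7;
        }
        return firstTimestamp + unZigZag(value);
    }

    public static void main(String[] args) {
        long first = 1486000000000L;
        long later = first + 300; // 300 ms after the first message
        byte[] encoded = encodeDelta(first, later);
        System.out.println(encoded.length + " byte(s), decoded = "
                + decodeTimestamp(first, encoded));
    }
}
```

In this sketch a delta of zero or a small negative delta fits in a single byte and a delta of a few hundred milliseconds in two, compared with the fixed 8 bytes per message today.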

Ismael

On Thu, Feb 2, 2017 at 1:02 AM, Jason Gustafson  wrote:

> Ismael,
>
> Thanks for the comments. A few responses below:
>
>
> > 2. `ProducerAppId` is a new authorization resource type. This introduces
> a
> > compatibility issue with regards to existing third-party authorizers. It
> > would be good to highlight this in the migration/compatibility section.
>
>
> Ack. I added a note in the migration section.
>
>  4. The Migration plan is relatively brief at the moment. Have we
> considered
> > if there's any additional work required due to KIP-97 (introduced in
> > 0.10.2.0)?
>
>
> Thanks, I added a few notes about client compatibility to the migration
> section. I covered the main issues that come to mind, but let me know if
> you think of others.
>
> 7. It seems like there is a bit of inconsistency when it comes to naming
> > convention. For example, we have `InitPIDRequest`, `PidSnapshot`
> > and `InvalidPidMapping`. The latter two match Kafka's naming conventions.
> > There are a few other examples like that and it would be good to clean
> them
> > up.
>
>
> Let's go with InitPidRequest for consistency.  Haha, "InitPIdRequest" seems
> like a compromise which satisfies no one.
>
>
> -Jason
>
> On Wed, Feb 1, 2017 at 4:14 PM, Guozhang Wang  wrote:
>
> > Ismael, thanks for your feedbacks. Replied inline.
> >
> > On Wed, Feb 1, 2017 at 8:03 AM, Ismael Juma  wrote:
> >
> > > Hi all,
> > >
> > > A few comments follow:
> > >
> > > 1. The document states "inter-broker communications will be increased
> by
> > M
> > > * N * P round trips per sec. We need to conduct some system performance
> > > test to make sure this additional inter-broker traffic would not
> largely
> > > impact the broker cluster". Has this testing been done? And if not, are
> > we
> > > planning to do it soon? It seems important to validate this sooner
> rather
> > > than later. This applies more generally too, it would be great to
> > > understand how the new message format affects the producer with small
> > > messages, for example.
> > >
> > >
> > Yes we are conducting the perf tests with the message format changes in
> the
> > first stage; then the inter-broker communication with minimal transaction
> > coordinator implementations in the second stage.
> >
> >
> > > 2. `ProducerAppId` is a new authorization resource type. This introduces a
> > > compatibility issue with regards to existing third-party authorizers. It
> > > would be good to highlight this in the migration/compatibility section.
> > >
> > > 3. I was happy to see that default values for the new configs have been
> > > added to the document since I last checked it. It would be good to explain
> > > the motivation for the choices.
> > >
> > >
> > Updated doc.
> >
> >
> > > 4. The Migration plan is relatively brief at the moment. Have we considered
> > > if there's any additional work required due to KIP-97 (introduced in
> > > 0.10.2.0)?
> > >
> > > 5. transactional.id sounds good
> > >
> > > 6. Since we are keeping per message CRCs for auditing apps, have we
> > > considered mitigating the performance cost by using the more performant
> > > CRC32c in the new message format version?
> > >
> > >
> > We have not discussed this before. But I think it should be doable as
> > long as we can include the additional conversion logic in the migration
> > plan.
> >
> >
> > > Nits:
> > >
> > > 7. It seems like there is a bit of inconsistency when it comes to naming
> > > convention. For example, we have `InitPIDRequest`, `PidSnapshot`
> > > and `InvalidPidMapping`. The 

Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-02 Thread Tom Crayford
-1 (non-binding)

I've been slow at keeping up with the KIP and the discussion thread. This
is an exciting and quite complex new feature, which provides great new
functionality.

There's a thing I noticed missing from the KIP that's present in the google
doc - the doc talks about ACLs for TransactionalId. If those are going to
land with the KIP, I think they should be included in the KIP itself, as
new ACLs are significant security changes.

On Thu, Feb 2, 2017 at 10:04 AM, Eno Thereska 
wrote:

> +1 (non-binding).
>
> Excellent work and discussions!
>
> Eno
> > On 2 Feb 2017, at 04:13, Guozhang Wang  wrote:
> >
> > Hi all,
> >
> > We would like to start the voting process for KIP-98. The KIP can be found
> > at
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> >
> > Discussion thread can be found here:
> >
> > http://search-hadoop.com/m/Kafka/uyzND1jwZrr7HRHf?subj=+DISCUSS+KIP+98+Exactly+Once+Delivery+and+Transactional+Messaging
> >
> > Thanks,
> >
> > --
> > -- Guozhang
>
>


Re: Live-lock between consumer thread and heartbeat thread

2017-02-02 Thread Jan Lukavský

Hi Ismael,

short answer at the end of this email. :)

On 02/02/2017 02:52 PM, Ismael Juma wrote:


> I hadn't quite understood this. So the asynchronous code continues to
> update the map, I see.
>
> Regarding your suggested fix, the required change is not to hold a lock on
> the map passed to `commitSync`, so cloning should be enough to fix the
> issue. So do I understand correctly that moving the commitSync out of the
> synchronized block is an optimisation (i.e. the async threads can update
> the map while the commitSync is happening)?



Yes, exactly.
Best,
 Jan


Re: Live-lock between consumer thread and heartbeat thread

2017-02-02 Thread Ismael Juma
Sorry, I replied a little too fast. It is true that my original suggestion
would not work and I agree that it would be better to copy the map in
`commitSync` (that's why I suggested it in my email). Please file a JIRA.
However, that doesn't solve your immediate issue. More inline.

On Thu, Feb 2, 2017 at 12:37 PM, Jan Lukavský  wrote:

>  - if I remove the `Collections.synchronizedMap` from the `commitMap` I
> get unsynchronized map and therefore the asynchronous writes to this map
> would result in undefined state
>

Yes, I had assumed that there would be synchronization in the code that
does this, but I realise now that this assumption was incorrect.

>  - if I remove the manual synchronization then there is a race condition
> between the call to `commitSync` and `clear` of the `commitMap` - some
> other thread could write to the `commitMap` between calls to `commitSync`
> and `clear` and therefore the update to the map would be lost - this is the
> same reason why I cannot use ConcurrentHashMap, because there would be no
> synchronization between committing the map and clearing it
>

I hadn't quite understood this. So the asynchronous code continues to
update the map, I see.

Regarding your suggested fix, the required change is not to hold a lock on
the map passed to `commitSync`, so cloning should be enough to fix the
issue. So do I understand correctly that moving the commitSync out of the
synchronized block is an optimisation (i.e. the async threads can update
the map while the commitSync is happening)?

Ismael


Re: Live-lock between consumer thread and heartbeat thread

2017-02-02 Thread Rajini Sivaram
I agree with Jan. KafkaConsumer should take a copy of the offsetMap, now
that there are multiple threads accessing the map. commitAsync already does
take a copy and when there was only one thread in the consumer, it was
reasonable to avoid cloning for commitSync. But now it makes sense to clone
for commitSync too.
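
A minimal sketch of the defensive-copy idea, using only the JDK rather than the real consumer. The class `CopyingCommitClient`, its `commitSync(Map, long)` signature, and the `String`/`Long` map types are hypothetical stand-ins and not Kafka's API. The sketch only illustrates that once the client snapshots the caller's map on the calling thread, a response handler running on another thread no longer needs the caller's monitor, so the caller can safely hold that monitor across commit and clear.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a client whose responses are handled on a second thread.
final class CopyingCommitClient {
    private final ExecutorService responseThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "response-handler");
        t.setDaemon(true);
        return t;
    });

    // Returns how many offsets the response handler read; throws if it does not finish in time.
    int commitSync(Map<String, Long> offsets, long timeoutMs) {
        // Defensive copy taken on the calling thread, which may already own the map's monitor.
        final Map<String, Long> snapshot = new HashMap<>(offsets);
        Future<Integer> response = responseThread.submit(() -> {
            int seen = 0;
            // The handler reads only the private snapshot, never the caller's map.
            for (String partition : snapshot.keySet()) {
                if (snapshot.get(partition) != null) {
                    seen++;
                }
            }
            return seen;
        });
        try {
            return response.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("commit did not complete", e);
        }
    }

    // Mirrors the application pattern from the thread: commit and clear under one lock.
    static int demo() {
        Map<String, Long> commitMap = Collections.synchronizedMap(new HashMap<String, Long>());
        commitMap.put("topic-0", 42L);
        CopyingCommitClient client = new CopyingCommitClient();
        synchronized (commitMap) {
            int seen = client.commitSync(commitMap, 2000);
            commitMap.clear();
            return seen;
        }
    }
}
```

If the handler read `offsets` directly instead of `snapshot`, the call inside `demo()` would time out, which is the hang reported in this thread.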

On Thu, Feb 2, 2017 at 12:37 PM, Jan Lukavský  wrote:

> I'd disagree that I can fix the issue as you suggest, because:
>
>  - if I remove the `Collections.synchronizedMap` from the `commitMap` I
> get unsynchronized map and therefore the asynchronous writes to this map
> would result in undefined state
>
>  - if I remove the manual synchronization then there is a race condition
> between the call to `commitSync` and `clear` of the `commitMap` - some
> other thread could write to the `commitMap` between calls to `commitSync`
> and `clear` and therefore the update to the map would be lost - this is the
> same reason why I cannot use ConcurrentHashMap, because there would be no
> synchronization between committing the map and clearing it
>
> It seems to me quite natural to clone the map in the call to the synchronous
> commit, if it cannot be guaranteed that synchronous responses are handled
> by the same thread that issued the request (which from my point of view would
> be the best choice, but I still don't understand the details of the
> Kafka network stack well enough).
>
> Jan
>
>
>
> On 02/02/2017 01:25 PM, Ismael Juma wrote:
>
>> OK, you can fix this by removing `Collections.synchronizedMap` from the
>> following line or by removing the synchronized blocks.
>>
>> Map commitMap =
>> Collections.synchronizedMap(...);
>>
>> There is no reason to do manual and automatic synchronization at the same
>> time in this case. Because `Collections.synchronizedMap` uses the returned
>> map for synchronization, it means that even calling `get` on it will block
>> in this case. The consumer could copy the map to avoid this scenario as
>> the
>> heartbeat thread is meant to be an implementation detail. Jason, what do
>> you think?
>>
>> Let me know if this fixes your issue.
>>
>> Ismael
>>
>> On Thu, Feb 2, 2017 at 12:17 PM, Jan Lukavský  wrote:
>>
>> Hi Ismael,
>>>
>>> yes, no problem:
>>>
>>> The following thread is the main thread interacting with the KafkaConsumer
>>> (polling topic and committing offsets):
>>>
>>> "pool-3-thread-1" #14 prio=5 os_prio=0 tid=0x7f00f4434800 nid=0x32a9 runnable [0x7f00b6662000]
>>> java.lang.Thread.State: RUNNABLE
>>>  at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>  at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>  at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>>>  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>>>  - locked <0x0005c0abb218> (a sun.nio.ch.Util$3)
>>>  - locked <0x0005c0abb208> (a java.util.Collections$UnmodifiableSet)
>>>  - locked <0x0005c0abaa48> (a sun.nio.ch.EPollSelectorImpl)
>>>  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>>>  at org.apache.kafka.common.network.Selector.select(Selector.java:470)
>>>  at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>>>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>>>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
>>>  - locked <0x0005c0acf630> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>>>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>>>  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:499)
>>>  at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104)
>>>  at cz.o2..KafkaCommitLog.lambda$observePartitions$7(KafkaCommitLog.java:204)
>>>  - locked <0x0005c0612c88> (a java.util.Collections$SynchronizedMap)
>>>  at cz.o2..KafkaCommitLog$$Lambda$62/1960388071.run(Unknown Source) <- here is the synchronized block that takes monitor of the `commitMap`
>>>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>  at java.lang.Thread.run(Thread.java:745)
>>>
>>> This thread just spins around in epoll returning 0. The other thread is
>>> the coordinator
>>>
>>> "kafka-coordinator-heartbeat-thread | consumer" #15 daemon prio=5 os_prio=0 tid=0x7f0084067000 nid=0x32aa waiting for monitor entry [0x7f00b6361000]
>>> java.lang.Thread.State: BLOCKED (on object monitor)
>>>   

[GitHub] kafka pull request #2486: KAFKA-4724: Clean up of state directories can poss...

2017-02-02 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2486

KAFKA-4724: Clean up of state directories can possibly remove stores that 
are about to be used by another thread

Delay the cleanup of state directories that are not locked and not owned by 
the current thread such that we only remove the directory if its last modified 
is < now - cleanupDelayMs.
This also helps to avoid a race between threads on the same instance, where 
during rebalance, one thread releases the lock on the state directory, and 
before another thread can take the lock, the cleanup runs and removes the data.
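
A rough sketch of the age check described above, assuming a helper of our own naming (`StateDirCleanup` and `isOldEnough` are hypothetical and not taken from the pull request). It covers only the "last modified is < now - cleanupDelayMs" condition; the lock and ownership checks the description also mentions are deliberately left out.

```java
import java.io.File;

// Hypothetical helper; the actual change lives in Kafka Streams and is not reproduced here.
final class StateDirCleanup {

    // True when the directory was last modified strictly before (now - cleanupDelayMs).
    static boolean isOldEnough(long lastModifiedMs, long nowMs, long cleanupDelayMs) {
        return lastModifiedMs < nowMs - cleanupDelayMs;
    }

    // Convenience overload that reads the timestamp from the file system.
    static boolean isOldEnough(File stateDir, long nowMs, long cleanupDelayMs) {
        return stateDir.isDirectory()
                && isOldEnough(stateDir.lastModified(), nowMs, cleanupDelayMs);
    }
}
```

With a delay of 500 ms and `now` at 2000, a directory last touched at 1000 qualifies for removal, while one touched at 1800 (for example by a thread that has just been assigned the task) does not.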

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka KAFKA-4724

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2486.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2486


commit 229471706dec638361777f00de195e7538a54c29
Author: Damian Guy 
Date:   2017-02-02T12:58:07Z

delay cleanup of state directories






[jira] [Commented] (KAFKA-4724) Clean up of state directories can possibly remove stores that are about to be used by another thread

2017-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15849888#comment-15849888
 ] 

ASF GitHub Bot commented on KAFKA-4724:
---

GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2486

KAFKA-4724: Clean up of state directories can possibly remove stores that 
are about to be used by another thread

Delay the cleanup of state directories that are not locked and not owned by 
the current thread such that we only remove the directory if its last modified 
is < now - cleanupDelayMs.
This also helps to avoid a race between threads on the same instance, where 
during rebalance, one thread releases the lock on the state directory, and 
before another thread can take the lock, the cleanup runs and removes the data.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka KAFKA-4724

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2486.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2486


commit 229471706dec638361777f00de195e7538a54c29
Author: Damian Guy 
Date:   2017-02-02T12:58:07Z

delay cleanup of state directories




> Clean up of state directories can possibly remove stores that are about to be 
> used by another thread
> 
>
> Key: KAFKA-4724
> URL: https://issues.apache.org/jira/browse/KAFKA-4724
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.3.0
>
>
> Currently, each {{StreamThread}} cleans up unused state directories based on 
> the config {{StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG}}. 
> In situations where there are multiple threads configured this can lead to a 
> race condition where the state directory gets removed by one thread when the 
> task has been assigned (but not yet taken a lock) to another thread in the 
> process.





[jira] [Updated] (KAFKA-4724) Clean up of state directories can possibly remove stores that are about to be used by another thread

2017-02-02 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4724:
--
Summary: Clean up of state directories can possibly remove stores that are 
about to be used by another thread  (was: Clean up of state directories has a 
race condition and can possibly remove stores that are still used)

> Clean up of state directories can possibly remove stores that are about to be 
> used by another thread
> 
>
> Key: KAFKA-4724
> URL: https://issues.apache.org/jira/browse/KAFKA-4724
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.3.0
>
>
> Currently, each {{StreamThread}} cleans up unused state directories based on 
> the config {{StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG}}. 
> In situations where there are multiple threads configured this can lead to a 
> race condition where the state directory gets removed by one thread when the 
> task has been assigned (but not yet taken a lock) to another thread in the 
> process.





Re: Live-lock between consumer thread and heartbeat thread

2017-02-02 Thread Jan Lukavský

I'd disagree that I can fix the issue as you suggest, because:

 - if I remove the `Collections.synchronizedMap` from the `commitMap` I 
get unsynchronized map and therefore the asynchronous writes to this map 
would result in undefined state


 - if I remove the manual synchronization then there is a race 
condition between the call to `commitSync` and `clear` of the 
`commitMap` - some other thread could write to the `commitMap` between 
calls to `commitSync` and `clear` and therefore the update to the map 
would be lost - this is the same reason why I cannot use 
ConcurrentHashMap, because there would be no synchronization between 
committing the map and clearing it


It seems to me quite natural to clone the map in the call to the synchronous 
commit, if it cannot be guaranteed that synchronous responses are 
handled by the same thread that issued the request (which from my point of 
view would be the best choice, but I still don't understand the 
details of the Kafka network stack well enough).
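
One way to keep the commit and the clear from racing on the application side, without holding a lock during the commit, is to snapshot and clear in a single synchronized step. The sketch below is only an illustration: `OffsetBuffer`, `record` and `drain` are hypothetical names, and `String`/`Long` stand in for the real key and value types.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical buffer shared by the processing threads and the committing thread.
final class OffsetBuffer {
    private final Map<String, Long> pending = new HashMap<>();

    // Called by processing threads; keeps the highest offset seen per partition.
    synchronized void record(String partition, long offset) {
        pending.merge(partition, offset, Long::max);
    }

    // Copies and clears under one lock, so no update can slip in between the two steps.
    synchronized Map<String, Long> drain() {
        if (pending.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<String, Long> snapshot = new HashMap<>(pending);
        pending.clear();
        return snapshot;
    }
}
```

The committing thread would call `drain()` and pass the result to the commit call outside any lock; an update recorded while the commit is in flight simply lands in the next `drain()`.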


Jan


On 02/02/2017 01:25 PM, Ismael Juma wrote:

OK, you can fix this by removing `Collections.synchronizedMap` from the
following line or by removing the synchronized blocks.

Map commitMap =
Collections.synchronizedMap(...);

There is no reason to do manual and automatic synchronization at the same
time in this case. Because `Collections.synchronizedMap` uses the returned
map for synchronization, it means that even calling `get` on it will block
in this case. The consumer could copy the map to avoid this scenario as the
heartbeat thread is meant to be an implementation detail. Jason, what do
you think?

Let me know if this fixes your issue.

Ismael

On Thu, Feb 2, 2017 at 12:17 PM, Jan Lukavský  wrote:


Hi Ismael,

yes, no problem:

The following thread is the main thread interacting with the KafkaConsumer
(polling topic and committing offsets):

"pool-3-thread-1" #14 prio=5 os_prio=0 tid=0x7f00f4434800 nid=0x32a9 runnable [0x7f00b6662000]
java.lang.Thread.State: RUNNABLE
 at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
 at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
 at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
 at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
 - locked <0x0005c0abb218> (a sun.nio.ch.Util$3)
 - locked <0x0005c0abb208> (a java.util.Collections$UnmodifiableSet)
 - locked <0x0005c0abaa48> (a sun.nio.ch.EPollSelectorImpl)
 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
 at org.apache.kafka.common.network.Selector.select(Selector.java:470)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
 - locked <0x0005c0acf630> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:499)
 at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104)
 at cz.o2..KafkaCommitLog.lambda$observePartitions$7(KafkaCommitLog.java:204)
 - locked <0x0005c0612c88> (a java.util.Collections$SynchronizedMap)
 at cz.o2..KafkaCommitLog$$Lambda$62/1960388071.run(Unknown Source) <- here is the synchronized block that takes monitor of the `commitMap`
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

This thread just spins around in epoll returning 0. The other thread is
the coordinator

"kafka-coordinator-heartbeat-thread | consumer" #15 daemon prio=5 os_prio=0 tid=0x7f0084067000 nid=0x32aa waiting for monitor entry [0x7f00b6361000]
java.lang.Thread.State: BLOCKED (on object monitor)
 at java.util.Collections$SynchronizedMap.get(Collections.java:2584)
 - waiting to lock <0x0005c0612c88> (a java.util.Collections$SynchronizedMap) <- waiting for the `commitMap`, which will never happen
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:635) <- handles response to the commitSync request
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:615)
 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
 at 

Re: Live-lock between consumer thread and heartbeat thread

2017-02-02 Thread Ismael Juma
OK, you can fix this by removing `Collections.synchronizedMap` from the
following line or by removing the synchronized blocks.

Map commitMap =
Collections.synchronizedMap(...);

There is no reason to do manual and automatic synchronization at the same
time in this case. Because `Collections.synchronizedMap` uses the returned
map for synchronization, it means that even calling `get` on it will block
in this case. The consumer could copy the map to avoid this scenario as the
heartbeat thread is meant to be an implementation detail. Jason, what do
you think?
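
To make the locking point concrete, here is a small JDK-only sketch (the class and method names are made up for illustration). It relies on the documented behaviour that the wrapper returned by `Collections.synchronizedMap` synchronizes on itself, so a thread that holds `synchronized (map)` also blocks every other thread's `get`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of Kafka.
final class SynchronizedMapMonitorDemo {

    // Returns true if a reader was seen BLOCKED while we held the map's monitor
    // and then read the value successfully once the monitor was released.
    static boolean readerBlocksWhileMonitorHeld() {
        final Map<String, Long> commitMap =
                Collections.synchronizedMap(new HashMap<String, Long>());
        commitMap.put("topic-0", 42L);
        final AtomicReference<Long> seen = new AtomicReference<>();

        Thread reader = new Thread(() -> seen.set(commitMap.get("topic-0")), "reader");
        reader.setDaemon(true);

        boolean observedBlocked = false;
        synchronized (commitMap) {            // same monitor the wrapper uses internally
            reader.start();
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (System.nanoTime() < deadline) {
                if (reader.getState() == Thread.State.BLOCKED) {
                    observedBlocked = true;   // get() is stuck waiting for our monitor
                    break;
                }
                Thread.yield();
            }
        }                                     // monitor released, reader can proceed
        try {
            reader.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observedBlocked && Long.valueOf(42L).equals(seen.get());
    }
}
```

In the reported hang the monitor is never released, because the holder is itself waiting for the blocked reader to finish handling the response.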

Let me know if this fixes your issue.

Ismael

On Thu, Feb 2, 2017 at 12:17 PM, Jan Lukavský  wrote:

> Hi Ismael,
>
> yes, no problem:
>
> The following thread is the main thread interacting with the KafkaConsumer
> (polling topic and committing offsets):
>
> "pool-3-thread-1" #14 prio=5 os_prio=0 tid=0x7f00f4434800 nid=0x32a9 runnable [0x7f00b6662000]
>    java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x0005c0abb218> (a sun.nio.ch.Util$3)
> - locked <0x0005c0abb208> (a java.util.Collections$UnmodifiableSet)
> - locked <0x0005c0abaa48> (a sun.nio.ch.EPollSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.apache.kafka.common.network.Selector.select(Selector.java:470)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
> - locked <0x0005c0acf630> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:499)
> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104)
> at cz.o2..KafkaCommitLog.lambda$observePartitions$7(KafkaCommitLog.java:204)
> - locked <0x0005c0612c88> (a java.util.Collections$SynchronizedMap)
> at cz.o2. hidden>.KafkaCommitLog$$Lambda$62/1960388071.run(Unknown Source) <- here is the synchronized block that takes monitor of the `commitMap`
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> This thread just spins around in epoll returning 0. The other thread is
> the coordinator
>
> "kafka-coordinator-heartbeat-thread | consumer" #15 daemon prio=5 os_prio=0 tid=0x7f0084067000 nid=0x32aa waiting for monitor entry [0x7f00b6361000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at java.util.Collections$SynchronizedMap.get(Collections.java:2584)
> - waiting to lock <0x0005c0612c88> (a java.util.Collections$SynchronizedMap) <- waiting for the `commitMap`, which will never happen
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:635) <- handles response to the commitSync request
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:615)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:219)
> at 

Re: Live-lock between consumer thread and heartbeat thread

2017-02-02 Thread Jan Lukavský

Hi Ismael,

yes, no problem:

The following thread is the main thread interacting with the 
KafkaConsumer (polling topic and committing offsets):

"pool-3-thread-1" #14 prio=5 os_prio=0 tid=0x7f00f4434800 nid=0x32a9 runnable [0x7f00b6662000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    - locked <0x0005c0abb218> (a sun.nio.ch.Util$3)
    - locked <0x0005c0abb208> (a java.util.Collections$UnmodifiableSet)
    - locked <0x0005c0abaa48> (a sun.nio.ch.EPollSelectorImpl)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at org.apache.kafka.common.network.Selector.select(Selector.java:470)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
    - locked <0x0005c0acf630> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:499)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104)
    at cz.o2.hidden>.KafkaCommitLog.lambda$observePartitions$7(KafkaCommitLog.java:204)
    - locked <0x0005c0612c88> (a java.util.Collections$SynchronizedMap)
    at cz.o2.hidden>.KafkaCommitLog$$Lambda$62/1960388071.run(Unknown Source) <- here is the synchronized block that takes monitor of the `commitMap`
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

This thread just spins around in epoll returning 0. The other thread is 
the coordinator

"kafka-coordinator-heartbeat-thread | consumer" #15 daemon prio=5 os_prio=0 tid=0x7f0084067000 nid=0x32aa waiting for monitor entry [0x7f00b6361000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.util.Collections$SynchronizedMap.get(Collections.java:2584)
    - waiting to lock <0x0005c0612c88> (a java.util.Collections$SynchronizedMap) <- waiting for the `commitMap`, which will never happen
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:635) <- handles response to the commitSync request
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:615)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:219)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:266)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:865)
    - locked <0x0005c0acefc8> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)


Hope this helps. If you need any more debug info, I'm here to help. :)
Cheers,
 Jan

On 02/02/2017 12:48 PM, Ismael Juma wrote:

Hi Jan,

Do you have stacktraces showing the issue? That would help. Also, if you
can test 0.10.1.1, which is the latest stable release, that would be even
better. From looking at the code briefly, I don't see where the consumer is
locking on the received offsets map, so not sure what would cause it to
block in the way you describe. Hopefully a stacktrace when the consumer is
blocked would clarify. You can get a stacktrace 

[jira] [Assigned] (KAFKA-4724) Clean up of state directories has a race condition and can possibly remove stores that are still used

2017-02-02 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy reassigned KAFKA-4724:
-

Assignee: Damian Guy

> Clean up of state directories has a race condition and can possibly remove 
> stores that are still used
> -
>
> Key: KAFKA-4724
> URL: https://issues.apache.org/jira/browse/KAFKA-4724
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.3.0
>
>
> Currently, each {{StreamThread}} cleans up unused state directories based on 
> the config {{StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG}}. 
> In situations where there are multiple threads configured this can lead to a 
> race condition where the state directory gets removed by one thread when the 
> task has been assigned (but not yet taken a lock) to another thread in the 
> process.





[jira] [Updated] (KAFKA-4724) Clean up of state directories has a race condition and can possibly remove stores that are still used

2017-02-02 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4724:
--
Summary: Clean up of state directories has a race condition and can 
possibly remove stores that are still used  (was: Clean up of state directories 
has a race condition and can possibly remove stores that are sill used)

> Clean up of state directories has a race condition and can possibly remove 
> stores that are still used
> -
>
> Key: KAFKA-4724
> URL: https://issues.apache.org/jira/browse/KAFKA-4724
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
> Fix For: 0.10.3.0
>
>
> Currently, each {{StreamThread}} cleans up unused state directories based on 
> the config {{StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG}}. 
> In situations where there are multiple threads configured this can lead to a 
> race condition where the state directory gets removed by one thread when the 
> task has been assigned (but not yet taken a lock) to another thread in the 
> process.





Re: Live-lock between consumer thread and heartbeat thread

2017-02-02 Thread Ismael Juma
Hi Jan,

Do you have stacktraces showing the issue? That would help. Also, if you
can test 0.10.1.1, which is the latest stable release, that would be even
better. From looking at the code briefly, I don't see where the consumer is
locking on the received offsets map, so not sure what would cause it to
block in the way you describe. Hopefully a stacktrace when the consumer is
blocked would clarify. You can get a stacktrace via the jstack tool.

Ismael

On Thu, Feb 2, 2017 at 10:45 AM, je.ik  wrote:

> Hi all,
> I have a question about some very suspicious behavior I see while consuming
> messages using manual synchronous commit with Kafka 0.10.1.0. The code
> looks something like this:
>
> try (KafkaConsumer<...> consumer = ...) {
>   Map<TopicPartition, OffsetAndMetadata> commitMap =
>       Collections.synchronizedMap(...);
>   while (!Thread.currentThread().isInterrupted()) {
>     ConsumerRecords records = consumer.poll(..);
>     for (...) {
>       // queue records for asynchronous processing in different thread.
>       // when the asynchronous processing finishes, it updates the
>       // `commitMap`, so it has to be synchronized somehow
>     }
>     synchronized (commitMap) {
>       // commit if we have anything to commit
>       if (!commitMap.isEmpty()) {
>         consumer.commitSync(commitMap);
>         commitMap.clear();
>       }
>     }
>   }
> }
>
>
> Now, what happens from time to time in my case is that the consumer thread
> gets stuck in the call to `commitSync`. By stracing the PID I found out that
> it periodically epolls on an *empty* list of file descriptors. On further
> investigation I found that the response to the `commitSync` is being
> handled by the kafka-coordinator-heartbeat-thread, which, while handling
> the response, needs to access the `commitMap` and therefore blocks,
> because the lock is held by the application main thread. As a result,
> the whole consumption stops and ends in live-lock. The solution in my case
> was to clone the map and move the call to `commitSync` out of the
> synchronized block, like this:
>
>   final Map<TopicPartition, OffsetAndMetadata> clone;
>   synchronized (commitMap) {
>     if (!commitMap.isEmpty()) {
>       clone = new HashMap<>(commitMap);
>       commitMap.clear();
>     } else {
>       clone = null;
>     }
>   }
>   if (clone != null) {
>     consumer.commitSync(clone);
>   }
>
> which seems to work fine. My question is whether my interpretation of the
> problem is correct and, if so, whether anything should be done to avoid this.
> I see two possibilities: either the call to `commitSync` should clone the map
> itself, or it should somehow be guaranteed that the same thread that
> issues a synchronous request receives the response. Am I right?
>
> Thanks for comments,
>  best,
>   Jan
>


[jira] [Created] (KAFKA-4724) Clean up of state directories has a race condition and can possibly remove stores that are sill used

2017-02-02 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-4724:
-

 Summary: Clean up of state directories has a race condition and 
can possibly remove stores that are sill used
 Key: KAFKA-4724
 URL: https://issues.apache.org/jira/browse/KAFKA-4724
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Damian Guy
 Fix For: 0.10.3.0


Currently, each {{StreamThread}} cleans up unused state directories based on 
the config {{StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG}}. 
In situations where there are multiple threads configured this can lead to a 
race condition where the state directory gets removed by one thread when the 
task has been assigned (but not yet taken a lock) to another thread in the 
process.
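
The window described above sits between "task assigned" and "lock taken". One way to picture closing it is to record the assignment in a process-wide registry before any directory lock is attempted, and to have cleanup consult that registry atomically. The sketch below is purely illustrative: `StateDirCleanupSketch`, `markAssigned`, `markReleased` and `cleanIfUnassigned` are hypothetical names, and this is not a description of how Kafka Streams is or will be implemented.

```java
import java.util.HashSet;
import java.util.Set;

public final class StateDirCleanupSketch {
    // Hypothetical process-wide registry of task ids currently assigned to
    // some thread, whether or not that thread has locked the directory yet.
    private final Set<String> assigned = new HashSet<>();

    /** Called as soon as a task is assigned, before any directory lock is taken. */
    public synchronized void markAssigned(String taskId) {
        assigned.add(taskId);
    }

    /** Called when a task is revoked and its directory may be cleaned again. */
    public synchronized void markReleased(String taskId) {
        assigned.remove(taskId);
    }

    /**
     * Runs deleteDir only if the task is not assigned. The check and the
     * delete happen under the same monitor, so an assignment cannot slip in
     * between them.
     */
    public synchronized boolean cleanIfUnassigned(String taskId, Runnable deleteDir) {
        if (assigned.contains(taskId)) {
            return false;
        }
        deleteDir.run();
        return true;
    }

    public static void main(String[] args) {
        StateDirCleanupSketch registry = new StateDirCleanupSketch();
        registry.markAssigned("0_1");
        boolean removedAssigned =
                registry.cleanIfUnassigned("0_1", () -> System.out.println("deleting 0_1"));
        boolean removedIdle =
                registry.cleanIfUnassigned("0_2", () -> System.out.println("deleting 0_2"));
        System.out.println(removedAssigned + " " + removedIdle);
    }
}
```

Holding one monitor across the delete is the simplest way to make the check and the removal atomic; a real implementation would likely want finer-grained locking so that a slow delete does not stall task assignment.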






[jira] [Updated] (KAFKA-4723) offsets.storage=kafka - groups stuck in rebalancing with committed offsets

2017-02-02 Thread Jason Bew (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Bew updated KAFKA-4723:
-
Description: 
Hi, I have moved the offset store to Kafka only. When I now run:

 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9094 --describe --new-consumer --group my_consumer_group

I get the message:

Consumer group `my_consumer_group` does not exist or is rebalancing.

I have found issue KAFKA-3144; however, it refers to consumer groups that 
have no committed offsets, whereas the groups I am looking at do have them and 
are constantly in use.

Using --list I get all my consumer groups returned. Although some are inactive, 
I have around 6 very active ones (millions of messages a day, constantly). 
Looking at the MBean data, kafka tool etc., I can see the lags and offsets 
changing every second. Therefore I would expect the kafka-consumer-groups.sh 
script to return the lags and offsets for all 6 active consumer groups.

I think what has happened is that when I moved offset storage from 
ZooKeeper to Kafka (and then disabled sending to both), something got confused.  
Querying ZooKeeper, I get the offsets for the allegedly missing consumer groups, 
but they should be stored in and committed to Kafka.

Many thanks.

  was:
Hi, I have moved the offset store to Kafka only. When I now run:

 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9094 --describe --new-consumer --group my_consumer_group

I get the message:

Consumer group `my_consumer_group` does not exist or is rebalancing.

I have found issue KAFKA-3144; however, it refers to consumer groups that 
have no committed offsets, whereas the groups I am looking at do have them and 
are constantly in use.

Using --list I get all my consumer groups returned. Although some are inactive, 
I have around 6 very active ones (millions of messages a day, constantly). 
Looking at the MBean data, kafka tool etc., I can see the lags and offsets 
changing every second. Therefore I would expect the kafka-consumer-groups.sh 
script to return the lags and offsets for all 6 active consumer groups.

Many thanks.


> offsets.storage=kafka - groups stuck in rebalancing with committed offsets
> --
>
> Key: KAFKA-4723
> URL: https://issues.apache.org/jira/browse/KAFKA-4723
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Jason Bew
>Priority: Minor
>
> Hi, I have moved the offset store to Kafka only. When I now run:
>  bin/kafka-consumer-groups.sh --bootstrap-server localhost:9094 --describe --new-consumer --group my_consumer_group
> I get the message:
> Consumer group `my_consumer_group` does not exist or is rebalancing.
> I have found issue KAFKA-3144; however, it refers to consumer groups 
> that have no committed offsets, whereas the groups I am looking at do have 
> them and are constantly in use.
> Using --list I get all my consumer groups returned. Although some are 
> inactive, I have around 6 very active ones (millions of messages a day, 
> constantly). Looking at the MBean data, kafka tool etc., I can see the lags 
> and offsets changing every second. Therefore I would expect the 
> kafka-consumer-groups.sh script to return the lags and offsets for all 6 
> active consumer groups.
> I think what has happened is that when I moved offset storage from 
> ZooKeeper to Kafka (and then disabled sending to both), something got confused.  
> Querying ZooKeeper, I get the offsets for the allegedly missing consumer groups, 
> but they should be stored in and committed to Kafka.
> Many thanks.





Live-lock between consumer thread and heartbeat thread

2017-02-02 Thread je.ik

Hi all,
I have a question about a very suspicious behavior I see during 
consuming messages using manual synchronous commit with Kafka 0.10.1.0. 
The code looks something like this:


try (KafkaConsumer<...> consumer = ...) {
  Map<TopicPartition, OffsetAndMetadata> commitMap =
      Collections.synchronizedMap(...);

  while (!Thread.currentThread().isInterrupted()) {
    ConsumerRecords records = consumer.poll(..);
    for (...) {
      // queue records for asynchronous processing in different thread.
      // when the asynchronous processing finishes, it updates the
      // `commitMap`, so it has to be synchronized somehow
    }
    synchronized (commitMap) {
      // commit if we have anything to commit
      if (!commitMap.isEmpty()) {
        consumer.commitSync(commitMap);
        commitMap.clear();
      }
    }
  }
}


Now, what happens from time to time in my case is that the consumer thread 
gets stuck in the call to `commitSync`. By stracing the PID I found out that 
it periodically epolls on an *empty* list of file descriptors. On 
further investigation I found that the response to the `commitSync` is 
being handled by the kafka-coordinator-heartbeat-thread, which, while 
handling the response, needs to access the `commitMap` and therefore 
blocks, because the lock is held by the application main thread. 
As a result, the whole consumption stops and ends in live-lock. The 
solution in my case was to clone the map and move the call to 
`commitSync` out of the synchronized block, like this:


  final Map<TopicPartition, OffsetAndMetadata> clone;
  synchronized (commitMap) {
    if (!commitMap.isEmpty()) {
      clone = new HashMap<>(commitMap);
      commitMap.clear();
    } else {
      clone = null;
    }
  }
  if (clone != null) {
    consumer.commitSync(clone);
  }

which seems to work fine. My question is whether my interpretation of 
the problem is correct and, if so, whether anything should be done to avoid this. 
I see two possibilities: either the call to `commitSync` should clone 
the map itself, or it should somehow be guaranteed that the same 
thread that issues a synchronous request receives the response. Am I right?
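
For reference, the workaround can be reduced to a small helper that does not depend on Kafka at all: copy and clear the shared map while holding its monitor, then make the blocking call on the copy outside the lock. This is only a sketch of that pattern; `SnapshotCommitSketch`, `drain` and `commitPending` are made-up names, and the `Consumer` callback stands in for `consumer.commitSync(...)`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class SnapshotCommitSketch {

    /**
     * Copies and clears the shared map while holding its monitor and returns
     * the copy (empty if nothing was pending).
     */
    public static <K, V> Map<K, V> drain(Map<K, V> shared) {
        // Collections.synchronizedMap locks on the wrapper itself, so this
        // block excludes concurrent put/remove calls on the same wrapper.
        synchronized (shared) {
            Map<K, V> snapshot = new HashMap<>(shared);
            shared.clear();
            return snapshot;
        }
    }

    /**
     * Drains the pending entries and, if there are any, hands the snapshot to
     * the blocking commit call after the lock has been released.
     */
    public static <K, V> boolean commitPending(Map<K, V> pending,
                                               Consumer<Map<K, V>> blockingCommit) {
        Map<K, V> snapshot = drain(pending);
        if (snapshot.isEmpty()) {
            return false;
        }
        blockingCommit.accept(snapshot); // no monitor is held here
        return true;
    }

    public static void main(String[] args) {
        Map<String, Long> pending = Collections.synchronizedMap(new HashMap<>());
        pending.put("topic-0", 42L);
        boolean committed =
                commitPending(pending, snap -> System.out.println("committing " + snap));
        System.out.println(committed + " " + pending.isEmpty());
    }
}
```

One thing this pattern gives up: if the commit call throws, the drained entries are no longer in the shared map, so the caller would have to merge them back in (without overwriting newer offsets) if it wants to retry.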


Thanks for comments,
 best,
  Jan

