Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-01 Thread Jason Gustafson
Hey Jay,

Thanks for the questions! Let me take a couple of them.

2. The initTransactions() call is a little annoying. Can we get rid of
>that and call it automatically if you set a transaction.app.id when we
>do the first message send as we do with metadata? Arguably we should
> have
>included a general connect() or init() call in the producer, but given
> that
>we didn't do this it seems weird that the cluster metadata initializes
>automatically on demand and the transaction metadata doesn't.


The purpose of this call is to fence off any producer with the same AppID
and await the completion of any pending transactions. When it returns, you
know that your producer is safe to resume work. Take the "consume and
produce" use case as an example. We send the offset commits as part of the
producer's transaction (approximating the idea that it is "just another
write to a partition"). When you first initialize the application, you have
to determine when it's safe for the consumer to read those offsets.
Otherwise, you may read stale offsets before a transaction which is rolling
forward is able to write the marker to __consumer_offsets. So we can't do
the initialization in send() because that would assume that we had already
read data from the consumer, which we can't do until we've initialized the
producer. Does that make sense?

(For what it's worth, we're not married to this name or any of the others,
so anyone can feel free to suggest alternatives.)
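
To make the ordering concrete, here is a minimal sketch of the "consume and
produce" loop under the proposed API. The config and method names used below
(transaction.app.id, initTransactions, sendOffsetsToTransaction, and so on)
are taken from the KIP draft as discussed in this thread and should be read as
assumptions that may still change:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class ConsumeTransformProduce {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        // Proposed config name from the KIP draft; the final name may differ.
        producerProps.put("transaction.app.id", "my-copy-app");

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "my-copy-app");
        consumerProps.put("enable.auto.commit", "false");
        consumerProps.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            // Fences off any other producer with the same AppID and waits for
            // pending transactions to finish, so the offsets read below cannot
            // be stale reads from a transaction still rolling forward.
            producer.initTransactions();
            consumer.subscribe(Collections.singletonList("input"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                if (records.isEmpty())
                    continue;

                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    producer.send(new ProducerRecord<>("output", record.key(), record.value()));
                    offsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                }
                // The offset commit is "just another write" inside the transaction.
                producer.sendOffsetsToTransaction(offsets, "my-copy-app");
                producer.commitTransaction();
            }
        }
    }
}

The point is simply that initTransactions() has to complete before
consumer.poll() can safely return offsets committed by a previous incarnation
of the same AppID.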


5. One implication of factoring out the message set seems to be you
>can't ever "repack" messages to improve compression beyond what is done
> by
>the producer. We'd talked about doing this either by buffering when
> writing
>or during log cleaning. This isn't a show stopper but I think one
>implication is that we won't be able to do this. Furthermore with log
>cleaning you'd assume that over time ALL messages would collapse down
> to a
>single wrapper as compaction removes the others.


Yeah, that's a fair point. You may still be able to do some merging if
adjacent message sets have the same PID, but the potential savings might
not be worth the cost of implementation. My gut feeling is that merging
message sets from different producers may not be a great idea anyway (you'd
have to accept the fact that you always need "deep iteration" to find the
PIDs contained within the message set), but I haven't thought a ton about
it. Ultimately we'll have to decide if the potential for savings in the
future is worth some loss in efficiency now (for what it's worth, I think
the work that Ben has been looking at also hopes to bundle some more
information into the message set header).

On a purely pragmatic development level, after spending a ton of recent
time working with that code, I can say that the benefit of having a
conceptually simpler message format is huge. It allows you to converge the
paths for validation of message sets on the broker, for example. Currently,
we pretty much need two separate paths everywhere we process messages. It
can be tricky just to tell if the message you're dealing with is the inner
or outer message, and whether it matters or not. Also, the fact that the
inner and outer messages share common fields makes bugs like KAFKA-4298
possible. The risk of
these bugs is much smaller when you can clearly separate the fields allowed
in the message set header and those in the messages.
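
For illustration only, the separation being described looks roughly like the
following. The field names are an approximation of the proposed format, not
the exact wire layout from the KIP:

// Illustrative only: field names approximate the proposed format.
class MessageSetHeader {      // written exactly once per message set
    long firstOffset;
    int length;
    byte attributes;          // codec, transactional flag, ...
    long producerId;          // the PID lives only here
    short producerEpoch;
    int firstSequence;
}

class MessageEntry {          // repeated, possibly compressed as one block
    int offsetDelta;          // relative to firstOffset
    long timestampDelta;
    byte[] key;
    byte[] value;
}

With this split there is exactly one place to look for the PID, and the
per-message fields cannot drift out of sync with the wrapper the way the
shared inner/outer fields can today.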


Thanks,
Jason

On Thu, Dec 1, 2016 at 8:19 PM, Jay Kreps  wrote:

> Looks great!
>
> A few questions:
>
>1. What is the relationship between transaction.app.id and the existing
>config application.id in streams?
>2. The initTransactions() call is a little annoying. Can we get rid of
>that and call it automatically if you set a transaction.app.id when we
>do the first message send as we do with metadata? Arguably we should
> have
>included a general connect() or init() call in the producer, but given
> that
>we didn't do this it seems weird that the cluster metadata initializes
>automatically on demand and the transaction metadata doesn't.
>3. The equivalent concept of what we call "fetch.mode" in databases is
>called "isolation level" and takes values like "serializable", "read
>committed", "read uncommitted". Since we went with transaction as the
> name
>for the thing in between the begin/commit might make sense to use this
>terminology for the concept and levels? I think the behavior we are
>planning is "read committed" and the alternative re-ordering behavior is
>equivalent to "serializable"?
>4. Can the PID be made 4 bytes if we handle roll-over gracefully? 2
>billion concurrent producers should be enough for anyone, right?
>5. One implication of factoring out the message set seems to be you
>can't ever "repack" messages to improve compression beyond what is done
> by
>

Jenkins build is back to normal : kafka-trunk-jdk7 #1724

2016-12-01 Thread Apache Jenkins Server
See 



Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-01 Thread Apurva Mehta
Hi Jay,

Thanks for your comments. Answers to some of your points are below:

2. There have been long debates about the necessity of the initTransactions
method. Let's consider the options for doing without it:

   - If we do it on demand, we have to consider that the InitPIDRequest
   sent by initTransactions actually does transaction recovery. It would be
   slightly counterintuitive for the previous transactions to be recovered
   _only_ when new data is produced.
   - We did think about doing the initialization in the constructor, but
   then this would introduce potentially long blocking calls to the
   constructor in the case of transaction recovery. In particular, the
   coordinator would have to write the commit markers to all the partitions
   that are part of the transactions during recovery.
   - We did think about doing it in the 'beginTransaction' method, but then
   this suffers the same critique as doing it on demand: transaction recovery
   only happens when the next transaction is started.

These were the considerations that motivated the initTransactions method.
Personally, I am not opposed to doing transaction recovery in the
constructor and would be interested in others' opinions on this. Of course,
it would be even better if there were a third or fourth option!

* *

3. That nomenclature makes sense. We can adopt it unless others have
objections.

* *

4. I think PID allocation with only 4-byte PIDs may become tricky. In
particular, the current algorithm for PID allocation is to have each
coordinator carve out some part of the PID space and then allocate from its
pool. The available PIDs would be stored in ZooKeeper and parceled out to
coordinators on demand. With 64-bit PIDs we can be sure that we would have
PIDs available till eternity.

With a 32-bit PID, we are less certain because our PID expiration algorithm
doesn't 'restore' expired PIDs. If we did restore PIDs, then I think 32-bit
PIDs would be sufficient, since the space would only have to be big enough
to handle all concurrent producers. The latter is worth thinking about
given the storage savings. We will explore restoring PIDs on expiration,
and reducing to 32-bit PIDs.
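
To illustrate the block-allocation idea, here is a sketch of the concept only
(not the actual coordinator code; the block size and the ZooKeeper interaction
are assumptions):

import java.util.concurrent.atomic.AtomicLong;

class PidManager {
    private static final long BLOCK_SIZE = 1000;

    private long blockStart = -1;  // first PID in the currently claimed block
    private long nextPid = -1;     // next PID to hand out

    // Stand-in for a counter persisted in ZooKeeper; each coordinator bumps
    // it to claim the next range of BLOCK_SIZE PIDs on demand.
    private final AtomicLong zkCounter;

    PidManager(AtomicLong zkCounter) {
        this.zkCounter = zkCounter;
    }

    synchronized long nextPid() {
        if (blockStart < 0 || nextPid >= blockStart + BLOCK_SIZE) {
            // Claim a fresh block. With 64-bit PIDs this effectively never
            // wraps; with 32-bit PIDs, expired PIDs would have to be returned
            // to the pool for the space to stay sufficient.
            blockStart = zkCounter.getAndAdd(BLOCK_SIZE);
            nextPid = blockStart;
        }
        return nextPid++;
    }
}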

* *

Thanks,
Apurva

On Thu, Dec 1, 2016 at 8:19 PM, Jay Kreps  wrote:

> Looks great!
>
> A few questions:
>
>1. What is the relationship between transaction.app.id and the existing
>config application.id in streams?
>2. The initTransactions() call is a little annoying. Can we get rid of
>that and call it automatically if you set a transaction.app.id when we
>do the first message send as we do with metadata? Arguably we should
> have
>included a general connect() or init() call in the producer, but given
> that
>we didn't do this it seems weird that the cluster metadata initializes
>automatically on demand and the transaction metadata doesn't.
>3. The equivalent concept of what we call "fetch.mode" in databases is
>called "isolation level" and takes values like "serializable", "read
>committed", "read uncommitted". Since we went with transaction as the
> name
>for the thing in between the begin/commit might make sense to use this
>terminology for the concept and levels? I think the behavior we are
>planning is "read committed" and the alternative re-ordering behavior is
>equivalent to "serializable"?
>4. Can the PID be made 4 bytes if we handle roll-over gracefully? 2
>billion concurrent producers should be enough for anyone, right?
>5. One implication of factoring out the message set seems to be you
>can't ever "repack" messages to improve compression beyond what is done
> by
>the producer. We'd talked about doing this either by buffering when
> writing
>or during log cleaning. This isn't a show stopper but I think one
>implication is that we won't be able to do this. Furthermore with log
>cleaning you'd assume that over time ALL messages would collapse down
> to a
>single wrapper as compaction removes the others.
>
> -Jay
>
> On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang  wrote:
>
> > Hi all,
> >
> > I have just created KIP-98 to enhance Kafka with exactly once delivery
> > semantics:
> >
> > *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> >  > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging>*
> >
> > This KIP adds a transactional messaging mechanism along with an
> idempotent
> > producer implementation to make sure that 1) duplicated messages sent
> from
> > the same identified producer can be detected on the broker side, and 2) a
> > group of messages sent within a transaction will atomically be either
> > reflected and fetchable to consumers or not as a whole.
> >
> > The above wiki page provides a high-level view of the proposed changes as
> > well as 

Build failed in Jenkins: kafka-trunk-jdk8 #1073

2016-12-01 Thread Apache Jenkins Server
See 

Changes:

[jason] KAFKA-4399; Fix deadlock between cleanupGroupMetadata and offset commit

--
[...truncated 14660 lines...]
org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddInternalTopicConfigWithCompactForNonWindowStores PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithSameName STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithSameName PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithSelfParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithSelfParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithSelfParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithSelfParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithSink STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithSink PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > testTopicGroups STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > testTopicGroups PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > testBuild STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > testBuild PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAddNullStateStoreSupplier STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAddNullStateStoreSupplier PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSource STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSource PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullTopicWhenAddingSink STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullTopicWhenAddingSink PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithSource STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithSource PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldThroughOnUnassignedStateStoreAccess STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldThroughOnUnassignedStateStoreAccess PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameName STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameName PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddMoreThanOnePatternSourceNode STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddMoreThanOnePatternSourceNode PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithSameName STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithSameName PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldSetCorrectSourceNodesWithRegexUpdatedTopics STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldSetCorrectSourceNodesWithRegexUpdatedTopics PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameTopic STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameTopic PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testTopicGroupsByStateStore STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testTopicGroupsByStateStore PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithDuplicates STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithDuplicates PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > testPatternSourceTopic 
STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > testPatternSourceTopic 
PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithWrongParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithWrongParent PASSED


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-01 Thread Todd Palino
Come on, I’ve done at least 2 talks on this one :)

Producing counts to a topic is part of it, but that’s only part. Say your
count shows 100 messages in topic A. When you mirror topic A to another
cluster, you have 99 messages. Where was your problem? Or worse, you have
100 messages, but one producer duplicated messages and another one lost
messages. You need details about where the message came from in order to
pinpoint problems when they happen. Source producer info, where it was
produced into your infrastructure, and when it was produced. This requires
you to add the information to the message.
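
Concretely, the per-message audit information being described is along these
lines. The header keys and the plain map representation are hypothetical,
since KIP-82 has not settled on an API yet:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical audit headers; the keys and the map-of-bytes representation
// are assumptions, not part of the KIP-82 proposal.
class AuditHeaders {
    static Map<String, byte[]> build(String producerHost, String originCluster) {
        Map<String, byte[]> headers = new HashMap<>();
        headers.put("audit.producer.host",
                    producerHost.getBytes(StandardCharsets.UTF_8));
        headers.put("audit.origin.cluster",
                    originCluster.getBytes(StandardCharsets.UTF_8));
        headers.put("audit.produce.time",
                    Long.toString(System.currentTimeMillis())
                        .getBytes(StandardCharsets.UTF_8));
        return headers;
    }
}

An audit or mirroring component can then count and bucket messages by these
keys without ever deserializing the payload.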

And yes, you still need to maintain your clients. So maybe my original
example was not the best. My thoughts on not wanting to be responsible for
message formats stand, because that’s very much separate from the client.
As you know, we have our own internal client library that can insert the
right headers, and right now inserts the right audit information into the
message fields. If they exist, and assuming the message is Avro encoded.
What if someone wants to use JSON instead for a good reason? What if user X
wants to encrypt messages, but user Y does not? Maintaining the client
library is still much easier than maintaining the message formats.

-Todd


On Thu, Dec 1, 2016 at 6:21 PM, Gwen Shapira  wrote:

> Based on your last sentence, consider me convinced :)
>
> I get why headers are critical for Mirroring (you need tags to prevent
> loops and sometimes to route messages to the correct destination).
> But why do you need headers to audit? We are auditing by producing
> counts to a side topic (and I was under the impression you do the
> same), so we never need to modify the message.
>
> Another thing - after we added headers, wouldn't you be in the
> business of making sure everyone uses them properly? Making sure
> everyone includes the right headers you need, not using the header
> names you intend to use, etc. I don't think the "policing" business
> will ever go away.
>
> On Thu, Dec 1, 2016 at 5:25 PM, Todd Palino  wrote:
> > Got it. As an ops guy, I'm not very happy with the workaround. Avro means
> > that I have to be concerned with the format of the messages in order to
> run
> > the infrastructure (audit, mirroring, etc.). That means that I have to
> > handle the schemas, and I have to enforce rules about good formats. This
> is
> > not something I want to be in the business of, because I should be able
> to
> > run a service infrastructure without needing to be in the weeds of
> dealing
> > with customer data formats.
> >
> > Trust me, a sizable portion of my support time is spent dealing with
> schema
> > issues. I really would like to get away from that. Maybe I'd have more
> time
> > for other hobbies. Like writing. ;)
> >
> > -Todd
> >
> > On Thu, Dec 1, 2016 at 4:04 PM Gwen Shapira  wrote:
> >
> >> I'm pretty satisfied with the current workarounds (Avro container
> >> format), so I'm not too excited about the extra work required to do
> >> headers in Kafka. I absolutely don't mind it if you do it...
> >> I think the Apache convention for "good idea, but not willing to put
> >> any work toward it" is +0.5? anyway, that's what I was trying to
> >> convey :)
> >>
> >> On Thu, Dec 1, 2016 at 3:05 PM, Todd Palino  wrote:
> >> > Well I guess my question for you, then, is what is holding you back
> from
> >> > full support for headers? What’s the bit that you’re missing that has
> you
> >> > under a full +1?
> >> >
> >> > -Todd
> >> >
> >> >
> >> > On Thu, Dec 1, 2016 at 1:59 PM, Gwen Shapira 
> wrote:
> >> >
> >> >> I know why people who support headers support them, and I've seen
> what
> >> >> the discussion is like.
> >> >>
> >> >> This is why I'm asking people who are against headers (especially
> >> >> committers) what will make them change their mind - so we can get
> this
> >> >> part over one way or another.
> >> >>
> >> >> If I sound frustrated it is not at Radai, Jun or you (Todd)... I am
> >> >> just looking for something concrete we can do to move the discussion
> >> >> along to the yummy design details (which is the argument I really am
> >> >> looking forward to).
> >> >>
> >> >> On Thu, Dec 1, 2016 at 1:53 PM, Todd Palino 
> wrote:
> >> >> > So, Gwen, to your question (even though I’m not a committer)...
> >> >> >
> >> >> > I have always been a strong supporter of introducing the concept
> of an
> >> >> > envelope to messages, which headers accomplishes. The message key
> is
> >> >> > already an example of a piece of envelope information. By
> providing a
> >> >> means
> >> >> > to do this within Kafka itself, and not relying on use-case
> specific
> >> >> > implementations, you make it much easier for components to
> >> interoperate.
> >> >> It
> >> >> > simplifies development of all these things (message routing,
> auditing,
> >> >> > encryption, etc.) because each one 

[GitHub] kafka pull request #2202: MINOR: Improvements in group metadata cleanup and ...

2016-12-01 Thread hachikuji
GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/2202

MINOR: Improvements in group metadata cleanup and test coverage



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka group-expiration-cleanup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2202.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2202


commit d4ca10893cf7246d3ccf5f6bfe66539b400b6bdb
Author: Jason Gustafson 
Date:   2016-12-02T04:37:42Z

MINOR: Improvements in group metadata cleanup and test coverage




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-93: Improve invalid timestamp handling in Kafka Streams

2016-12-01 Thread Matthias J. Sax
Yes! Sorry for that typo.

On 12/1/16 5:07 PM, Guozhang Wang wrote:
> You mean "it is a backward incompatible change" right?
> 
> On Wed, Nov 30, 2016 at 4:28 PM, Matthias J. Sax 
> wrote:
> 
>> Thanks for this clarification (and your +1)
>>
>> I completely agree and just want to add my thoughts:
>>
>> 1. Yes, it is a backward compatible change but as I discussed with
>> others, we want to accept this for now. All previous releases did contain
>> non-compatible changes for Kafka Streams, too. And as the Kafka Streams API
>> is not guaranteed to be stable at this point, we had better make breaking
>> changes now rather than later.
>>
>> 2. At some point, we need to be more conservative with breaking changes
>> and only allow them for major releases.
>>
>> 3. As we expect that most people use the default timestamp extractor,
>> they will not be affected anyway. Only if custom extractors are used does
>> the application need to be recompiled. Thus, making the change backward
>> compatible does not seem to be worth the effort.
>>
>>
>> -Matthias
>>
>> On 11/29/16 9:57 PM, Ewen Cheslack-Postava wrote:
>>> I think this looks reasonable, but just a more general note on
>>> compatibility -- I think it's worth trying to clarify what types of
>>> compatibility we're trying to achieve. Guozhang's 1 & 2 give an important
>>> breakdown (compile vs runtime compatibility). For the type of change
>>> described here, I think it makes sense to clarify the compatibility
>> goals.
>>> The (pure) compile time compatibility vs (pure) runtime compatibility
>>> aren't the only options -- you have some additional intermediate choices
>> as
>>> well.
>>>
>>> The proposal describes a change which requires recompiling the plugin
>>> (TimestampExtractor) *and* substituting a runtime library (kafka-streams
>>> jar) to get correct updated behavior. This can get interesting if you
>>> already have N streams apps sharing the same TimestampExtractor. You now
>>> *must* update all of them to the new streams jar if any are to be updated
>>> for the new TimestampExtractor API. For folks with a monolithic
>>> repo/versioning setup, this could potentially be painful since they're
>>> forced to update all apps at once. It's at least not too bad since it can
>>> be isolated to a single commit (without deployment needing to be
>>> coordinated, for example), but if the # of apps gets > 4 or 5, these
>> types
>>> of updates start to be a real pain.
>>>
>>> I think this API change is an acceptable (albeit annoying) API
>>> incompatibility right now, but wanted to raise this in the discussion of
>>> this KIP so we consider this moving forward. There definitely are
>>> alternatives that add the new functionality but maintain compatibility
>>> better. In particular, it's possible to define the new interface to
>> require
>>> both APIs:
>>>
>>> // new interface
>>> public interface TimestampExtractor {
>>>     long extract(ConsumerRecord<Object, Object> record);
>>>     long extract(ConsumerRecord<Object, Object> record, long previousTimestamp);
>>> }
>>>
>>> which requires more effort for the implementor of the new API, but
>>> maintains compatibility if you want to use a new jar including the
>>> TimestampExtractor even with the old version of streams/the
>>> TimestampExtractor interface (since it will correctly dispatch to the old
>>> implementation). It requires more effort on the part of the framework
>> since
>>> it needs to catch runtime exceptions when the second version of extract()
>>> is missing and fall back to the first version. But in some cases that
>> might
>>> be warranted for the sake of compatibility.
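
A minimal sketch of the framework-side fallback described above, assuming the
two-method interface from the snippet earlier in this message; the helper
below is illustrative and not part of the streams code:

import org.apache.kafka.clients.consumer.ConsumerRecord;

class TimestampExtractorCompat {
    static long extract(TimestampExtractor extractor,
                        ConsumerRecord<Object, Object> record,
                        long previousTimestamp) {
        try {
            return extractor.extract(record, previousTimestamp);
        } catch (AbstractMethodError e) {
            // The extractor was compiled against the old single-argument
            // interface, so fall back to the original method.
            return extractor.extract(record);
        }
    }
}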
>>>
>>> I suspect this update won't cause too much pain right now just because
>> the
>>> number of streams app any user has won't be too large quite yet, but this
>>> seems important to consider moving forward. I think we had some similar
>>> concerns & discussion around the changes to the consumer APIs when trying
>>> to generalize the collection types used in those APIs.
>>>
>>> -Ewen
>>>
>>>
>>> On Mon, Nov 28, 2016 at 10:46 AM, Matthias J. Sax >>
>>> wrote:
>>>
 Done.

 If there is no further comments, I would like to start a voting thread
 in a separate email.

 -Matthias

 On 11/28/16 9:08 AM, Guozhang Wang wrote:
> Yes, it does not include these. Again, in my previous previous email I
> meant that when you say "This is a breaking, incompatible change" people
> may interpret it differently, so it is better to explain it more clearly.
>
>
> Guozhang
>
> On Thu, Nov 24, 2016 at 10:31 PM, Matthias J. Sax <
>> matth...@confluent.io
>
> wrote:
>
>> That does make sense. But KIP-93 does not change anything like this,
>> so
>> there is nothing to mention, IMHO.
>>
>> Or do you mean, the KIP should include that the change is backward
>> compatible with this regard?
>>
>> -Matthias
>>
>>
>>
>> 

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-01 Thread Jay Kreps
Looks great!

A few questions:

   1. What is the relationship between transaction.app.id and the existing
   config application.id in streams?
   2. The initTransactions() call is a little annoying. Can we get rid of
   that and call it automatically if you set a transaction.app.id when we
   do the first message send as we do with metadata? Arguably we should have
   included a general connect() or init() call in the producer, but given that
   we didn't do this it seems weird that the cluster metadata initializes
   automatically on demand and the transaction metadata doesn't.
   3. The equivalent concept of what we call "fetch.mode" in databases is
   called "isolation level" and takes values like "serializable", "read
   committed", "read uncommitted". Since we went with transaction as the name
   for the thing in between the begin/commit might make sense to use this
   terminology for the concept and levels? I think the behavior we are
   planning is "read committed" and the alternative re-ordering behavior is
   equivalent to "serializable"?
   4. Can the PID be made 4 bytes if we handle roll-over gracefully? 2
   billion concurrent producers should be enough for anyone, right?
   5. One implication of factoring out the message set seems to be you
   can't ever "repack" messages to improve compression beyond what is done by
   the producer. We'd talked about doing this either by buffering when writing
   or during log cleaning. This isn't a show stopper but I think one
   implication is that we won't be able to do this. Furthermore with log
   cleaning you'd assume that over time ALL messages would collapse down to a
   single wrapper as compaction removes the others.

-Jay

On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang  wrote:

> Hi all,
>
> I have just created KIP-98 to enhance Kafka with exactly once delivery
> semantics:
>
> *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>  98+-+Exactly+Once+Delivery+and+Transactional+Messaging>*
>
> This KIP adds a transactional messaging mechanism along with an idempotent
> producer implementation to make sure that 1) duplicated messages sent from
> the same identified producer can be detected on the broker side, and 2) a
> group of messages sent within a transaction will atomically be either
> reflected and fetchable to consumers or not as a whole.
>
> The above wiki page provides a high-level view of the proposed changes as
> well as summarized guarantees. Initial draft of the detailed implementation
> design is described in this Google doc:
>
> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> 0wSw9ra8
>
>
> We would love to hear your comments and suggestions.
>
> Thanks,
>
> -- Guozhang
>


[jira] [Updated] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit

2016-12-01 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-4399:
---
   Resolution: Fixed
Fix Version/s: 0.10.2.0
   Status: Resolved  (was: Patch Available)

Issue resolved by pull request 2125
[https://github.com/apache/kafka/pull/2125]

> Deadlock between cleanupGroupMetadata and offset commit
> ---
>
> Key: KAFKA-4399
> URL: https://issues.apache.org/jira/browse/KAFKA-4399
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Alexey Ozeritskiy
>Assignee: Alexey Ozeritskiy
>Priority: Blocker
> Fix For: 0.10.2.0, 0.10.1.1
>
> Attachments: deadlock-stack
>
>
> We have upgraded our clusters to 0.10.1.0 and got deadlock issue.
> We thought it smth like https://issues.apache.org/jira/browse/KAFKA-3994, but 
> patch did not help us and our stacks is different. I think it is other issue.
> Stack traces attached



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15713941#comment-15713941
 ] 

ASF GitHub Bot commented on KAFKA-4399:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2125


> Deadlock between cleanupGroupMetadata and offset commit
> ---
>
> Key: KAFKA-4399
> URL: https://issues.apache.org/jira/browse/KAFKA-4399
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Alexey Ozeritskiy
>Assignee: Alexey Ozeritskiy
>Priority: Blocker
> Fix For: 0.10.1.1
>
> Attachments: deadlock-stack
>
>
> We have upgraded our clusters to 0.10.1.0 and got deadlock issue.
> We thought it smth like https://issues.apache.org/jira/browse/KAFKA-3994, but 
> patch did not help us and our stacks is different. I think it is other issue.
> Stack traces attached



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2125: KAFKA-4399; deadlock cleanupGroupMetadata and offs...

2016-12-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2125




Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-01 Thread Gwen Shapira
Based on your last sentence, consider me convinced :)

I get why headers are critical for Mirroring (you need tags to prevent
loops and sometimes to route messages to the correct destination).
But why do you need headers to audit? We are auditing by producing
counts to a side topic (and I was under the impression you do the
same), so we never need to modify the message.
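
A rough sketch of what that counting looks like; the topic name and the
bucketing interval below are made up for illustration:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Tally consumed messages per topic and time bucket, then publish the tallies
// to a side topic. The producer properties (serializers etc.) are supplied by
// the caller.
class AuditCounter {
    private static final long BUCKET_MS = 60_000;
    private final Map<String, Long> counts = new HashMap<>();
    private final KafkaProducer<String, String> producer;

    AuditCounter(Properties producerProps) {
        this.producer = new KafkaProducer<>(producerProps);
    }

    void record(String topic, long timestampMs) {
        String bucket = topic + "@" + (timestampMs / BUCKET_MS) * BUCKET_MS;
        counts.merge(bucket, 1L, Long::sum);
    }

    void flush() {
        for (Map.Entry<String, Long> entry : counts.entrySet())
            producer.send(new ProducerRecord<>("audit-counts", entry.getKey(),
                                               entry.getValue().toString()));
        counts.clear();
        producer.flush();
    }
}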

Another thing - after we added headers, wouldn't you be in the
business of making sure everyone uses them properly? Making sure
everyone includes the right headers you need, not using the header
names you intend to use, etc. I don't think the "policing" business
will ever go away.

On Thu, Dec 1, 2016 at 5:25 PM, Todd Palino  wrote:
> Got it. As an ops guy, I'm not very happy with the workaround. Avro means
> that I have to be concerned with the format of the messages in order to run
> the infrastructure (audit, mirroring, etc.). That means that I have to
> handle the schemas, and I have to enforce rules about good formats. This is
> not something I want to be in the business of, because I should be able to
> run a service infrastructure without needing to be in the weeds of dealing
> with customer data formats.
>
> Trust me, a sizable portion of my support time is spent dealing with schema
> issues. I really would like to get away from that. Maybe I'd have more time
> for other hobbies. Like writing. ;)
>
> -Todd
>
> On Thu, Dec 1, 2016 at 4:04 PM Gwen Shapira  wrote:
>
>> I'm pretty satisfied with the current workarounds (Avro container
>> format), so I'm not too excited about the extra work required to do
>> headers in Kafka. I absolutely don't mind it if you do it...
>> I think the Apache convention for "good idea, but not willing to put
>> any work toward it" is +0.5? anyway, that's what I was trying to
>> convey :)
>>
>> On Thu, Dec 1, 2016 at 3:05 PM, Todd Palino  wrote:
>> > Well I guess my question for you, then, is what is holding you back from
>> > full support for headers? What’s the bit that you’re missing that has you
>> > under a full +1?
>> >
>> > -Todd
>> >
>> >
>> > On Thu, Dec 1, 2016 at 1:59 PM, Gwen Shapira  wrote:
>> >
>> >> I know why people who support headers support them, and I've seen what
>> >> the discussion is like.
>> >>
>> >> This is why I'm asking people who are against headers (especially
>> >> committers) what will make them change their mind - so we can get this
>> >> part over one way or another.
>> >>
>> >> If I sound frustrated it is not at Radai, Jun or you (Todd)... I am
>> >> just looking for something concrete we can do to move the discussion
>> >> along to the yummy design details (which is the argument I really am
>> >> looking forward to).
>> >>
>> >> On Thu, Dec 1, 2016 at 1:53 PM, Todd Palino  wrote:
>> >> > So, Gwen, to your question (even though I’m not a committer)...
>> >> >
>> >> > I have always been a strong supporter of introducing the concept of an
>> >> > envelope to messages, which headers accomplishes. The message key is
>> >> > already an example of a piece of envelope information. By providing a
>> >> means
>> >> > to do this within Kafka itself, and not relying on use-case specific
>> >> > implementations, you make it much easier for components to
>> interoperate.
>> >> It
>> >> > simplifies development of all these things (message routing, auditing,
>> >> > encryption, etc.) because each one does not have to reinvent the
>> wheel.
>> >> >
>> >> > It also makes it much easier from a client point of view if the
>> headers
>> >> are
>> >> > defined as part of the protocol and/or message format in general
>> because
>> >> > you can easily produce and consume messages without having to take
>> into
>> >> > account specific cases. For example, I want to route messages, but
>> >> client A
>> >> > doesn’t support the way audit implemented headers, and client B
>> doesn’t
>> >> > support the way encryption or routing implemented headers, so now my
>> >> > application has to create some really fragile (my autocorrect just
>> tried
>> >> to
>> >> > make that “tragic”, which is probably appropriate too) code to strip
>> >> > everything off, rather than just consuming the messages, picking out
>> the
>> >> 1
>> >> > or 2 headers it’s interested in, and performing its function.
>> >> >
>> >> > Honestly, this discussion has been going on for a long time, and it’s
>> >> > always “Oh, you came up with 2 use cases, and yeah, those use cases
>> are
>> >> > real things that someone would want to do. Here’s an alternate way to
>> >> > implement them so let’s not do headers.” If we have a few use cases
>> that
>> >> we
>> >> > actually came up with, you can be sure that over the next year
>> there’s a
>> >> > dozen others that we didn’t think of that someone would like to do. I
>> >> > really think it’s time to stop rehashing this discussion and instead
>> >> focus
>> >> > on a workable standard that we 

Re: A strange controller log in Kafka 0.9.0.1

2016-12-01 Thread Json Tu
Hi,
Can someone else help to review the PR in JIRA:
https://issues.apache.org/jira/browse/KAFKA-4447

> On Nov 23, 2016, at 11:28 PM, Json Tu  wrote:
> 
> Hi,
>   We have a cluster of kafka 0.9.0.1 with 3 nodes, and we found a strange 
> controller log as below.
> 
> [2016-11-07 03:14:48,575] INFO [SessionExpirationListener on 100], ZK 
> expired; shut down all controller components and try to re-elect 
> (kafka.controller.KafkaController$SessionExpirationListener)
> [2016-11-07 03:14:48,578] DEBUG [Controller 100]: Controller resigning, 
> broker id 100 (kafka.controller.KafkaController)
> [2016-11-07 03:14:48,579] DEBUG [Controller 100]: De-registering 
> IsrChangeNotificationListener (kafka.controller.KafkaController)
> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutting down 
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Stopped  
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutdown completed 
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-11-07 03:14:48,580] INFO [Partition state machine on Controller 100]: 
> Stopped partition state machine (kafka.controller.PartitionStateMachine)
> [2016-11-07 03:14:48,580] INFO [Replica state machine on controller 100]: 
> Stopped replica state machine (kafka.controller.ReplicaStateMachine)
> [2016-11-07 03:14:48,583] INFO [Controller-100-to-broker-101-send-thread], 
> Shutting down (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread], 
> Stopped  (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread], 
> Shutdown completed (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,586] INFO [Controller-100-to-broker-100-send-thread], 
> Shutting down (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread], 
> Stopped  (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread], 
> Shutdown completed (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,587] INFO [Controller 100]: Broker 100 resigned as the 
> controller (kafka.controller.KafkaController)
> [2016-11-07 03:14:48,652] DEBUG [IsrChangeNotificationListener] Fired!!! 
> (kafka.controller.IsrChangeNotificationListener)
> [2016-11-07 03:14:48,668] INFO [BrokerChangeListener on Controller 100]: 
> Broker change listener fired for path /brokers/ids with children 101,100 
> (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
> [2016-11-07 03:14:48,683] DEBUG [DeleteTopicsListener on 100]: Delete topics 
> listener fired for topics  to be deleted 
> (kafka.controller.PartitionStateMachine$DeleteTopicsListener)
> [2016-11-07 03:14:48,687] INFO [AddPartitionsListener on 100]: Add Partition 
> triggered 
> {"version":1,"partitions":{"4":[102,101],"5":[100,102],"1":[102,100],"0":[101,102],"2":[100,101],"3":[101,100]}}
>  for path /brokers/topics/movie.gateway.merselllog.syncCinema 
> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
> [2016-11-07 03:14:48,694] INFO [AddPartitionsListener on 100]: Add Partition 
> triggered 
> {"version":1,"partitions":{"4":[102,101],"5":[100,102],"1":[102,100],"0":[101,102],"2":[100,101],"3":[101,100]}}
>  for path /brokers/topics/push_3rdparty_high 
> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
> [2016-11-07 03:14:48,707] INFO [AddPartitionsListener on 100]: Add Partition 
> triggered 
> {"version":1,"partitions":{"4":[101,102],"5":[102,100],"1":[101,100],"0":[100,102],"2":[102,101],"3":[100,101]}}
>  for path /brokers/topics/icb_msg_push_high_02 
> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
> [2016-11-07 03:14:48,715] INFO [AddPartitionsListener on 100]: Add Partition 
> triggered 
> {"version":1,"partitions":{"4":[102,100],"5":[100,101],"1":[102,101],"0":[101,100],"2":[100,102],"3":[101,102]}}
>  for path /brokers/topics/movie.gateway.merselllog.unlockSeat 
> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
> 
> 
>   From the log we can see that the old controller 100 resigned as the
> controller successfully, but what confused me is that it still received a
> Fired!!! notification from the IsrChangeNotificationListener, which had been
> de-registered before, and broker 100 was not elected as the new controller
> afterward. We can also see that the IsrChangeNotificationListener,
> DeleteTopicsListener and AddPartitionsListener all fired after the
> resignation. Does this suggest that something went wrong with ZooKeeper?
>   Any suggestion is appreciated, thanks in advance.
> 
> 



[GitHub] kafka pull request #2179: MINOR: Fix typos in KafkaConsumer docs

2016-12-01 Thread jeffwidman
Github user jeffwidman closed the pull request at:

https://github.com/apache/kafka/pull/2179




Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-01 Thread Todd Palino
Got it. As an ops guy, I'm not very happy with the workaround. Avro means
that I have to be concerned with the format of the messages in order to run
the infrastructure (audit, mirroring, etc.). That means that I have to
handle the schemas, and I have to enforce rules about good formats. This is
not something I want to be in the business of, because I should be able to
run a service infrastructure without needing to be in the weeds of dealing
with customer data formats.

Trust me, a sizable portion of my support time is spent dealing with schema
issues. I really would like to get away from that. Maybe I'd have more time
for other hobbies. Like writing. ;)

-Todd

On Thu, Dec 1, 2016 at 4:04 PM Gwen Shapira  wrote:

> I'm pretty satisfied with the current workarounds (Avro container
> format), so I'm not too excited about the extra work required to do
> headers in Kafka. I absolutely don't mind it if you do it...
> I think the Apache convention for "good idea, but not willing to put
> any work toward it" is +0.5? anyway, that's what I was trying to
> convey :)
>
> On Thu, Dec 1, 2016 at 3:05 PM, Todd Palino  wrote:
> > Well I guess my question for you, then, is what is holding you back from
> > full support for headers? What’s the bit that you’re missing that has you
> > under a full +1?
> >
> > -Todd
> >
> >
> > On Thu, Dec 1, 2016 at 1:59 PM, Gwen Shapira  wrote:
> >
> >> I know why people who support headers support them, and I've seen what
> >> the discussion is like.
> >>
> >> This is why I'm asking people who are against headers (especially
> >> committers) what will make them change their mind - so we can get this
> >> part over one way or another.
> >>
> >> If I sound frustrated it is not at Radai, Jun or you (Todd)... I am
> >> just looking for something concrete we can do to move the discussion
> >> along to the yummy design details (which is the argument I really am
> >> looking forward to).
> >>
> >> On Thu, Dec 1, 2016 at 1:53 PM, Todd Palino  wrote:
> >> > So, Gwen, to your question (even though I’m not a committer)...
> >> >
> >> > I have always been a strong supporter of introducing the concept of an
> >> > envelope to messages, which headers accomplishes. The message key is
> >> > already an example of a piece of envelope information. By providing a
> >> means
> >> > to do this within Kafka itself, and not relying on use-case specific
> >> > implementations, you make it much easier for components to
> interoperate.
> >> It
> >> > simplifies development of all these things (message routing, auditing,
> >> > encryption, etc.) because each one does not have to reinvent the
> wheel.
> >> >
> >> > It also makes it much easier from a client point of view if the
> headers
> >> are
> >> > defined as part of the protocol and/or message format in general
> because
> >> > you can easily produce and consume messages without having to take
> into
> >> > account specific cases. For example, I want to route messages, but
> >> client A
> >> > doesn’t support the way audit implemented headers, and client B
> doesn’t
> >> > support the way encryption or routing implemented headers, so now my
> >> > application has to create some really fragile (my autocorrect just
> tried
> >> to
> >> > make that “tragic”, which is probably appropriate too) code to strip
> >> > everything off, rather than just consuming the messages, picking out
> the
> >> 1
> >> > or 2 headers it’s interested in, and performing its function.
> >> >
> >> > Honestly, this discussion has been going on for a long time, and it’s
> >> > always “Oh, you came up with 2 use cases, and yeah, those use cases
> are
> >> > real things that someone would want to do. Here’s an alternate way to
> >> > implement them so let’s not do headers.” If we have a few use cases
> that
> >> we
> >> > actually came up with, you can be sure that over the next year
> there’s a
> >> > dozen others that we didn’t think of that someone would like to do. I
> >> > really think it’s time to stop rehashing this discussion and instead
> >> focus
> >> > on a workable standard that we can adopt.
> >> >
> >> > -Todd
> >> >
> >> >
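
To make the point above about picking out only the one or two headers a
component cares about more concrete, a hypothetical sketch; the header
accessor shape and key names are assumptions, since KIP-82 has not fixed an
API yet:

import java.nio.charset.StandardCharsets;
import java.util.Map;

// A routing component reads the one header it cares about and ignores the
// rest (audit, encryption, ...). Everything here is illustrative.
class Router {
    void route(Map<String, byte[]> headers, byte[] key, byte[] value) {
        byte[] destination = headers.get("route.destination.cluster");
        if (destination == null)
            return; // not ours to route; other headers pass through untouched
        forward(new String(destination, StandardCharsets.UTF_8), key, value);
    }

    private void forward(String cluster, byte[] key, byte[] value) {
        // hand off to the mirroring pipeline for the target cluster (omitted)
    }
}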
> >> > On Thu, Dec 1, 2016 at 1:39 PM, Todd Palino 
> wrote:
> >> >
> >> >> C. per message encryption
> >> >>> One drawback of this approach is that this significantly reduce the
> >> >>> effectiveness of compression, which happens on a set of serialized
> >> >>> messages. An alternative is to enable SSL for wire encryption and
> rely
> >> on
> >> >>> the storage system (e.g. LUKS) for at rest encryption.
> >> >>
> >> >>
> >> >> Jun, this is not sufficient. While this does cover the case of
> removing
> >> a
> >> >> drive from the system, it will not satisfy most compliance
> requirements
> >> for
> >> >> encryption of data as whoever has access to the broker itself still
> has
> >> >> access to the unencrypted data. For end-to-end encryption you need to
> >> 

Re: [DISCUSS] KIP-93: Improve invalid timestamp handling in Kafka Streams

2016-12-01 Thread Guozhang Wang
You mean "it is a backward incompatible change" right?

On Wed, Nov 30, 2016 at 4:28 PM, Matthias J. Sax 
wrote:

> Thanks for this clarification (and your +1)
>
> I completely agree and just want to add my thoughts:
>
> 1. Yes, it is a backward compatible change but as I discussed with
> others, we want to accept this for now. All previous releases did contain
> non-compatible changes for Kafka Streams, too. And as the Kafka Streams API
> is not guaranteed to be stable at this point, we had better make breaking
> changes now rather than later.
>
> 2. At some point, we need to be more conservative with breaking changes
> and only allow them for major releases.
>
> 3. As we expect that most people use the default timestamp extractor,
> they will not be affected anyway. Only if custom extractors are used does
> the application need to be recompiled. Thus, making the change backward
> compatible does not seem to be worth the effort.
>
>
> -Matthias
>
> On 11/29/16 9:57 PM, Ewen Cheslack-Postava wrote:
> > I think this looks reasonable, but just a more general note on
> > compatibility -- I think it's worth trying to clarify what types of
> > compatibility we're trying to achieve. Guozhang's 1 & 2 give an important
> > breakdown (compile vs runtime compatibility). For the type of change
> > described here, I think it makes sense to clarify the compatibility
> goals.
> > The (pure) compile time compatibility vs (pure) runtime compatibility
> > aren't the only options -- you have some additional intermediate choices
> as
> > well.
> >
> > The proposal describes a change which requires recompiling the plugin
> > (TimestampExtractor) *and* substituting a runtime library (kafka-streams
> > jar) to get correct updated behavior. This can get interesting if you
> > already have N streams apps sharing the same TimestampExtractor. You now
> > *must* update all of them to the new streams jar if any are to be updated
> > for the new TimestampExtractor API. For folks with a monolithic
> > repo/versioning setup, this could potentially be painful since they're
> > forced to update all apps at once. It's at least not too bad since it can
> > be isolated to a single commit (without deployment needing to be
> > coordinated, for example), but if the # of apps gets > 4 or 5, these
> types
> > of updates start to be a real pain.
> >
> > I think this API change is an acceptable (albeit annoying) API
> > incompatibility right now, but wanted to raise this in the discussion of
> > this KIP so we consider this moving forward. There definitely are
> > alternatives that add the new functionality but maintain compatibility
> > better. In particular, it's possible to define the new interface to
> require
> > both APIs:
> >
> > // new interface
> > public interface TimestampExtractor {
> >     long extract(ConsumerRecord<Object, Object> record);
> >     long extract(ConsumerRecord<Object, Object> record, long previousTimestamp);
> > }
> >
> > which requires more effort for the implementor of the new API, but
> > maintains compatibility if you want to use a new jar including the
> > TimestampExtractor even with the old version of streams/the
> > TimestampExtractor interface (since it will correctly dispatch to the old
> > implementation). It requires more effort on the part of the framework
> since
> > it needs to catch runtime exceptions when the second version of extract()
> > is missing and fall back to the first version. But in some cases that
> might
> > be warranted for the sake of compatibility.
> >
> > I suspect this update won't cause too much pain right now just because
> the
> > number of streams app any user has won't be too large quite yet, but this
> > seems important to consider moving forward. I think we had some similar
> > concerns & discussion around the changes to the consumer APIs when trying
> > to generalize the collection types used in those APIs.
> >
> > -Ewen
> >
> >
> > On Mon, Nov 28, 2016 at 10:46 AM, Matthias J. Sax  >
> > wrote:
> >
> >> Done.
> >>
> >> If there is no further comments, I would like to start a voting thread
> >> in a separate email.
> >>
> >> -Matthias
> >>
> >> On 11/28/16 9:08 AM, Guozhang Wang wrote:
> >>> Yes, it does not include these. Again, in my previous previous email I
> >>> meant that when you say "This is a breaking, incompatible change" people
> >>> may interpret it differently, so it is better to explain it more clearly.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>> On Thu, Nov 24, 2016 at 10:31 PM, Matthias J. Sax <
> matth...@confluent.io
> >>>
> >>> wrote:
> >>>
>  That does make sense. But KIP-93 does not change anything like this,
> so
>  there is nothing to mention, IMHO.
> 
>  Or do you mean, the KIP should include that the change is backward
>  compatible with this regard?
> 
>  -Matthias
> 
> 
> 
>  On 11/24/16 5:31 PM, Guozhang Wang wrote:
> > What I meant is that, for some changes (e.g. say we change 

Re: Kafka SNAPSHOT artifact repositories.

2016-12-01 Thread Gwen Shapira
We are not. Do you mean nightly snapshots? We'll need to set this up
in Jenkins so it will run as part of the nightly build (with the hope
that the build will pass most of the time).

If you can figure out what's the magic we need to publish snapshots
into that repo, I'll be happy to do whatever updates you tell me to do
on our Jenkins job.

Gwen

On Thu, Dec 1, 2016 at 4:50 PM, Sean McCauliff  wrote:
> Is there an artifact repository where to-be-released versions of Kafka
> are published?
>
> There appears to be one at http://repository.apache.org/snapshots/  ,
> but I'm not seeing anything published there after 0.8.2.
>
> Thanks!
> Sean



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


[GitHub] kafka pull request #2185: MINOR: added logging to debug test

2016-12-01 Thread mjsax
Github user mjsax closed the pull request at:

https://github.com/apache/kafka/pull/2185




Build failed in Jenkins: kafka-trunk-jdk7 #1723

2016-12-01 Thread Apache Jenkins Server
See 

Changes:

[me] KAFKA-3008: Parallel start and stop of connectors and tasks in Connect

[me] KAFKA-4161: KIP-89: Allow sink connectors to decouple flush and offset

--
[...truncated 14426 lines...]


Kafka SNAPSHOT artifact repositories.

2016-12-01 Thread Sean McCauliff
Is there an artifact repository where to-be-released versions of Kafka
are published?

There appears to be one at http://repository.apache.org/snapshots/  ,
but I'm not seeing anything published there after 0.8.2.

Thanks!
Sean


Re: [DISCUSS] 0.10.1.1 Plan

2016-12-01 Thread Sean McCauliff
Well I would like KAFKA-4250 (make ProducerRecord and ConsumerRecord
extensible) in the 0.10.1 branch if it is not a big deal.  They are just
dumb structs, but they are final, so no extensibility is possible.

Sean

On Tue, Nov 29, 2016 at 5:32 PM, Ignacio Solis  wrote:
> I don't think anybody from LinkedIn asked for features on this release.  We
> just jumped in at the discussion of including a patch which was not a bug
> fix and whether it mattered.
>
> Having said that, the internal release we're working on came off the 0.10.1
> branch with a few internal hotfix patches and a few cherry picked fixes...
> Including the final keyword removal patch.
>
> Nacho
>
> On Tue, Nov 29, 2016, 5:15 PM Gwen Shapira  wrote:
>
>> btw. is LinkedIn no longer running from trunk? I'm not used to
>> LinkedIn employees requesting specific patches to be included in a
>> bugfix release.
>>
>> Any discussion on the content of any release is obviously welcome, I'm
>> just wondering if there was a change in policy.
>>
>> On Tue, Nov 29, 2016 at 2:17 PM, Ismael Juma  wrote:
>> > OK, so it seems like there are no changes that break compatibility in the
>> > 0.10.1 branch since we offer no compatibility guarantees for logging
>> > output. That's good. :)
>> >
>> > About the removal of final, it happened in trunk and it doesn't seem like
>> > anyone is still asking for it to be included in the 0.10.1 branch so it
>> is
>> > indeed not important for this bug fix release (I thought we had
>> established
>> > that quite a while ago).
>> >
>> > Ismael
>> >
>> > On Tue, Nov 29, 2016 at 9:35 PM, Ignacio Solis  wrote:
>> >
>> >> Sorry, that was a hasty reply.  There are also various logging things
>> that
>> >> change format. This could break parsers.
>> >>
>> >> None of them are important, my only argument is that the final keyword
>> >> removal is not important either.
>> >>
>> >> Nacho
>> >>
>> >>
>> >> On Tue, Nov 29, 2016 at 1:25 PM, Ignacio Solis  wrote:
>> >>
>> >> > https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=commit;h=
>> >> > 10cfc1628df024f7596d3af5c168fa90f59035ca
>> >> >
>> >> > On Tue, Nov 29, 2016 at 1:24 PM, Ismael Juma 
>> wrote:
>> >> >
>> >> >> Which changes break compatibility in the 0.10.1 branch? It would be
>> good
>> >> >> to
>> >> >> fix before the release goes out.
>> >> >>
>> >> >> Ismael
>> >> >>
>> >> >> On 29 Nov 2016 9:09 pm, "Ignacio Solis"  wrote:
>> >> >>
>> >> >> > Some of the changes in the 0.10.1 branch already are not bug fixes.
>> >> Some
>> >> >> > break compatibility.
>> >> >> >
>> >> >> > Having said that, at this level we should maintain a stable API and
>> >> >> leave
>> >> >> > any changes for real version bumps.  This should be only a bugfix
>> >> >> release.
>> >> >> >
>> >> >> > Nacho
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > On Tue, Nov 29, 2016 at 8:35 AM, Ismael Juma 
>> >> wrote:
>> >> >> >
>> >> >> > > I disagree, but let's discuss it another time and in a separate
>> >> >> thread.
>> >> >> > :)
>> >> >> > >
>> >> >> > > Ismael
>> >> >> > >
>> >> >> > > On Tue, Nov 29, 2016 at 4:30 PM, radai <
>> radai.rosenbl...@gmail.com>
>> >> >> > wrote:
>> >> >> > >
>> >> >> > > > designing kafka code for stable extensibility is a worthy and
>> >> noble
>> >> >> > > cause.
>> >> >> > > > however, seeing as there are no such derivatives out in the
>> wild
>> >> >> yet i
>> >> >> > > > think investing the effort right now is a bit premature from
>> >> kafka's
>> >> >> > pov.
>> >> >> > > > I think it's enough simply not to purposefully prevent such
>> >> >> extensions.
>> >> >> > > >
>> >> >> > > > On Tue, Nov 29, 2016 at 4:05 AM, Ismael Juma <
>> ism...@juma.me.uk>
>> >> >> > wrote:
>> >> >> > > >
>> >> >> > > > > On Sat, Nov 26, 2016 at 11:08 PM, radai <
>> >> >> radai.rosenbl...@gmail.com>
>> >> >> > > > > wrote:
>> >> >> > > > >
>> >> >> > > > > > "compatibility guarantees that are expected by people who
>> >> >> subclass
>> >> >> > > > these
>> >> >> > > > > > classes"
>> >> >> > > > > >
>> >> >> > > > > > sorry if this is not the best thread for this discussion,
>> but
>> >> I
>> >> >> > just
>> >> >> > > > > wanted
>> >> >> > > > > > to pop in and say that since any subclassing of these will
>> >> >> > obviously
>> >> >> > > > not
>> >> >> > > > > be
>> >> >> > > > > > done within the kafka codebase - what guarantees are
>> needed?
>> >> >> > > > > >
>> >> >> > > > >
>> >> >> > > > > I elaborated a little in my other message in this thread. A
>> >> simple
>> >> >> > and
>> >> >> > > > > somewhat contrived example: `ConsumerRecord.toString` calls
>> the
>> >> >> > `topic`
>> >> >> > > > > method. Someone overrides the `topic` method and it all
>> works as
>> >> >> > > > expected.
>> >> >> > > > > In a subsequent release, we change `toString` to use the
>> field
>> >> >> > directly
>> >> >> > > > > (like it's done for other fields like 
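To make the subclassing hazard described above concrete, here is a minimal
Java sketch; the Record class below is a hypothetical stand-in for
ConsumerRecord, not the real class.

    // Release N: toString() happens to go through the overridable topic() method.
    class Record {
        private final String topic;

        Record(String topic) {
            this.topic = topic;
        }

        public String topic() {
            return topic;
        }

        @Override
        public String toString() {
            return "Record(topic=" + topic() + ")";
            // A later release could switch to the field directly:
            //   return "Record(topic=" + topic + ")";
            // which silently bypasses any subclass override of topic().
        }
    }

    class PrefixedRecord extends Record {
        PrefixedRecord(String topic) {
            super(topic);
        }

        @Override
        public String topic() {
            return "prefix." + super.topic();
        }
    }

    public class SubclassingHazard {
        public static void main(String[] args) {
            // Prints "Record(topic=prefix.events)" today, but would print
            // "Record(topic=events)" after the field-access change.
            System.out.println(new PrefixedRecord("events"));
        }
    }

Whether that silent change of behaviour counts as breaking compatibility is
exactly the question being debated here.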

Build failed in Jenkins: kafka-trunk-jdk8 #1072

2016-12-01 Thread Apache Jenkins Server
See 

Changes:

[me] KAFKA-3008: Parallel start and stop of connectors and tasks in Connect

[me] KAFKA-4161: KIP-89: Allow sink connectors to decouple flush and offset

--
[...truncated 12182 lines...]


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-01 Thread Gwen Shapira
I'm pretty satisfied with the current workarounds (Avro container
format), so I'm not too excited about the extra work required to do
headers in Kafka. I absolutely don't mind it if you do it...
I think the Apache convention for "good idea, but not willing to put
any work toward it" is +0.5? anyway, that's what I was trying to
convey :)

On Thu, Dec 1, 2016 at 3:05 PM, Todd Palino  wrote:
> Well I guess my question for you, then, is what is holding you back from
> full support for headers? What’s the bit that you’re missing that has you
> under a full +1?
>
> -Todd
>
>
> On Thu, Dec 1, 2016 at 1:59 PM, Gwen Shapira  wrote:
>
>> I know why people who support headers support them, and I've seen what
>> the discussion is like.
>>
>> This is why I'm asking people who are against headers (especially
>> committers) what will make them change their mind - so we can get this
>> part over one way or another.
>>
>> If I sound frustrated it is not at Radai, Jun or you (Todd)... I am
>> just looking for something concrete we can do to move the discussion
>> along to the yummy design details (which is the argument I really am
>> looking forward to).
>>
>> On Thu, Dec 1, 2016 at 1:53 PM, Todd Palino  wrote:
>> > So, Gwen, to your question (even though I’m not a committer)...
>> >
>> > I have always been a strong supporter of introducing the concept of an
>> > envelope to messages, which headers accomplishes. The message key is
>> > already an example of a piece of envelope information. By providing a
>> means
>> > to do this within Kafka itself, and not relying on use-case specific
>> > implementations, you make it much easier for components to interoperate.
>> It
>> > simplifies development of all these things (message routing, auditing,
>> > encryption, etc.) because each one does not have to reinvent the wheel.
>> >
>> > It also makes it much easier from a client point of view if the headers
>> are
>> > defined as part of the protocol and/or message format in general because
>> > you can easily produce and consume messages without having to take into
>> > account specific cases. For example, I want to route messages, but
>> client A
>> > doesn’t support the way audit implemented headers, and client B doesn’t
>> > support the way encryption or routing implemented headers, so now my
>> > application has to create some really fragile (my autocorrect just tried
>> to
>> > make that “tragic”, which is probably appropriate too) code to strip
>> > everything off, rather than just consuming the messages, picking out the
>> 1
>> > or 2 headers it’s interested in, and performing its function.
>> >
>> > Honestly, this discussion has been going on for a long time, and it’s
>> > always “Oh, you came up with 2 use cases, and yeah, those use cases are
>> > real things that someone would want to do. Here’s an alternate way to
>> > implement them so let’s not do headers.” If we have a few use cases that
>> we
>> > actually came up with, you can be sure that over the next year there’s a
>> > dozen others that we didn’t think of that someone would like to do. I
>> > really think it’s time to stop rehashing this discussion and instead
>> focus
>> > on a workable standard that we can adopt.
>> >
>> > -Todd
>> >
>> >
>> > On Thu, Dec 1, 2016 at 1:39 PM, Todd Palino  wrote:
>> >
>> >> C. per message encryption
>> >>> One drawback of this approach is that this significantly reduce the
>> >>> effectiveness of compression, which happens on a set of serialized
>> >>> messages. An alternative is to enable SSL for wire encryption and rely
>> on
>> >>> the storage system (e.g. LUKS) for at rest encryption.
>> >>
>> >>
>> >> Jun, this is not sufficient. While this does cover the case of removing
>> a
>> >> drive from the system, it will not satisfy most compliance requirements
>> for
>> >> encryption of data as whoever has access to the broker itself still has
>> >> access to the unencrypted data. For end-to-end encryption you need to
>> >> encrypt at the producer, before it enters the system, and decrypt at the
>> >> consumer, after it exits the system.
>> >>
>> >> -Todd
>> >>
>> >>
>> >> On Thu, Dec 1, 2016 at 1:03 PM, radai 
>> wrote:
>> >>
>> >>> another big plus of headers in the protocol is that it would enable
>> rapid
>> >>> iteration on ideas outside of core kafka and would reduce the number of
>> >>> future wire format changes required.
>> >>>
>> >>> a lot of what is currently a KIP represents use cases that are not 100%
>> >>> relevant to all users, and some of them require rather invasive wire
>> >>> protocol changes. i think a good recent example of this is kip-98.
>> >>> tx-utilizing traffic is expected to be a very small fraction of total
>> >>> traffic and yet the changes are invasive.
>> >>>
>> >>> every such wire format change translates into painful and slow
>> adoption of
>> >>> new versions.
>> >>>
>> >>> i think a lot of 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-01 Thread Sean McCauliff
Hi Jun,

It seems like the issue you are bringing up is "are headers really
needed" rather than "are headers useful"?

Internally, LinkedIn has a large, diverse user base for Kafka.  We now
have enough diversity that putting per-message metadata into the
message value container has become an expensive option.  Having a way
of representing metadata that is transparent to the user is now the
preferred option.  I have to believe that the wider world of Kafka
users has even more use cases.

Regarding A; content-type.  If topics are aggregated for some reason
(e.g. through mirror maker), the assumption that the content type can
be inferred from the topic no longer holds. Or perhaps one is sending
or receiving messages from a third party whose container format one
does not control.
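As a concrete illustration (a sketch only, since Kafka has no header API at
this point and KIP-82 is the proposal under discussion), consumer-side
dispatch on a content-type header could look roughly like the following
Java, with headers modelled as a plain Map and the header key chosen here
purely for illustration:

    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    // Sketch: pick a payload decoder per record based on a "content-type" header,
    // so records from differently serialized source topics can share an aggregate
    // topic (e.g. one populated by mirror maker).
    public class ContentTypeDispatch {

        interface PayloadDecoder {
            Object decode(byte[] value);
        }

        static final PayloadDecoder JSON = value -> new String(value, StandardCharsets.UTF_8);
        static final PayloadDecoder BINARY = value -> value;

        static PayloadDecoder decoderFor(Map<String, byte[]> headers) {
            byte[] contentType = headers.get("content-type");   // hypothetical header key
            if (contentType == null) {
                return BINARY;                                   // fall back to the per-topic assumption
            }
            return "application/json".equals(new String(contentType, StandardCharsets.UTF_8))
                    ? JSON
                    : BINARY;
        }
    }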

Regarding B; schema id.  The argument for A holds here as well.

Regarding C; per-message encryption. In the broker's memory the
messages will still be in plaintext.  Why have that window of time
where the message sits in plaintext at all? And LUKS has its own
costs: all data then needs to be encrypted on write and decrypted on
read, not just the sensitive payload, and LUKS requires the encryption
password at mount time, which is another secret that needs to be
managed and protected.
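To make the end-to-end alternative concrete, here is a minimal Java sketch
of producer-side payload encryption, so that neither the broker's memory
nor its disks ever see plaintext. The header name mentioned in the comment
and the key-management side are assumptions for illustration, and, as noted
elsewhere in this thread, encrypting each value independently does hurt
batch compression:

    import java.security.SecureRandom;
    import javax.crypto.Cipher;
    import javax.crypto.SecretKey;
    import javax.crypto.spec.GCMParameterSpec;

    // Sketch: encrypt the value before handing it to the producer; a key id could
    // travel alongside it in a (hypothetical) record header such as "enc-key-id".
    public class PerMessageEncryption {
        private static final SecureRandom RANDOM = new SecureRandom();

        static byte[] encryptValue(SecretKey key, byte[] plaintextValue) throws Exception {
            byte[] iv = new byte[12];                           // 96-bit nonce for GCM
            RANDOM.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
            byte[] ciphertext = cipher.doFinal(plaintextValue);

            // Prepend the IV so the consumer-side decryptor can recover it.
            byte[] out = new byte[iv.length + ciphertext.length];
            System.arraycopy(iv, 0, out, 0, iv.length);
            System.arraycopy(ciphertext, 0, out, iv.length, ciphertext.length);
            return out;
        }
    }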

Regarding F; auditing metadata. KIP-98 could actually use the general
purpose headers mechanism rather than another change to the underlying
message format.

Many data storage and transport systems provide a way to augment the
data being stored or sent: a common set of augmentations that lets
suppliers of data interact with the users of that data without
changing the semantics of the data being stored or transported.  This
is a fairly common design pattern used in many other systems and data
formats, such as HTTP, SMTP, JMS, AMQP, file system extended
attributes, MP3, JPEG, and other media files.  The problem being
solved by having headers as a first-class entity is that the
underlying data is not altered and, in general, the user need not even
be aware of their existence.  It's not that the metadata in the
headers cannot be transmitted and used without headers; it's that it
can't be done without causing some (potentially large) headache for
the user of the actual data.  Most of the alternatives to headers
being proposed seem similar to the bad old days of photography, when
the way to record the time and date of a photo was to buy a camera
that exposed a timestamp onto the bottom of each frame.  Looks great.
http://photo.net/modern-film-cameras-forum/00dlA3

Sean

On Wed, Nov 30, 2016 at 6:50 PM, Jun Rao  wrote:
> Hi, Michael,
>
> In order to answer the first two questions, it would be helpful if we could
> identify 1 or 2 strong use cases for headers in the space for third-party
> vendors. For use cases within an organization, one could always use other
> approaches such as company-wise containers to get around w/o headers. I
> went through the use cases in the KIP and in Radai's wiki (
> https://cwiki.apache.org/confluence/display/KAFKA/A+Case+for+Kafka+Headers).
> The following are the ones that that I understand and could be in the
> third-party use case category.
>
> A. content-type
> It seems that in general, content-type should be set at the topic level.
> Not sure if mixing messages with different content types should be
> encouraged.
>
> B. schema id
> Since the value is mostly useless without schema id, it seems that storing
> the schema id together with serialized bytes in the value is better?
>
> C. per message encryption
> One drawback of this approach is that this significantly reduce the
> effectiveness of compression, which happens on a set of serialized
> messages. An alternative is to enable SSL for wire encryption and rely on
> the storage system (e.g. LUKS) for at rest encryption.
>
> D. cluster ID for mirroring across Kafka clusters
> This is actually interesting. Today, to avoid introducing cycles when doing
> mirroring across data centers, one would either have to set up two Kafka
> clusters (a local and an aggregate) per data center or rename topics.
> Neither is ideal. With headers, the producer could tag each message with
> the producing cluster ID in the header. MirrorMaker could then avoid
> mirroring messages to a cluster if they are tagged with the same cluster id.
>
> However, an alternative approach is to introduce sth like hierarchical
> topic and store messages from different clusters in different partitions
> under the same topic. This approach avoids filtering out unneeded data and
> makes offset preserving easier to support. It may make compaction trickier
> though since the same key may show up in different partitions.
>
> E. record-level lineage
> For example, a source connector could store in the message the metadata
> (e.g. UUID) of the source record. Similarly, if a 
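Point D above is easy to sketch. Assuming a KIP-82-style header carrying
the producing cluster's id (the header key and the hook into the mirroring
pipeline are hypothetical here), the loop-avoidance check is just:

    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    // Sketch: skip records whose origin-cluster tag matches the destination cluster,
    // breaking cross-datacenter mirroring cycles without extra aggregate clusters.
    public class ClusterIdFilter {
        private final String destinationClusterId;

        public ClusterIdFilter(String destinationClusterId) {
            this.destinationClusterId = destinationClusterId;
        }

        public boolean shouldMirror(Map<String, byte[]> headers) {
            byte[] origin = headers.get("origin-cluster-id");    // hypothetical header key
            if (origin == null) {
                return true;                                      // untagged records mirror as before
            }
            return !destinationClusterId.equals(new String(origin, StandardCharsets.UTF_8));
        }
    }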

[jira] [Commented] (KAFKA-4306) Connect workers won't shut down if brokers are not available

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15713416#comment-15713416
 ] 

ASF GitHub Bot commented on KAFKA-4306:
---

GitHub user kkonstantine opened a pull request:

https://github.com/apache/kafka/pull/2201

KAFKA-4306: Shutdown distributed herder with a timeout.

Resolves

KAFKA-4306: Connect workers won't shut down if brokers are not available
KAFKA-4154: Kafka Connect fails to shutdown if it has not completed startup

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kkonstantine/kafka 
KAFKA-4306-Connect-workers-will-not-shut-down-if-brokers-are-not-available

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2201.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2201


commit b1decf8ab09ae8e89288d7548cc7561e2455aa0d
Author: Konstantine Karantasis 
Date:   2016-11-12T00:55:07Z

KAFKA-4306: Shutdown distributed herder with a timeout.

Resolves

KAFKA-4306: Connect workers won't shut down if brokers are not available
KAFKA-4154: Kafka Connect fails to shutdown if it has not completed startup




> Connect workers won't shut down if brokers are not available
> 
>
> Key: KAFKA-4306
> URL: https://issues.apache.org/jira/browse/KAFKA-4306
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.1.0
>Reporter: Gwen Shapira
>Assignee: Konstantine Karantasis
>
> If brokers are not available and we try to shut down connect workers, sink 
> connectors will be stuck in a loop retrying to commit offsets:
> 2016-10-17 09:39:14,907] INFO Marking the coordinator 192.168.1.9:9092 (id: 
> 2147483647 rack: null) dead for group connect-dump-kafka-config1 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:600)
> [2016-10-17 09:39:14,907] ERROR Commit of 
> WorkerSinkTask{id=dump-kafka-config1-0} offsets threw an unexpected 
> exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:194)
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset 
> commit failed with a retriable exception. You should retry committing offsets.
> Caused by: 
> org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException
> We should probably limit the number of retries before doing "unclean" 
> shutdown.
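The linked pull request describes the fix as shutting the distributed
herder down with a timeout. The following is only a generic Java
illustration of that pattern (bounded wait, then forced stop), not the
actual herder code:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Generic pattern: give in-flight work (e.g. a final offset commit) a bounded
    // amount of time, then stop forcefully instead of retrying forever.
    public class BoundedShutdown {
        public static void shutdown(ExecutorService worker, long timeoutMs) throws InterruptedException {
            worker.shutdown();                                  // stop accepting new work
            if (!worker.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                worker.shutdownNow();                           // abandon the retry loop
            }
        }

        public static void main(String[] args) throws InterruptedException {
            ExecutorService worker = Executors.newSingleThreadExecutor();
            worker.submit(() -> { /* e.g. commit offsets against an unavailable broker */ });
            shutdown(worker, 5_000L);
        }
    }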



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2201: KAFKA-4306: Shutdown distributed herder with a tim...

2016-12-01 Thread kkonstantine
GitHub user kkonstantine opened a pull request:

https://github.com/apache/kafka/pull/2201

KAFKA-4306: Shutdown distributed herder with a timeout.

Resolves

KAFKA-4306: Connect workers won't shut down if brokers are not available
KAFKA-4154: Kafka Connect fails to shutdown if it has not completed startup

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kkonstantine/kafka 
KAFKA-4306-Connect-workers-will-not-shut-down-if-brokers-are-not-available

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2201.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2201


commit b1decf8ab09ae8e89288d7548cc7561e2455aa0d
Author: Konstantine Karantasis 
Date:   2016-11-12T00:55:07Z

KAFKA-4306: Shutdown distributed herder with a timeout.

Resolves

KAFKA-4306: Connect workers won't shut down if brokers are not available
KAFKA-4154: Kafka Connect fails to shutdown if it has not completed startup




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.

2016-12-01 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15713402#comment-15713402
 ] 

Mayuresh Gharat commented on KAFKA-4454:


[~ijuma] Thanks for pointing me to the patch. The idea of using SimplePrincipal 
looks good, or we can add another constructor that takes in an additional 
parameter "channelPrincipal" of type java.security.Principal. 
 
The main change will be required in the SocketServer line: 
   val session = 
RequestChannel.Session(KafkaPrincipal.fromPrincipal(channel.principal), 
channel.socketAddress)
and change it to:
   val session = RequestChannel.Session(new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName, 
channel.principal()), channel.socketAddress)
where channel.principal() is going to return the Principal generated by the 
PrincipalBuilder.

Regarding "Do you have some examples of fields that you would want your 
principal to pass?"
---> Our Authorizer implementation delegates to LinkedIn's security infra 
team's library, which creates a java.security.Principal with some additional 
information from the provided client cert. This information is required by 
their ACL service to ALLOW or DENY operations. 
This is likely to be a common use case for most companies that have a 
custom ACL service of their own.
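A simplified Java sketch of the shape proposed above, i.e. an extra field
carrying the Principal built by the plugged-in PrincipalBuilder; this is an
illustration of the idea, not the actual KafkaPrincipal class:

    import java.security.Principal;

    // Simplified illustration: keep the Principal produced by the plugged-in
    // PrincipalBuilder so a custom Authorizer can read its extra fields.
    public class ExtendedPrincipal implements Principal {
        public static final String USER_TYPE = "User";

        private final String principalType;
        private final String name;
        private final Principal channelPrincipal;    // e.g. built from the client certificate

        public ExtendedPrincipal(String principalType, String name, Principal channelPrincipal) {
            this.principalType = principalType;
            this.name = name;
            this.channelPrincipal = channelPrincipal;
        }

        public ExtendedPrincipal(String principalType, String name) {
            this(principalType, name, null);
        }

        @Override
        public String getName() {
            return name;
        }

        public String principalType() {
            return principalType;
        }

        public Principal channelPrincipal() {
            return channelPrincipal;                 // a custom Authorizer can inspect this
        }
    }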


> Authorizer should also include the Principal generated by the 
> PrincipalBuilder.
> ---
>
> Key: KAFKA-4454
> URL: https://issues.apache.org/jira/browse/KAFKA-4454
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
> Fix For: 0.10.2.0
>
>
> Currently kafka allows users to plugin a custom PrincipalBuilder and a custom 
> Authorizer.
> The Authorizer.authorize() object takes in a Session object that wraps 
> KafkaPrincipal and InetAddress.
> The KafkaPrincipal currently has a PrincipalType and Principal name, which is 
> the name of Principal generated by the PrincipalBuilder. 
> This Principal, generated by the pluggedin PrincipalBuilder might have other 
> fields that might be required by the pluggedin Authorizer but currently we 
> loose this information since we only extract the name of Principal while 
> creating KaflkaPrincipal in SocketServer.  
> It would be great if KafkaPrincipal has an additional field 
> "channelPrincipal" which is used to store the Principal generated by the 
> plugged in PrincipalBuilder.
> The pluggedin Authorizer can then use this "channelPrincipal" to do 
> authorization.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2200: KAFKA-4472: offsetRetentionMs miscalculated in Gro...

2016-12-01 Thread kichristensen
GitHub user kichristensen opened a pull request:

https://github.com/apache/kafka/pull/2200

KAFKA-4472: offsetRetentionMs miscalculated in GroupCoordinator

Fix possible integer overflow

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kichristensen/kafka 
MiscalculatedOffsetRetention

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2200.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2200


commit e0ea8736d104d68f3d2969edef485592b3aaf320
Author: Kim Christensen 
Date:   2016-12-01T23:11:30Z

Fix possible overflow




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4472) offsetRetentionMs miscalculated in GroupCoordinator

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15713380#comment-15713380
 ] 

ASF GitHub Bot commented on KAFKA-4472:
---

GitHub user kichristensen opened a pull request:

https://github.com/apache/kafka/pull/2200

KAFKA-4472: offsetRetentionMs miscalculated in GroupCoordinator

Fix possible integer overflow

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kichristensen/kafka 
MiscalculatedOffsetRetention

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2200.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2200


commit e0ea8736d104d68f3d2969edef485592b3aaf320
Author: Kim Christensen 
Date:   2016-12-01T23:11:30Z

Fix possible overflow




> offsetRetentionMs miscalculated in GroupCoordinator
> ---
>
> Key: KAFKA-4472
> URL: https://issues.apache.org/jira/browse/KAFKA-4472
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 0.10.0.1
>Reporter: Jason Aliyetti
>Assignee: Kim Christensen
>
> The configuration "offsets.retention.minutes" is documented as being an 
> integer.  When large values are set (i.e. Integer.MAX_VALUE), an overflow 
> error occurs when converting from minutes to milliseconds.  For instance, 
> setting the config value as 2147483647 results in a offsetsRetentionMs of 
> -6.  This means that all committed offsets are past their expiration when 
> they are created and will be nullified on the next expiration check, which is 
> unexpected given the type of the configuration.
> The fix would be to change
> "offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L"
> to
> "offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L"
> in GroupCoordinator.apply().
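The arithmetic is easy to verify in isolation; this small Java snippet
reproduces the 32-bit overflow and the effect of promoting the first factor
to a long:

    public class RetentionOverflow {
        public static void main(String[] args) {
            int offsetsRetentionMinutes = Integer.MAX_VALUE;        // 2147483647

            // Current expression: the int * int part overflows before the long multiply.
            long broken = offsetsRetentionMinutes * 60 * 1000L;     // -60000

            // Proposed fix: promote to long before multiplying.
            long fixed = offsetsRetentionMinutes * 60L * 1000L;     // 128849018820000

            System.out.println(broken + " vs " + fixed);
        }
    }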



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-4472) offsetRetentionMs miscalculated in GroupCoordinator

2016-12-01 Thread Kim Christensen (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kim Christensen reassigned KAFKA-4472:
--

Assignee: Kim Christensen

> offsetRetentionMs miscalculated in GroupCoordinator
> ---
>
> Key: KAFKA-4472
> URL: https://issues.apache.org/jira/browse/KAFKA-4472
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 0.10.0.1
>Reporter: Jason Aliyetti
>Assignee: Kim Christensen
>
> The configuration "offsets.retention.minutes" is documented as being an 
> integer.  When large values are set (i.e. Integer.MAX_VALUE), an overflow 
> error occurs when converting from minutes to milliseconds.  For instance, 
> setting the config value as 2147483647 results in a offsetsRetentionMs of 
> -6.  This means that all committed offsets are past their expiration when 
> they are created and will be nullified on the next expiration check, which is 
> unexpected given the type of the configuration.
> The fix would be to change
> "offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L"
> to
> "offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L"
> in GroupCoordinator.apply().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-01 Thread Todd Palino
Well I guess my question for you, then, is what is holding you back from
full support for headers? What’s the bit that you’re missing that has you
under a full +1?

-Todd


On Thu, Dec 1, 2016 at 1:59 PM, Gwen Shapira  wrote:

> I know why people who support headers support them, and I've seen what
> the discussion is like.
>
> This is why I'm asking people who are against headers (especially
> committers) what will make them change their mind - so we can get this
> part over one way or another.
>
> If I sound frustrated it is not at Radai, Jun or you (Todd)... I am
> just looking for something concrete we can do to move the discussion
> along to the yummy design details (which is the argument I really am
> looking forward to).
>
> On Thu, Dec 1, 2016 at 1:53 PM, Todd Palino  wrote:
> > So, Gwen, to your question (even though I’m not a committer)...
> >
> > I have always been a strong supporter of introducing the concept of an
> > envelope to messages, which headers accomplishes. The message key is
> > already an example of a piece of envelope information. By providing a
> means
> > to do this within Kafka itself, and not relying on use-case specific
> > implementations, you make it much easier for components to interoperate.
> It
> > simplifies development of all these things (message routing, auditing,
> > encryption, etc.) because each one does not have to reinvent the wheel.
> >
> > It also makes it much easier from a client point of view if the headers
> are
> > defined as part of the protocol and/or message format in general because
> > you can easily produce and consume messages without having to take into
> > account specific cases. For example, I want to route messages, but
> client A
> > doesn’t support the way audit implemented headers, and client B doesn’t
> > support the way encryption or routing implemented headers, so now my
> > application has to create some really fragile (my autocorrect just tried
> to
> > make that “tragic”, which is probably appropriate too) code to strip
> > everything off, rather than just consuming the messages, picking out the
> 1
> > or 2 headers it’s interested in, and performing its function.
> >
> > Honestly, this discussion has been going on for a long time, and it’s
> > always “Oh, you came up with 2 use cases, and yeah, those use cases are
> > real things that someone would want to do. Here’s an alternate way to
> > implement them so let’s not do headers.” If we have a few use cases that
> we
> > actually came up with, you can be sure that over the next year there’s a
> > dozen others that we didn’t think of that someone would like to do. I
> > really think it’s time to stop rehashing this discussion and instead
> focus
> > on a workable standard that we can adopt.
> >
> > -Todd
> >
> >
> > On Thu, Dec 1, 2016 at 1:39 PM, Todd Palino  wrote:
> >
> >> C. per message encryption
> >>> One drawback of this approach is that this significantly reduce the
> >>> effectiveness of compression, which happens on a set of serialized
> >>> messages. An alternative is to enable SSL for wire encryption and rely
> on
> >>> the storage system (e.g. LUKS) for at rest encryption.
> >>
> >>
> >> Jun, this is not sufficient. While this does cover the case of removing
> a
> >> drive from the system, it will not satisfy most compliance requirements
> for
> >> encryption of data as whoever has access to the broker itself still has
> >> access to the unencrypted data. For end-to-end encryption you need to
> >> encrypt at the producer, before it enters the system, and decrypt at the
> >> consumer, after it exits the system.
> >>
> >> -Todd
> >>
> >>
> >> On Thu, Dec 1, 2016 at 1:03 PM, radai 
> wrote:
> >>
> >>> another big plus of headers in the protocol is that it would enable
> rapid
> >>> iteration on ideas outside of core kafka and would reduce the number of
> >>> future wire format changes required.
> >>>
> >>> a lot of what is currently a KIP represents use cases that are not 100%
> >>> relevant to all users, and some of them require rather invasive wire
> >>> protocol changes. i think a good recent example of this is kip-98.
> >>> tx-utilizing traffic is expected to be a very small fraction of total
> >>> traffic and yet the changes are invasive.
> >>>
> >>> every such wire format change translates into painful and slow
> adoption of
> >>> new versions.
> >>>
> >>> i think a lot of functionality currently in KIPs could be "spun out"
> and
> >>> implemented as opt-in plugins transmitting data over headers. this
> would
> >>> keep the core wire format stable(r), core codebase smaller, and avoid
> the
> >>> "burden of proof" thats sometimes required to prove a certain feature
> is
> >>> useful enough for a wide-enough audience to warrant a wire format
> change
> >>> and code complexity additions.
> >>>
> >>> (to be clear - kip-98 goes beyond "mere" wire format changes and im not
> >>> saying it 

[jira] [Commented] (KAFKA-4161) Decouple flush and offset commits

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15713356#comment-15713356
 ] 

ASF GitHub Bot commented on KAFKA-4161:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2139


> Decouple flush and offset commits
> -
>
> Key: KAFKA-4161
> URL: https://issues.apache.org/jira/browse/KAFKA-4161
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Shikhar Bhushan
>Assignee: Shikhar Bhushan
>  Labels: needs-kip
> Fix For: 0.10.2.0
>
>
> It is desirable to have, in addition to the time-based flush interval, volume 
> or size-based commits. E.g. a sink connector which is buffering in terms of 
> number of records may want to request a flush when the buffer is full, or 
> when sufficient amount of data has been buffered in a file.
> Having a method like say {{requestFlush()}} on the {{SinkTaskContext}} would 
> allow for connectors to have flexible policies around flushes. This would be 
> in addition to the time interval based flushes that are controlled with 
> {{offset.flush.interval.ms}}, for which the clock should be reset when any 
> kind of flush happens.
> We should probably also support requesting flushes via the 
> {{SourceTaskContext}} for consistency though a use-case doesn't come to mind 
> off the bat.
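A rough Java sketch of the record-count-based policy described above; the
FlushRequester callback stands in for the proposed SinkTaskContext method
(requestFlush() or whatever it ends up being called) and is purely
hypothetical:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    // Sketch: buffer records and ask the framework for a flush/offset commit once
    // the buffer is full, instead of waiting for offset.flush.interval.ms.
    public class BufferingSink {

        interface FlushRequester {       // stand-in for the proposed SinkTaskContext hook
            void requestFlush();
        }

        private final int maxBufferedRecords;
        private final FlushRequester context;
        private final List<Object> buffer = new ArrayList<>();

        public BufferingSink(int maxBufferedRecords, FlushRequester context) {
            this.maxBufferedRecords = maxBufferedRecords;
            this.context = context;
        }

        public void put(Collection<?> records) {
            buffer.addAll(records);
            if (buffer.size() >= maxBufferedRecords) {
                context.requestFlush();  // framework would then call flush() and commit offsets
            }
        }

        public void flush() {
            // write the buffer to the external system, then clear it
            buffer.clear();
        }
    }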



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2139: KAFKA-4161: KIP-89: Allow sink connectors to decou...

2016-12-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2139


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (KAFKA-4161) Decouple flush and offset commits

2016-12-01 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-4161.
--
   Resolution: Fixed
Fix Version/s: 0.10.2.0

Issue resolved by pull request 2139
[https://github.com/apache/kafka/pull/2139]

> Decouple flush and offset commits
> -
>
> Key: KAFKA-4161
> URL: https://issues.apache.org/jira/browse/KAFKA-4161
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Shikhar Bhushan
>Assignee: Shikhar Bhushan
>  Labels: needs-kip
> Fix For: 0.10.2.0
>
>
> It is desirable to have, in addition to the time-based flush interval, volume 
> or size-based commits. E.g. a sink connector which is buffering in terms of 
> number of records may want to request a flush when the buffer is full, or 
> when sufficient amount of data has been buffered in a file.
> Having a method like say {{requestFlush()}} on the {{SinkTaskContext}} would 
> allow for connectors to have flexible policies around flushes. This would be 
> in addition to the time interval based flushes that are controlled with 
> {{offset.flush.interval.ms}}, for which the clock should be reset when any 
> kind of flush happens.
> We should probably also support requesting flushes via the 
> {{SourceTaskContext}} for consistency though a use-case doesn't come to mind 
> off the bat.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3008) Connect should parallelize task start/stop

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15713303#comment-15713303
 ] 

ASF GitHub Bot commented on KAFKA-3008:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1788


> Connect should parallelize task start/stop
> --
>
> Key: KAFKA-3008
> URL: https://issues.apache.org/jira/browse/KAFKA-3008
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.0
>Reporter: Ewen Cheslack-Postava
>Assignee: Konstantine Karantasis
>Priority: Minor
> Fix For: 0.10.2.0
>
>
> The Herder implementations currently iterate over all connectors/tasks and 
> sequentially start/stop them. We should parallelize this. This is less 
> critical for {{StandaloneHerder}}, but pretty important for 
> {{DistributedHerder}} since it will generally be managing more tasks and any 
> delay starting/stopping a single task will impact every other task on the 
> node (and can ultimately result in incorrect behavior in the case of a single 
> offset commit in one connector taking too long preventing all of the rest 
> from committing offsets).
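The change amounts to fanning the start/stop calls out to a thread pool and
waiting for them, instead of looping sequentially. A generic Java sketch of
that pattern (not the actual DistributedHerder code):

    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Generic pattern: start every connector/task concurrently so one slow start
    // or stop does not delay all of the others, then wait for the whole batch.
    public class ParallelStartStop {
        public static void startAll(List<Runnable> starters, int threads) throws InterruptedException {
            ExecutorService pool = Executors.newFixedThreadPool(threads);
            CountDownLatch done = new CountDownLatch(starters.size());
            for (Runnable starter : starters) {
                pool.submit(() -> {
                    try {
                        starter.run();
                    } finally {
                        done.countDown();
                    }
                });
            }
            done.await();        // every start has completed (or failed) before returning
            pool.shutdown();
        }
    }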



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1788: KAFKA-3008: Parallel start and stop of connectors ...

2016-12-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1788


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (KAFKA-3008) Connect should parallelize task start/stop

2016-12-01 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-3008.
--
   Resolution: Fixed
Fix Version/s: 0.10.2.0

Issue resolved by pull request 1788
[https://github.com/apache/kafka/pull/1788]

> Connect should parallelize task start/stop
> --
>
> Key: KAFKA-3008
> URL: https://issues.apache.org/jira/browse/KAFKA-3008
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.0
>Reporter: Ewen Cheslack-Postava
>Assignee: Konstantine Karantasis
>Priority: Minor
> Fix For: 0.10.2.0
>
>
> The Herder implementations currently iterate over all connectors/tasks and 
> sequentially start/stop them. We should parallelize this. This is less 
> critical for {{StandaloneHerder}}, but pretty important for 
> {{DistributedHerder}} since it will generally be managing more tasks and any 
> delay starting/stopping a single task will impact every other task on the 
> node (and can ultimately result in incorrect behavior in the case of a single 
> offset commit in one connector taking too long preventing all of the rest 
> from committing offsets).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4476) Kafka Streams gets stuck if metadata is missing

2016-12-01 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-4476:
--

 Summary: Kafka Streams gets stuck if metadata is missing
 Key: KAFKA-4476
 URL: https://issues.apache.org/jira/browse/KAFKA-4476
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax
Priority: Critical


When a Kafka Streams application gets started for the first time, it can happen 
that some topic metadata is missing when {{StreamPartitionAssigner#assign()}} 
is called on the group leader instance. This can result in an infinite loop 
within {{StreamPartitionAssigner#assign()}}. This issue was detected by 
{{ResetIntegrationTest}} that does have a transient timeout failure (c.f. 
https://issues.apache.org/jira/browse/KAFKA-4058 -- this issue was re-opened 
multiple times as the problem was expected to be in the test -- however, that 
is not the case).
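One way to avoid that kind of infinite loop is to bound the wait for
metadata with a retry budget and a backoff instead of spinning until it
appears. The following is a generic Java sketch of that pattern, not the
actual StreamPartitionAssigner fix:

    import java.util.Optional;
    import java.util.function.Supplier;

    // Generic pattern: retry a metadata lookup with backoff and give up after a
    // bounded number of attempts rather than looping forever inside assign().
    public class BoundedMetadataWait {
        public static <T> Optional<T> awaitMetadata(Supplier<Optional<T>> lookup,
                                                    int maxAttempts,
                                                    long backoffMs) throws InterruptedException {
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                Optional<T> metadata = lookup.get();
                if (metadata.isPresent()) {
                    return metadata;
                }
                Thread.sleep(backoffMs);  // the metadata may simply not have propagated yet
            }
            return Optional.empty();      // caller decides: fail the rebalance or defer assignment
        }
    }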



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Work started] (KAFKA-4476) Kafka Streams gets stuck if metadata is missing

2016-12-01 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4476 started by Matthias J. Sax.
--
> Kafka Streams gets stuck if metadata is missing
> ---
>
> Key: KAFKA-4476
> URL: https://issues.apache.org/jira/browse/KAFKA-4476
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Critical
>
> When a Kafka Streams application gets started for the first time, it can 
> happen that some topic metadata is missing when 
> {{StreamPartitionAssigner#assign()}} is called on the group leader instance. 
> This can result in an infinite loop within 
> {{StreamPartitionAssigner#assign()}}. This issue was detected by 
> {{ResetIntegrationTest}} that does have a transient timeout failure (c.f. 
> https://issues.apache.org/jira/browse/KAFKA-4058 -- this issue was re-opened 
> multiple times as the problem was expected to be in the test -- however, that 
> is not the case).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-01 Thread Gwen Shapira
I know why people who support headers support them, and I've seen what
the discussion is like.

This is why I'm asking people who are against headers (especially
committers) what will make them change their mind - so we can get this
part over one way or another.

If I sound frustrated it is not at Radai, Jun or you (Todd)... I am
just looking for something concrete we can do to move the discussion
along to the yummy design details (which is the argument I really am
looking forward to).

On Thu, Dec 1, 2016 at 1:53 PM, Todd Palino  wrote:
> So, Gwen, to your question (even though I’m not a committer)...
>
> I have always been a strong supporter of introducing the concept of an
> envelope to messages, which headers accomplishes. The message key is
> already an example of a piece of envelope information. By providing a means
> to do this within Kafka itself, and not relying on use-case specific
> implementations, you make it much easier for components to interoperate. It
> simplifies development of all these things (message routing, auditing,
> encryption, etc.) because each one does not have to reinvent the wheel.
>
> It also makes it much easier from a client point of view if the headers are
> defined as part of the protocol and/or message format in general because
> you can easily produce and consume messages without having to take into
> account specific cases. For example, I want to route messages, but client A
> doesn’t support the way audit implemented headers, and client B doesn’t
> support the way encryption or routing implemented headers, so now my
> application has to create some really fragile (my autocorrect just tried to
> make that “tragic”, which is probably appropriate too) code to strip
> everything off, rather than just consuming the messages, picking out the 1
> or 2 headers it’s interested in, and performing its function.
>
> Honestly, this discussion has been going on for a long time, and it’s
> always “Oh, you came up with 2 use cases, and yeah, those use cases are
> real things that someone would want to do. Here’s an alternate way to
> implement them so let’s not do headers.” If we have a few use cases that we
> actually came up with, you can be sure that over the next year there’s a
> dozen others that we didn’t think of that someone would like to do. I
> really think it’s time to stop rehashing this discussion and instead focus
> on a workable standard that we can adopt.
>
> -Todd
>
>
> On Thu, Dec 1, 2016 at 1:39 PM, Todd Palino  wrote:
>
>> C. per message encryption
>>> One drawback of this approach is that this significantly reduce the
>>> effectiveness of compression, which happens on a set of serialized
>>> messages. An alternative is to enable SSL for wire encryption and rely on
>>> the storage system (e.g. LUKS) for at rest encryption.
>>
>>
>> Jun, this is not sufficient. While this does cover the case of removing a
>> drive from the system, it will not satisfy most compliance requirements for
>> encryption of data as whoever has access to the broker itself still has
>> access to the unencrypted data. For end-to-end encryption you need to
>> encrypt at the producer, before it enters the system, and decrypt at the
>> consumer, after it exits the system.
>>
>> -Todd
>>
>>
>> On Thu, Dec 1, 2016 at 1:03 PM, radai  wrote:
>>
>>> another big plus of headers in the protocol is that it would enable rapid
>>> iteration on ideas outside of core kafka and would reduce the number of
>>> future wire format changes required.
>>>
>>> a lot of what is currently a KIP represents use cases that are not 100%
>>> relevant to all users, and some of them require rather invasive wire
>>> protocol changes. i think a good recent example of this is kip-98.
>>> tx-utilizing traffic is expected to be a very small fraction of total
>>> traffic and yet the changes are invasive.
>>>
>>> every such wire format change translates into painful and slow adoption of
>>> new versions.
>>>
>>> i think a lot of functionality currently in KIPs could be "spun out" and
>>> implemented as opt-in plugins transmitting data over headers. this would
>>> keep the core wire format stable(r), core codebase smaller, and avoid the
>>> "burden of proof" thats sometimes required to prove a certain feature is
>>> useful enough for a wide-enough audience to warrant a wire format change
>>> and code complexity additions.
>>>
>>> (to be clear - kip-98 goes beyond "mere" wire format changes and im not
>>> saying it could have been completely done with headers, but exactly-once
>>> delivery certainly could)
>>>
>>> On Thu, Dec 1, 2016 at 11:20 AM, Gwen Shapira  wrote:
>>>
>>> > On Thu, Dec 1, 2016 at 10:24 AM, radai 
>>> wrote:
>>> > > "For use cases within an organization, one could always use other
>>> > > approaches such as company-wise containers"
>>> > > this is what linkedin has traditionally done but there are now 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-01 Thread Todd Palino
So, Gwen, to your question (even though I’m not a committer)...

I have always been a strong supporter of introducing the concept of an
envelope to messages, which headers accomplishes. The message key is
already an example of a piece of envelope information. By providing a means
to do this within Kafka itself, and not relying on use-case specific
implementations, you make it much easier for components to interoperate. It
simplifies development of all these things (message routing, auditing,
encryption, etc.) because each one does not have to reinvent the wheel.

It also makes it much easier from a client point of view if the headers are
defined as part of the protocol and/or message format in general because
you can easily produce and consume messages without having to take into
account specific cases. For example, I want to route messages, but client A
doesn’t support the way audit implemented headers, and client B doesn’t
support the way encryption or routing implemented headers, so now my
application has to create some really fragile (my autocorrect just tried to
make that “tragic”, which is probably appropriate too) code to strip
everything off, rather than just consuming the messages, picking out the 1
or 2 headers it’s interested in, and performing its function.

Honestly, this discussion has been going on for a long time, and it’s
always “Oh, you came up with 2 use cases, and yeah, those use cases are
real things that someone would want to do. Here’s an alternate way to
implement them so let’s not do headers.” If we have a few use cases that we
actually came up with, you can be sure that over the next year there’s a
dozen others that we didn’t think of that someone would like to do. I
really think it’s time to stop rehashing this discussion and instead focus
on a workable standard that we can adopt.

-Todd


On Thu, Dec 1, 2016 at 1:39 PM, Todd Palino  wrote:

> C. per message encryption
>> One drawback of this approach is that this significantly reduce the
>> effectiveness of compression, which happens on a set of serialized
>> messages. An alternative is to enable SSL for wire encryption and rely on
>> the storage system (e.g. LUKS) for at rest encryption.
>
>
> Jun, this is not sufficient. While this does cover the case of removing a
> drive from the system, it will not satisfy most compliance requirements for
> encryption of data as whoever has access to the broker itself still has
> access to the unencrypted data. For end-to-end encryption you need to
> encrypt at the producer, before it enters the system, and decrypt at the
> consumer, after it exits the system.
>
> -Todd
>
>
> On Thu, Dec 1, 2016 at 1:03 PM, radai  wrote:
>
>> another big plus of headers in the protocol is that it would enable rapid
>> iteration on ideas outside of core kafka and would reduce the number of
>> future wire format changes required.
>>
>> a lot of what is currently a KIP represents use cases that are not 100%
>> relevant to all users, and some of them require rather invasive wire
>> protocol changes. i think a good recent example of this is kip-98.
>> tx-utilizing traffic is expected to be a very small fraction of total
>> traffic and yet the changes are invasive.
>>
>> every such wire format change translates into painful and slow adoption of
>> new versions.
>>
>> i think a lot of functionality currently in KIPs could be "spun out" and
>> implemented as opt-in plugins transmitting data over headers. this would
>> keep the core wire format stable(r), core codebase smaller, and avoid the
>> "burden of proof" thats sometimes required to prove a certain feature is
>> useful enough for a wide-enough audience to warrant a wire format change
>> and code complexity additions.
>>
>> (to be clear - kip-98 goes beyond "mere" wire format changes and im not
>> saying it could have been completely done with headers, but exactly-once
>> delivery certainly could)
>>
>> On Thu, Dec 1, 2016 at 11:20 AM, Gwen Shapira  wrote:
>>
>> > On Thu, Dec 1, 2016 at 10:24 AM, radai 
>> wrote:
>> > > "For use cases within an organization, one could always use other
>> > > approaches such as company-wise containers"
>> > > this is what linkedin has traditionally done but there are now cases
>> > (read
>> > > - topics) where this is not acceptable. this makes headers useful even
>> > > within single orgs for cases where one-container-fits-all cannot
>> apply.
>> > >
>> > > as for the particular use cases listed, i dont want this to devolve
>> to a
>> > > discussion of particular use cases - i think its enough that some of
>> them
>> >
>> > I think a main point of contention is that: We identified few
>> > use-cases where headers are useful, do we want Kafka to be a system
>> > that supports those use-cases?
>> >
>> > For example, Jun said:
>> > "Not sure how widely useful record-level lineage is though since the
>> > overhead could
>> > be 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-01 Thread Todd Palino
>
> C. per message encryption
> One drawback of this approach is that it significantly reduces the
> effectiveness of compression, which happens on a set of serialized
> messages. An alternative is to enable SSL for wire encryption and rely on
> the storage system (e.g. LUKS) for at rest encryption.


Jun, this is not sufficient. While this does cover the case of removing a
drive from the system, it will not satisfy most compliance requirements for
encryption of data as whoever has access to the broker itself still has
access to the unencrypted data. For end-to-end encryption you need to
encrypt at the producer, before it enters the system, and decrypt at the
consumer, after it exits the system.
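
To make that concrete: the producer-side half can be as small as a wrapper
around an existing Serializer. This is a sketch only - key management and the
matching decrypting Deserializer on the consumer side are omitted, and nothing
here is an existing Kafka facility:

import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.Map;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import org.apache.kafka.common.serialization.Serializer;

// Wraps an existing Serializer and encrypts its output before it leaves the
// producer; a matching Deserializer would decrypt on the consumer side.
public class EncryptingSerializer<T> implements Serializer<T> {
    private final Serializer<T> inner;
    private final SecretKeySpec key;           // keyBytes must be a valid AES key (16/24/32 bytes)
    private final SecureRandom random = new SecureRandom();

    public EncryptingSerializer(Serializer<T> inner, byte[] keyBytes) {
        this.inner = inner;
        this.key = new SecretKeySpec(keyBytes, "AES");
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            byte[] plain = inner.serialize(topic, data);
            byte[] iv = new byte[12];
            random.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
            byte[] sealed = cipher.doFinal(plain);
            // Prepend the IV so the consumer-side deserializer can decrypt.
            return ByteBuffer.allocate(iv.length + sealed.length).put(iv).put(sealed).array();
        } catch (Exception e) {
            throw new RuntimeException("Encryption failed", e);
        }
    }

    @Override
    public void close() {
        inner.close();
    }
}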

-Todd


On Thu, Dec 1, 2016 at 1:03 PM, radai  wrote:

> another big plus of headers in the protocol is that it would enable rapid
> iteration on ideas outside of core kafka and would reduce the number of
> future wire format changes required.
>
> a lot of what is currently a KIP represents use cases that are not 100%
> relevant to all users, and some of them require rather invasive wire
> protocol changes. i think a good recent example of this is kip-98.
> tx-utilizing traffic is expected to be a very small fraction of total
> traffic and yet the changes are invasive.
>
> every such wire format change translates into painful and slow adoption of
> new versions.
>
> i think a lot of functionality currently in KIPs could be "spun out" and
> implemented as opt-in plugins transmitting data over headers. this would
> keep the core wire format stable(r), core codebase smaller, and avoid the
> "burden of proof" thats sometimes required to prove a certain feature is
> useful enough for a wide-enough audience to warrant a wire format change
> and code complexity additions.
>
> (to be clear - kip-98 goes beyond "mere" wire format changes and im not
> saying it could have been completely done with headers, but exactly-once
> delivery certainly could)
>
> On Thu, Dec 1, 2016 at 11:20 AM, Gwen Shapira  wrote:
>
> > On Thu, Dec 1, 2016 at 10:24 AM, radai 
> wrote:
> > > "For use cases within an organization, one could always use other
> > > approaches such as company-wise containers"
> > > this is what linkedin has traditionally done but there are now cases
> > (read
> > > - topics) where this is not acceptable. this makes headers useful even
> > > within single orgs for cases where one-container-fits-all cannot apply.
> > >
> > > as for the particular use cases listed, i dont want this to devolve to
> a
> > > discussion of particular use cases - i think its enough that some of
> them
> >
> > I think a main point of contention is this: we identified a few
> > use-cases where headers are useful; do we want Kafka to be a system
> > that supports those use-cases?
> >
> > For example, Jun said:
> > "Not sure how widely useful record-level lineage is though since the
> > overhead could
> > be significant."
> >
> > We know NiFi supports record level lineage. I don't think it was
> > developed for lols, I think it is safe to assume that the NSA needed
> > that functionality. We also know that certain financial institutes
> > need to track tampering with records at a record level and there are
> > federal regulations that absolutely require this.  They also need to
> > prove that routing apps that "touch" the messages and either read
> > or update headers couldn't have possibly modified the payload itself.
> > They use record level encryption to do that - apps can read and
> > (sometimes) modify headers but can't touch the payload.
> >
> > We can totally say "those are corner cases and not worth adding
> > headers to Kafka for", they should use a different pubsub message for
> > that (Nifi or one of the other 1000 that cater specifically to the
> > financial industry).
> >
> > But this gets us into a catch 22:
> > If we discuss a specific use-case, someone can always say it isn't
> > interesting enough for Kafka. If we discuss more general trends,
> > others can say "well, we are not sure any of them really needs headers
> > specifically. This is just hand waving and not interesting.".
> >
> > I think discussing use-cases in specifics is super important to decide
> > implementation details for headers (my use-cases lean toward numerical
> > keys with namespaces and object values, others differ), but I think we
> > need to answer the general "Are we going to have headers" question
> > first.
> >
> > I'd love to hear from the other committers in the discussion:
> > What would it take to convince you that headers in Kafka are a good
> > idea in general, so we can move ahead and try to agree on the details?
> >
> > I feel like we keep moving the goal posts and this is truly exhausting.
> >
> > For the record, I mildly support adding headers to Kafka (+0.5?).
> > The community can continue to find workarounds to the issue and there
> > are some benefits to 

[jira] [Commented] (KAFKA-4473) KafkaStreams does *not* guarantee at-least-once delivery

2016-12-01 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15713038#comment-15713038
 ] 

Guozhang Wang commented on KAFKA-4473:
--

[~Thomas Schulz] Thanks for reporting this issue. I have a couple of questions 
to follow up:

1. NotLeaderForPartitionException is a retriable error and hence it should keep 
retrying instead of throwing it out. If you turn on debug-level logging, do you 
see entries indicating that it has been retrying?

2. On average, how long did it take in your testing environment to bounce a broker?

> KafkaStreams does *not* guarantee at-least-once delivery
> 
>
> Key: KAFKA-4473
> URL: https://issues.apache.org/jira/browse/KAFKA-4473
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Thomas Schulz
>Priority: Critical
>
> see: https://groups.google.com/forum/#!topic/confluent-platform/DT5bk1oCVk8
> There is probably a bug in the RecordCollector as described in my detailed 
> Cluster test published in the aforementioned post.
> The class RecordCollector has the following behavior:
> - if there is no exception, add the message offset to a map
> - otherwise, do not add the message offset and instead log the above statement
> Is it possible that this offset map contains the latest offset to commit? If 
> so, a message that fails might be overridden by a successful (later) message 
> and the consumer commits every message up to the latest offset?
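
To make the suspected pattern concrete, here is a simplified, hypothetical 
sketch (not the actual RecordCollector code) of how a failed send could be 
masked by a later successful send on the same partition before offsets are 
committed:

{code}
// Simplified, hypothetical sketch of the pattern described above -- not the
// real RecordCollector. Offsets are only recorded for successful sends, so a
// later success on the same partition can advance the offset past a failed record.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

class OffsetTrackingSender {
    private final Map<TopicPartition, Long> lastAckedOffsets = new HashMap<>();

    void send(Producer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record) {
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    // Success: remember the offset; an earlier failed send on the
                    // same partition leaves no trace and is effectively skipped.
                    lastAckedOffsets.put(
                        new TopicPartition(metadata.topic(), metadata.partition()),
                        metadata.offset());
                } else {
                    // Failure: only logged, nothing prevents committing past it.
                    System.err.println("Error sending record: " + exception);
                }
            }
        });
    }
}
{code}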



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk7 #1722

2016-12-01 Thread Apache Jenkins Server
See 

Changes:

[ismael] KAFKA-4443; Minor comment clean-up

--
[...truncated 14222 lines...]
org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAddNullStateStoreSupplier PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSource STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSource PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullTopicWhenAddingSink STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullTopicWhenAddingSink PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithSource STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithSource PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldThroughOnUnassignedStateStoreAccess STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldThroughOnUnassignedStateStoreAccess PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameName STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameName PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddMoreThanOnePatternSourceNode STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddMoreThanOnePatternSourceNode PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithSameName STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithSameName PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldSetCorrectSourceNodesWithRegexUpdatedTopics STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldSetCorrectSourceNodesWithRegexUpdatedTopics PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameTopic STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameTopic PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testTopicGroupsByStateStore STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testTopicGroupsByStateStore PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithDuplicates STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithDuplicates PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > testPatternSourceTopic 
STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > testPatternSourceTopic 
PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithWrongParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithWrongParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testPatternMatchesAlreadyProvidedTopicSource STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testPatternMatchesAlreadyProvidedTopicSource PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkConnectedWithMultipleParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkConnectedWithMultipleParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testSubscribeTopicNameAndPattern STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testSubscribeTopicNameAndPattern PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAddNullInternalTopic STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAddNullInternalTopic PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithWrongParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithWrongParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingProcessor STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingProcessor PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > testAddStateStore 
STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > testAddStateStore 
PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkConnectedWithParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkConnectedWithParent 

[jira] [Commented] (KAFKA-4454) Authorizer should also include the Principal generated by the PrincipalBuilder.

2016-12-01 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15712970#comment-15712970
 ] 

Ismael Juma commented on KAFKA-4454:


[~mgharat], thanks. That could work. Do you have some examples of fields that 
you would want your principal to pass? Generally, I think the current way we 
use `KafkaPrincipal` is a bit confusing. I created a PR[1] a while back that 
used `SimplePrincipal` for authentication and `KafkaPrincipal` for 
authorization. With the clear separation, adding a field for authorization 
purposes (like proposed here) would not affect the authentication cases.

[1] https://github.com/apache/kafka/pull/551/files

> Authorizer should also include the Principal generated by the 
> PrincipalBuilder.
> ---
>
> Key: KAFKA-4454
> URL: https://issues.apache.org/jira/browse/KAFKA-4454
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
> Fix For: 0.10.2.0
>
>
> Currently Kafka allows users to plug in a custom PrincipalBuilder and a custom 
> Authorizer.
> The Authorizer.authorize() method takes in a Session object that wraps the 
> KafkaPrincipal and InetAddress.
> The KafkaPrincipal currently has a PrincipalType and a Principal name, which is 
> the name of the Principal generated by the PrincipalBuilder. 
> This Principal, generated by the plugged-in PrincipalBuilder, might have other 
> fields that are required by the plugged-in Authorizer, but currently we 
> lose this information since we only extract the name of the Principal while 
> creating the KafkaPrincipal in SocketServer.  
> It would be great if KafkaPrincipal had an additional field 
> "channelPrincipal" which is used to store the Principal generated by the 
> plugged-in PrincipalBuilder.
> The plugged-in Authorizer can then use this "channelPrincipal" to do 
> authorization.
>  
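
For illustration only (the class and field names below are hypothetical, not 
existing Kafka API), the proposed shape could look roughly like this:

{code}
// Hypothetical sketch of the proposal -- names are illustrative, not Kafka API.
import java.security.Principal;

public class RichKafkaPrincipal implements Principal {
    private final String principalType;
    private final String name;
    // Whatever the plugged-in PrincipalBuilder produced, kept intact so a
    // custom Authorizer can downcast it and read any extra fields it needs.
    private final Principal channelPrincipal;

    public RichKafkaPrincipal(String principalType, String name, Principal channelPrincipal) {
        this.principalType = principalType;
        this.name = name;
        this.channelPrincipal = channelPrincipal;
    }

    @Override
    public String getName() { return name; }

    public String principalType() { return principalType; }

    public Principal channelPrincipal() { return channelPrincipal; }
}
{code}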



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4474) Poor kafka-streams throughput

2016-12-01 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15712966#comment-15712966
 ] 

Eno Thereska commented on KAFKA-4474:
-

[~jjchorrobe] Thanks for reporting. A couple of questions:
- how long is the application running for, and are there enough records? I ask 
because if the run is very short (a few seconds), we won't be in steady state, 
and the cost of things like partition rebalancing might dominate. Ideally 
the application (single instance or multiple instances) should run for 60 
seconds or so.
- if you observe the CPU to be completely pegged at 100%, even small effects 
like having 2 processes rather than 2 threads might lead to some amount of 
thrashing, severely degrading the performance. Do you observe CPU 
utilization close to 100%? 

I'll try to answer your questions once I understand the above a bit better. One 
problem with running everything locally is that lots of different things end up 
mixed up. For example, partitions are used as a unit of storage parallelism, 
but in this case all 4 partitions are on the same local disk. In an ideal 
experiment, the 4 partitions would be on 4 different disks. Also, the fact that 
zookeeper and the kafka broker are on the same machine (that's my understanding 
of your setup, correct me if I'm wrong) further perturbs the measurements since 
they consume quite a bit of CPU as well, potentially adding to thrashing. Is 
there a way you can put the Kafka cluster on a separate machine? If not, we'll 
work with what you have, but it is not an ideal setup.
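
For reference, here is a minimal pass-through topology sketch (written against 
the 0.10.1-era Streams API; topic names taken from the description below) that 
makes it easy to vary num.stream.threads relative to the partition count:

{code}
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class PassThrough {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myApp");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        // A single stream thread handles all of its assigned partitions; matching
        // the thread count (or instance count) to the partition count spreads the work.
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

        KStreamBuilder builder = new KStreamBuilder();
        KStream<byte[], byte[]> input = builder.stream("input");
        input.to("output");

        new KafkaStreams(builder, props).start();
    }
}
{code}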

> Poor kafka-streams throughput
> -
>
> Key: KAFKA-4474
> URL: https://issues.apache.org/jira/browse/KAFKA-4474
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Juan Chorro
>Assignee: Eno Thereska
>
> Hi! 
> I'm writing because I have a worry about kafka-streams throughput.
> I have only a kafka-streams application instance that consumes from 'input' 
> topic, prints on the screen and produces in 'output' topic. All topics have 4 
> partitions. As can be observed the topology is very simple.
> I produce 120K messages/second to 'input' topic, when I measure the 'output' 
> topic I detect that I'm receiving ~4K messages/second. I had next 
> configuration (Remaining parameters by default):
> application.id: myApp
> bootstrap.servers: localhost:9092
> zookeeper.connect: localhost:2181
> num.stream.threads: 1
> I was doing proofs and tests without success, but when I created a new 
> 'input' topic with 1 partition (Maintain 'output' topic with 4 partitions) I 
> got in 'output' topic 120K messages/seconds.
> I have been doing some performance tests and proof with next cases (All 
> topics have 4 partitions in all cases):
> Case A - 1 Instance:
> - With num.stream.threads set to 1 I had ~3785 messages/second
> - With num.stream.threads set to 2 I had ~3938 messages/second
> - With num.stream.threads set to 4 I had ~120K messages/second
> Case B - 2 Instances:
> - With num.stream.threads set to 1 I had ~3930 messages/second for each 
> instance (And throughput ~8K messages/second)
> - With num.stream.threads set to 2 I had ~3945 messages/second for each 
> instance (And more or less same throughput that with num.stream.threads set 
> to 1)
> Case C - 4 Instances
> - With num.stream.threads set to 1 I had 3946 messages/seconds for each 
> instance (And throughput ~17K messages/second):
> As can be observed, when num.stream.threads is set to the number of partitions I 
> get the best results. I then have the following questions:
> - Why, when I have a topic with #partitions > 1 and num.stream.threads set to 1, 
> do I always get ~4K messages/second?
> - In case C, shouldn't 4 instances with num.stream.threads set to 1 be better 
> than 1 instance with num.stream.threads set to 4? Is this supposition 
> correct?
> This is the kafka-streams application that I use: 
> https://gist.github.com/Chorro/5522ec4acd1a005eb8c9663da86f5a18



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4461) When using ProcessorTopologyTestDriver, the combination of map and .groupByKey does not produce any result

2016-12-01 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4461:
-
Labels: newbie  (was: )

> When using ProcessorTopologyTestDriver, the combination of map and 
> .groupByKey does not produce any result
> --
>
> Key: KAFKA-4461
> URL: https://issues.apache.org/jira/browse/KAFKA-4461
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Hamidreza Afzali
>Priority: Blocker
>  Labels: newbie
>
> *Problem*
> When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the 
> combination of map and .groupByKey does not produce any result. However, it 
> works fine when using KStreamTestDriver.
> The topology looks like this:
> {code}
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>  .map((k, v) => new KeyValue(fn(k), v))
>  .groupByKey(Serdes.String, Serdes.Integer)
>  .count(stateStore)
> {code}
> *Full examples*
> Examples for ProcessorTopologyTestDriver and KStreamTestDriver:
> https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13
> *Additional info*
> kafka-users mailing list:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%3CCAHwHRrVq1APVkNhP3HVqxujxRJEP9FwHV2NRcvPPsHX7Wujzng%40mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-01 Thread Henry Cai
I see, having both the topic data and the changelog in Kafka can make the Kafka
transaction atomic.  But in the streaming case, RocksDB is an external data
store that the changelog is applied to.  In terms of rolling back and
resuming/re-applying a Kafka transaction, how do we make sure the RocksDB
updates can also be rolled back and re-applied at the Kafka transaction
boundary?


On Thu, Dec 1, 2016 at 11:05 AM, Guozhang Wang  wrote:

> @Henry Cai,
>
> I am working on a separate KIP on Streams to leverage this KIP to have
> exactly-once processing semantics (note the exactly-once processing is a
> bit different from exactly-once delivery semantics), which should cover
> your question.
>
> The short answer is that writing the changelog messages needs to be part of
> the transaction. When a fatal error happens within a transaction, since
> the store updates cannot be rolled back like the messages, in the worst case
> we need to restore from the changelog from scratch, or from a checkpoint
> with a starting offset in the changelog; the restoring consumer will fetch
> committed messages only as well.
>
>
> Guozhang
>
> On Thu, Dec 1, 2016 at 9:34 AM, Apurva Mehta  wrote:
>
> > Hi Daniel,
> >
> > That is a very good point. You are correct in saying that one does not
> need
> > a transaction coordinator to get idempotent semantics.
> >
> > There are, however, three reasons why we chose this route:
> >
> >1. The request to find a transaction coordinator is exactly the same
> as
> >the request consumers use to find the group coordinator. So if clients
> >already implement the new consumer, you should already have the code
> you
> >need to find the transaction coordinator. I would even go so far as to
> say
> >that the majority of the coordinator discovery code can be effectively shared
> >between producers and consumers. Jason should correct me on this,
> > however,
> >since he is most familiar with that bit.
> >2. With this route, the broker side changes are simpler. In
> particular,
> >we have to implement the InitPIDRequest only in the coordinator.
> >3. By always having a transaction coordinator, we can enable
> >applications to use transactions even if they don't specify the AppId.
> > The
> >only thing you lose is transaction recovery across sessions.
> >
> > Needless to say, we did debate this point extensively. What swung our
> > decision ultimately was the following observation: if the user does not
> > provide a transaction.app.id, the client can generate a UUID and use
> that
> > as the appId for the rest of the session. This means that there are no
> > branches in the client and server code, and is overall simpler to
> maintain.
> > All the producer APIs are also available to the user and it would be more
> > intuitive.
> >
> > It also means that clients cannot choose idempotence without
> transactions,
> > and hence it does place a greater burden on implementors of kafka
> clients.
> > But the cost should be minimal given point 1 above, and was deemed worth
> > it.
> >
> > Thanks once more for your thoughtful comments. It would be great for
> other
> > client implementors to chime in on this.
> >
> > Regards,
> > Apurva
> >
> >
> > On Thu, Dec 1, 2016 at 3:16 AM, Daniel Schierbeck
> >  > > wrote:
> >
> > > Hi there,
> > >
> > > I'm the author of ruby-kafka, and as such am slightly biased towards
> > > simplicity of implementation :-)
> > >
> > > I like the proposal, and would love to use idempotent producer
> semantics
> > in
> > > our projects at Zendesk, but I'm a bit worried about the complexity
> that
> > > would go into the clients; specifically: it sounds to me that in order
> to
> > > get idempotent producer semantics, I'd have to implement the
> transaction
> > > coordinator discovery. I may be wrong, but it would seem that it's not
> > > strictly necessary if you're not using transactions – we could just use
> > the
> > > topic partition's leader as the coordinator, avoiding the extra
> > discovery.
> > > In my experience, most bugs are related to figuring out which broker is
> > the
> > > leader of which partition/group/whatever, so minimizing the number of
> > > moving parts would be beneficial to me. I'd also like to point out
> that I
> > > would be reluctant to implement the transaction API in the near future,
> > but
> > > would love to implement the idempotency API soon. The former seems only
> > > relevant to real stream processing frameworks, which is probably not
> the
> > > best use case for ruby-kafka.
> > >
> > > Cheers,
> > > Daniel Schierbeck
> > >
> > > On Thu, Dec 1, 2016 at 9:54 AM Jason Gustafson 
> > wrote:
> > >
> > > > Hey Neha,
> > > >
> > > > Thanks for the thoughtful questions. I'll try to address the first
> > > question
> > > > since Apurva addressed the second. Since most readers are probably
> > > getting
> > > > up to speed with this large 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-01 Thread Gwen Shapira
On Thu, Dec 1, 2016 at 10:24 AM, radai  wrote:
> "For use cases within an organization, one could always use other
> approaches such as company-wise containers"
> this is what linkedin has traditionally done but there are now cases (read
> - topics) where this is not acceptable. this makes headers useful even
> within single orgs for cases where one-container-fits-all cannot apply.
>
> as for the particular use cases listed, i dont want this to devolve to a
> discussion of particular use cases - i think its enough that some of them

I think a main point of contention is this: we identified a few
use-cases where headers are useful; do we want Kafka to be a system
that supports those use-cases?

For example, Jun said:
"Not sure how widely useful record-level lineage is though since the
overhead could
be significant."

We know NiFi supports record level lineage. I don't think it was
developed for lols, I think it is safe to assume that the NSA needed
that functionality. We also know that certain financial institutes
need to track tampering with records at a record level and there are
federal regulations that absolutely require this.  They also need to
prove that routing apps that "touch" the messages and either read
or update headers couldn't have possibly modified the payload itself.
They use record level encryption to do that - apps can read and
(sometimes) modify headers but can't touch the payload.

We can totally say "those are corner cases and not worth adding
headers to Kafka for", they should use a different pubsub message for
that (Nifi or one of the other 1000 that cater specifically to the
financial industry).

But this gets us into a catch 22:
If we discuss a specific use-case, someone can always say it isn't
interesting enough for Kafka. If we discuss more general trends,
others can say "well, we are not sure any of them really needs headers
specifically. This is just hand waving and not interesting.".

I think discussing use-cases in specifics is super important to decide
implementation details for headers (my use-cases lean toward numerical
keys with namespaces and object values, others differ), but I think we
need to answer the general "Are we going to have headers" question
first.

I'd love to hear from the other committers in the discussion:
What would it take to convince you that headers in Kafka are a good
idea in general, so we can move ahead and try to agree on the details?

I feel like we keep moving the goal posts and this is truly exhausting.

For the record, I mildly support adding headers to Kafka (+0.5?).
The community can continue to find workarounds to the issue and there
are some benefits to keeping the message format and clients simpler.
But I see the usefulness of headers to many use-cases and if we can
find a good and generally useful way to add it to Kafka, it will make
Kafka easier to use for many - a worthy goal in my eyes.
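
Just to make the "numerical keys with namespaces" option concrete - this is
purely illustrative, not the KIP-82 proposal - the shape I have in mind is
roughly:

// Purely illustrative, not the KIP-82 proposal: namespaced numerical header
// keys mapping to opaque byte[] values.
import java.util.HashMap;
import java.util.Map;

public class Headers {
    // Hypothetical convention: high 16 bits = namespace, low 16 bits = key id.
    private final Map<Integer, byte[]> entries = new HashMap<>();

    public static int key(int namespace, int id) {
        return (namespace << 16) | (id & 0xFFFF);
    }

    public void put(int namespace, int id, byte[] value) {
        entries.put(key(namespace, id), value);
    }

    public byte[] get(int namespace, int id) {
        return entries.get(key(namespace, id));
    }
}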

> are interesting/feasible, but:
> A+B. i think there are use cases for polyglot topics. especially if kafka
> is being used to "trunk" something else.
> D. multiple topics would make it harder to write portable consumer code.
> partition remapping would mess with locality of consumption guarantees.
> E+F. a use case I see for lineage/metadata is billing/chargeback. for that
> use case it is not enough to simply record the point of origin, but every
> replication stop (think mirror maker) must also add a record to form a
> "transit log".
>
> as for stream processing on top of kafka - i know samza has a metadata map
> which they carry around in addition to user values. headers are the perfect
> fit for these things.
>
>
>
> On Wed, Nov 30, 2016 at 6:50 PM, Jun Rao  wrote:
>
>> Hi, Michael,
>>
>> In order to answer the first two questions, it would be helpful if we could
>> identify 1 or 2 strong use cases for headers in the space for third-party
>> vendors. For use cases within an organization, one could always use other
>> approaches such as company-wise containers to get around w/o headers. I
>> went through the use cases in the KIP and in Radai's wiki (
>> https://cwiki.apache.org/confluence/display/KAFKA/A+Case+for+Kafka+Headers
>> ).
>> The following are the ones that I understand and could be in the
>> third-party use case category.
>>
>> A. content-type
>> It seems that in general, content-type should be set at the topic level.
>> Not sure if mixing messages with different content types should be
>> encouraged.
>>
>> B. schema id
>> Since the value is mostly useless without schema id, it seems that storing
>> the schema id together with serialized bytes in the value is better?
>>
>> C. per message encryption
>> One drawback of this approach is that it significantly reduces the
>> effectiveness of compression, which happens on a set of serialized
>> messages. An alternative is to enable SSL for wire encryption and rely on
>> the storage system (e.g. LUKS) for at rest encryption.
>>
>> D. 

[jira] [Comment Edited] (KAFKA-4474) Poor kafka-streams throughput

2016-12-01 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15712821#comment-15712821
 ] 

Guozhang Wang edited comment on KAFKA-4474 at 12/1/16 7:19 PM:
---

[~enothereska] Could you take a look and see if there are any obvious issues 
causing the low throughput? It does not seem to match our nightly benchmark 
results.


was (Author: guozhang):
[~enothereska] Could you take a look and see if there is any obvious issues 
causing the low throughput? It does not match our nightly benchmark results.

> Poor kafka-streams throughput
> -
>
> Key: KAFKA-4474
> URL: https://issues.apache.org/jira/browse/KAFKA-4474
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Juan Chorro
>Assignee: Eno Thereska
>
> Hi! 
> I'm writing because I have a worry about kafka-streams throughput.
> I have only a kafka-streams application instance that consumes from 'input' 
> topic, prints on the screen and produces in 'output' topic. All topics have 4 
> partitions. As can be observed the topology is very simple.
> I produce 120K messages/second to 'input' topic, when I measure the 'output' 
> topic I detect that I'm receiving ~4K messages/second. I had next 
> configuration (Remaining parameters by default):
> application.id: myApp
> bootstrap.servers: localhost:9092
> zookeeper.connect: localhost:2181
> num.stream.threads: 1
> I was doing proofs and tests without success, but when I created a new 
> 'input' topic with 1 partition (Maintain 'output' topic with 4 partitions) I 
> got in 'output' topic 120K messages/seconds.
> I have been doing some performance tests and proof with next cases (All 
> topics have 4 partitions in all cases):
> Case A - 1 Instance:
> - With num.stream.threads set to 1 I had ~3785 messages/second
> - With num.stream.threads set to 2 I had ~3938 messages/second
> - With num.stream.threads set to 4 I had ~120K messages/second
> Case B - 2 Instances:
> - With num.stream.threads set to 1 I had ~3930 messages/second for each 
> instance (And throughput ~8K messages/second)
> - With num.stream.threads set to 2 I had ~3945 messages/second for each 
> instance (And more or less same throughput that with num.stream.threads set 
> to 1)
> Case C - 4 Instances
> - With num.stream.threads set to 1 I had 3946 messages/seconds for each 
> instance (And throughput ~17K messages/second):
> As can be observed, when num.stream.threads is set to the number of partitions I 
> get the best results. I then have the following questions:
> - Why, when I have a topic with #partitions > 1 and num.stream.threads set to 1, 
> do I always get ~4K messages/second?
> - In case C, shouldn't 4 instances with num.stream.threads set to 1 be better 
> than 1 instance with num.stream.threads set to 4? Is this supposition 
> correct?
> This is the kafka-streams application that I use: 
> https://gist.github.com/Chorro/5522ec4acd1a005eb8c9663da86f5a18



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4474) Poor kafka-streams throughput

2016-12-01 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4474:
-
Assignee: Eno Thereska

> Poor kafka-streams throughput
> -
>
> Key: KAFKA-4474
> URL: https://issues.apache.org/jira/browse/KAFKA-4474
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Juan Chorro
>Assignee: Eno Thereska
>
> Hi! 
> I'm writing because I have a worry about kafka-streams throughput.
> I have only a kafka-streams application instance that consumes from 'input' 
> topic, prints on the screen and produces in 'output' topic. All topics have 4 
> partitions. As can be observed the topology is very simple.
> I produce 120K messages/second to 'input' topic, when I measure the 'output' 
> topic I detect that I'm receiving ~4K messages/second. I had next 
> configuration (Remaining parameters by default):
> application.id: myApp
> bootstrap.servers: localhost:9092
> zookeeper.connect: localhost:2181
> num.stream.threads: 1
> I was doing proofs and tests without success, but when I created a new 
> 'input' topic with 1 partition (Maintain 'output' topic with 4 partitions) I 
> got in 'output' topic 120K messages/seconds.
> I have been doing some performance tests and proof with next cases (All 
> topics have 4 partitions in all cases):
> Case A - 1 Instance:
> - With num.stream.threads set to 1 I had ~3785 messages/second
> - With num.stream.threads set to 2 I had ~3938 messages/second
> - With num.stream.threads set to 4 I had ~120K messages/second
> Case B - 2 Instances:
> - With num.stream.threads set to 1 I had ~3930 messages/second for each 
> instance (And throughput ~8K messages/second)
> - With num.stream.threads set to 2 I had ~3945 messages/second for each 
> instance (And more or less same throughput that with num.stream.threads set 
> to 1)
> Case C - 4 Instances
> - With num.stream.threads set to 1 I had 3946 messages/seconds for each 
> instance (And throughput ~17K messages/second):
> As can be observed, when num.stream.threads is set to the number of partitions I 
> get the best results. I then have the following questions:
> - Why, when I have a topic with #partitions > 1 and num.stream.threads set to 1, 
> do I always get ~4K messages/second?
> - In case C, shouldn't 4 instances with num.stream.threads set to 1 be better 
> than 1 instance with num.stream.threads set to 4? Is this supposition 
> correct?
> This is the kafka-streams application that I use: 
> https://gist.github.com/Chorro/5522ec4acd1a005eb8c9663da86f5a18



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4474) Poor kafka-streams throughput

2016-12-01 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15712821#comment-15712821
 ] 

Guozhang Wang commented on KAFKA-4474:
--

[~enothereska] Could you take a look and see if there are any obvious issues 
causing the low throughput? It does not match our nightly benchmark results.

> Poor kafka-streams throughput
> -
>
> Key: KAFKA-4474
> URL: https://issues.apache.org/jira/browse/KAFKA-4474
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Juan Chorro
>
> Hi! 
> I'm writing because I have a worry about kafka-streams throughput.
> I have only a kafka-streams application instance that consumes from 'input' 
> topic, prints on the screen and produces in 'output' topic. All topics have 4 
> partitions. As can be observed the topology is very simple.
> I produce 120K messages/second to 'input' topic, when I measure the 'output' 
> topic I detect that I'm receiving ~4K messages/second. I had next 
> configuration (Remaining parameters by default):
> application.id: myApp
> bootstrap.servers: localhost:9092
> zookeeper.connect: localhost:2181
> num.stream.threads: 1
> I was doing proofs and tests without success, but when I created a new 
> 'input' topic with 1 partition (Maintain 'output' topic with 4 partitions) I 
> got in 'output' topic 120K messages/seconds.
> I have been doing some performance tests and proof with next cases (All 
> topics have 4 partitions in all cases):
> Case A - 1 Instance:
> - With num.stream.threads set to 1 I had ~3785 messages/second
> - With num.stream.threads set to 2 I had ~3938 messages/second
> - With num.stream.threads set to 4 I had ~120K messages/second
> Case B - 2 Instances:
> - With num.stream.threads set to 1 I had ~3930 messages/second for each 
> instance (And throughput ~8K messages/second)
> - With num.stream.threads set to 2 I had ~3945 messages/second for each 
> instance (And more or less same throughput that with num.stream.threads set 
> to 1)
> Case C - 4 Instances
> - With num.stream.threads set to 1 I had 3946 messages/seconds for each 
> instance (And throughput ~17K messages/second):
> As can be observed, when num.stream.threads is set to the number of partitions I 
> get the best results. I then have the following questions:
> - Why, when I have a topic with #partitions > 1 and num.stream.threads set to 1, 
> do I always get ~4K messages/second?
> - In case C, shouldn't 4 instances with num.stream.threads set to 1 be better 
> than 1 instance with num.stream.threads set to 4? Is this supposition 
> correct?
> This is the kafka-streams application that I use: 
> https://gist.github.com/Chorro/5522ec4acd1a005eb8c9663da86f5a18



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-4475) Poor kafka-streams throughput

2016-12-01 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4475.
--
Resolution: Duplicate

Duplicate of KAFKA-4474.

> Poor kafka-streams throughput
> -
>
> Key: KAFKA-4475
> URL: https://issues.apache.org/jira/browse/KAFKA-4475
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Juan Chorro
>
> Hi! 
> I'm writing because I have a worry about kafka-streams throughput.
> I have only a kafka-streams application instance that consumes from 'input' 
> topic, prints on the screen and produces in 'output' topic. All topics have 4 
> partitions. As can be observed the topology is very simple.
> I produce 120K messages/second to 'input' topic, when I measure the 'output' 
> topic I detect that I'm receiving ~4K messages/second. I had next 
> configuration (Remaining parameters by default):
> application.id: myApp
> bootstrap.servers: localhost:9092
> zookeeper.connect: localhost:2181
> num.stream.threads: 1
> I was doing proofs and tests without success, but when I created a new 
> 'input' topic with 1 partition (Maintain 'output' topic with 4 partitions) I 
> got in 'output' topic 120K messages/seconds.
> I have been doing some performance tests and proof with next cases (All 
> topics have 4 partitions in all cases):
> Case A - 1 Instance:
> - With num.stream.threads set to 1 I had ~3785 messages/second
> - With num.stream.threads set to 2 I had ~3938 messages/second
> - With num.stream.threads set to 4 I had ~120K messages/second
> Case B - 2 Instances:
> - With num.stream.threads set to 1 I had ~3930 messages/second for each 
> instance (And throughput ~8K messages/second)
> - With num.stream.threads set to 2 I had ~3945 messages/second for each 
> instance (And more or less same throughput that with num.stream.threads set 
> to 1)
> Case C - 4 Instances
> - With num.stream.threads set to 1 I had 3946 messages/seconds for each 
> instance (And throughput ~17K messages/second):
> As can be observed, when num.stream.threads is set to the number of partitions I 
> get the best results. I then have the following questions:
> - Why, when I have a topic with #partitions > 1 and num.stream.threads set to 1, 
> do I always get ~4K messages/second?
> - In case C, shouldn't 4 instances with num.stream.threads set to 1 be better 
> than 1 instance with num.stream.threads set to 4? Is this supposition 
> correct?
> This is the kafka-streams application that I use: 
> https://gist.github.com/Chorro/5522ec4acd1a005eb8c9663da86f5a18



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-01 Thread Guozhang Wang
@Henry Cai,

I am working on a separate KIP on Streams to leverage this KIP to have
exactly-once processing semantics (note the exactly-once processing is a
bit different from exactly-once delivery semantics), which should cover
your question.

The short answer is that writing the changelog messages needs to be part of
the transaction. When a fatal error happens within a transaction, since
the store updates cannot be rolled back like the messages, in the worst case
we need to restore from the changelog from scratch, or from a checkpoint
with a starting offset in the changelog; the restoring consumer will fetch
committed messages only as well.
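
As a rough sketch of that restore path (the changelog topic name and checkpoint
handling below are placeholders; isolation.level=read_committed is the consumer
setting proposed in this KIP), the restoring consumer could look like:

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ChangelogRestore {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Proposed in KIP-98: only return messages from committed transactions.
        props.put("isolation.level", "read_committed");

        Map<String, byte[]> store = new HashMap<>();   // stand-in for the local RocksDB store
        TopicPartition changelog = new TopicPartition("myapp-store-changelog", 0);
        long checkpointOffset = 0L;                    // or a previously checkpointed offset

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(changelog));
            consumer.seek(changelog, checkpointOffset);
            // A real restore loops until the changelog's end offset is reached.
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(1000)) {
                store.put(new String(record.key(), StandardCharsets.UTF_8), record.value());
            }
        }
    }
}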


Guozhang

On Thu, Dec 1, 2016 at 9:34 AM, Apurva Mehta  wrote:

> Hi Daniel,
>
> That is a very good point. You are correct in saying that one does not need
> a transaction coordinator to get idempotent semantics.
>
> There are, however, three reasons why we chose this route:
>
>1. The request to find a transaction coordinator is exactly the same as
>the request consumers use to find the group coordinator. So if clients
>already implement the new consumer, you should already have the code you
> need to find the transaction coordinator. I would even go so far as to say
> that the majority of the coordinator discovery code can be effectively shared
>between producers and consumers. Jason should correct me on this,
> however,
>since he is most familiar with that bit.
>2. With this route, the broker side changes are simpler. In particular,
>we have to implement the InitPIDRequest only in the coordinator.
>3. By always having a transaction coordinator, we can enable
>applications to use transactions even if they don't specify the AppId.
> The
>only thing you lose is transaction recovery across sessions.
>
> Needless to say, we did debate this point extensively. What swung our
> decision ultimately was the following observation: if the user does not
> provide a transaction.app.id, the client can generate a UUID and use that
> as the appId for the rest of the session. This means that there are no
> branches in the client and server code, and is overall simpler to maintain.
> All the producer APIs are also available to the user and it would be more
> intuitive.
>
> It also means that clients cannot choose idempotence without transactions,
> and hence it does place a greater burden on implementors of kafka clients.
> But the cost should be minimal given point 1 above, and was deemed worth
> it.
>
> Thanks once more for your thoughtful comments. It would be great for other
> client implementors to chime in on this.
>
> Regards,
> Apurva
>
>
> On Thu, Dec 1, 2016 at 3:16 AM, Daniel Schierbeck
>  > wrote:
>
> > Hi there,
> >
> > I'm the author of ruby-kafka, and as such am slightly biased towards
> > simplicity of implementation :-)
> >
> > I like the proposal, and would love to use idempotent producer semantics
> in
> > our projects at Zendesk, but I'm a bit worried about the complexity that
> > would go into the clients; specifically: it sounds to me that in order to
> > get idempotent producer semantics, I'd have to implement the transaction
> > coordinator discovery. I may be wrong, but it would seem that it's not
> > strictly necessary if you're not using transactions – we could just use
> the
> > topic partition's leader as the coordinator, avoiding the extra
> discovery.
> > In my experience, most bugs are related to figuring out which broker is
> the
> > leader of which partition/group/whatever, so minimizing the number of
> > moving parts would be beneficial to me. I'd also like to point out that I
> > would be reluctant to implement the transaction API in the near future,
> but
> > would love to implement the idempotency API soon. The former seems only
> > relevant to real stream processing frameworks, which is probably not the
> > best use case for ruby-kafka.
> >
> > Cheers,
> > Daniel Schierbeck
> >
> > On Thu, Dec 1, 2016 at 9:54 AM Jason Gustafson 
> wrote:
> >
> > > Hey Neha,
> > >
> > > Thanks for the thoughtful questions. I'll try to address the first
> > question
> > > since Apurva addressed the second. Since most readers are probably
> > getting
> > > up to speed with this large proposal, let me first take a step back and
> > > explain why we need the AppID at all. As Confluent tradition demands, I
> > > present you a big wall of text:
> > >
> > > Clearly "exactly once" delivery requires resilience to client failures.
> > > When a client crashes or turns into a zombie, another client must
> > > eventually be started to resume the work. There are two problems: 1) we
> > > need to ensure that the old process is actually dead or at least that
> it
> > > cannot write any more data, and 2) we need to be able to pick up
> wherever
> > > the last process left off. To do either of these, we need some kind of
> > > identifier to tie the two instances together.
> > 

Re: Kafka JDBC client Phoenix support

2016-12-01 Thread Gwen Shapira
I don't think we tried it yet. If Phoenix supports JDBC, feel free to
test and report what you find. Maybe it "just works"?

On Thu, Dec 1, 2016 at 8:19 AM, Songo Songo  wrote:
> Hello community,
>
> Do you know if there is any activity to support Apache Phoenix in Kafka 
> Connect JDBC?
>
> Thanks
> Alex
>
>



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-01 Thread Ignacio Solis
This is a very good question. Do we try to enable third party
"vendors" by building a platform?
I'm assuming the other (main) alternative is to let everybody
implement their own.

Disadvantages of a platform:
- can potentially decrease performance if done incorrectly
- requires the "platform" to be maintained
- API changes may break "vendors"
- it takes work to build it

Advantages of a platform:
- allows easy sharing of plugins
- implementing features doesn't require coordination with main
platform code base
- multiple alternatives for the same feature/plugin can be considered / swapped
- enables a market place
- ensures a base level of compatibility (to the stable platform API)


I say this loosely, mostly because the discussion of whether to build a
platform (enabling third-party plugins) is slightly different from the
discussion of whether it's done inside the "V" part of the message.

A company could create a platform inside the V part and internally
create a system to add, get, parse, remove headers. This could be used
inside the company and enforced in different ways. Inside the company
you could have different "third party vendors" creating and sharing
plugins. The security team or auditing team or infra team, etc. could
create plugins without the need to coordinate with each other.  For
example, without the need to force the users to use a specific Avro
schema or to update their avro schema every time a header changes.
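
For example - with a completely hypothetical layout, just to illustrate what
"a platform inside the V part" could mean - the envelope can be as simple as:

// Purely illustrative: one way a company could put a header "platform" inside
// the value (V) today, without protocol support. The field layout is made up.
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class HeaderEnvelope {

    // Layout: [int headerCount] { [int key][int len][bytes value] }* [payload bytes]
    public static byte[] wrap(Map<Integer, byte[]> headers, byte[] payload) {
        int size = 4;
        for (byte[] v : headers.values()) size += 8 + v.length;
        ByteBuffer buf = ByteBuffer.allocate(size + payload.length);
        buf.putInt(headers.size());
        for (Map.Entry<Integer, byte[]> e : headers.entrySet()) {
            buf.putInt(e.getKey());
            buf.putInt(e.getValue().length);
            buf.put(e.getValue());
        }
        buf.put(payload);
        return buf.array();
    }

    public static Map<Integer, byte[]> readHeaders(byte[] envelope) {
        ByteBuffer buf = ByteBuffer.wrap(envelope);
        Map<Integer, byte[]> headers = new HashMap<>();
        int count = buf.getInt();
        for (int i = 0; i < count; i++) {
            int key = buf.getInt();
            byte[] value = new byte[buf.getInt()];
            buf.get(value);
            headers.put(key, value);
        }
        return headers;
    }
}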

The benefit of the platform is the community and marketplace it
enables. At the company level it's great (LinkedIn would like to move
in that direction); but at the world level it's better.  There is no
reason for every company to reinvent the wheel. Not everybody suffers
from NIH syndrome.

So, let's be clear when we're talking about things.  Is your argument that:

- headers are not good/useful?
- headers are not useful at the kafka protocol level?
- headers are not useful inside the apache kafka source (protocol or V)?
- headers are not useful in an open source project and should only be
done internally to a company?
- there is no benefit in sharing the work that could be done on headers?

Nacho


On Wed, Nov 30, 2016 at 6:50 PM, Jun Rao  wrote:
>
> In order to answer the first two questions, it would be helpful if we could
> identify 1 or 2 strong use cases for headers in the space for third-party
> vendors. For use cases within an organization, one could always use other
> approaches such as company-wise containers to get around w/o headers. I
> went through the use cases in the KIP and in Radai's wiki (
> https://cwiki.apache.org/confluence/display/KAFKA/A+Case+for+Kafka+Headers).
> The following are the ones that I understand and could be in the
> third-party use case category.
>
> A. content-type
> It seems that in general, content-type should be set at the topic level.
> Not sure if mixing messages with different content types should be
> encouraged.
>
> B. schema id
> Since the value is mostly useless without schema id, it seems that storing
> the schema id together with serialized bytes in the value is better?
>
> C. per message encryption
> One drawback of this approach is that it significantly reduces the
> effectiveness of compression, which happens on a set of serialized
> messages. An alternative is to enable SSL for wire encryption and rely on
> the storage system (e.g. LUKS) for at rest encryption.
>
> D. cluster ID for mirroring across Kafka clusters
> This is actually interesting. Today, to avoid introducing cycles when doing
> mirroring across data centers, one would either have to set up two Kafka
> clusters (a local and an aggregate) per data center or rename topics.
> Neither is ideal. With headers, the producer could tag each message with
> the producing cluster ID in the header. MirrorMaker could then avoid
> mirroring messages to a cluster if they are tagged with the same cluster id.
>
> However, an alternative approach is to introduce something like hierarchical
> topics and store messages from different clusters in different partitions
> under the same topic. This approach avoids filtering out unneeded data and
> makes offset preserving easier to support. It may make compaction trickier
> though since the same key may show up in different partitions.
>
> E. record-level lineage
> For example, a source connector could store in the message the metadata
> (e.g. UUID) of the source record. Similarly, if a stream job transforms
> messages from topic A to topic B, the library could include the source
> message offset in each of the transformed message in the header. Not sure
> how widely useful record-level lineage is though since the overhead could
> be significant.
>
> F. auditing metadata
> We could put things like clientId/host/user in the header in each message
> for auditing. These metadata are really at the producer level though. So, a
> more efficient way is to only include a "producerId" per message and send
> the producerId -> metadata mapping 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-01 Thread radai
"For use cases within an organization, one could always use other
approaches such as company-wise containers"
this is what linkedin has traditionally done but there are now cases (read
- topics) where this is not acceptable. this makes headers useful even
within single orgs for cases where one-container-fits-all cannot apply.

as for the particular use cases listed, i dont want this to devolve to a
discussion of particular use cases - i think its enough that some of them
are interesting/feasible, but:
A+B. i think there are use cases for polyglot topics. especially if kafka
is being used to "trunk" something else.
D. multiple topics would make it harder to write portable consumer code.
partition remapping would mess with locality of consumption guarantees.
E+F. a use case I see for lineage/metadata is billing/chargeback. for that
use case it is not enough to simply record the point of origin, but every
replication stop (think mirror maker) must also add a record to form a
"transit log".

as for stream processing on top of kafka - i know samza has a metadata map
which they carry around in addition to user values. headers are the perfect
fit for these things.
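
to make the "transit log" idea concrete (names and format below are made up -
nothing like this exists today), each hop could append its cluster id to a
header value along these lines:

// illustrative only: a "transit log" header value that each replication hop
// (e.g. a mirror maker instance) appends its cluster id to.
import java.nio.charset.StandardCharsets;

public class TransitLog {
    public static byte[] appendHop(byte[] existing, String clusterId) {
        String hops = (existing == null)
                ? clusterId
                : new String(existing, StandardCharsets.UTF_8) + "," + clusterId;
        return hops.getBytes(StandardCharsets.UTF_8);
    }
}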



On Wed, Nov 30, 2016 at 6:50 PM, Jun Rao  wrote:

> Hi, Michael,
>
> In order to answer the first two questions, it would be helpful if we could
> identify 1 or 2 strong use cases for headers in the space for third-party
> vendors. For use cases within an organization, one could always use other
> approaches such as company-wise containers to get around w/o headers. I
> went through the use cases in the KIP and in Radai's wiki (
> https://cwiki.apache.org/confluence/display/KAFKA/A+Case+for+Kafka+Headers
> ).
> The following are the ones that I understand and could be in the
> third-party use case category.
>
> A. content-type
> It seems that in general, content-type should be set at the topic level.
> Not sure if mixing messages with different content types should be
> encouraged.
>
> B. schema id
> Since the value is mostly useless without schema id, it seems that storing
> the schema id together with serialized bytes in the value is better?
>
> C. per message encryption
> One drawback of this approach is that it significantly reduces the
> effectiveness of compression, which happens on a set of serialized
> messages. An alternative is to enable SSL for wire encryption and rely on
> the storage system (e.g. LUKS) for at rest encryption.
>
> D. cluster ID for mirroring across Kafka clusters
> This is actually interesting. Today, to avoid introducing cycles when doing
> mirroring across data centers, one would either have to set up two Kafka
> clusters (a local and an aggregate) per data center or rename topics.
> Neither is ideal. With headers, the producer could tag each message with
> the producing cluster ID in the header. MirrorMaker could then avoid
> mirroring messages to a cluster if they are tagged with the same cluster
> id.
>
> However, an alternative approach is to introduce something like hierarchical
> topics and store messages from different clusters in different partitions
> under the same topic. This approach avoids filtering out unneeded data and
> makes offset preserving easier to support. It may make compaction trickier
> though since the same key may show up in different partitions.
>
> E. record-level lineage
> For example, a source connector could store in the message the metadata
> (e.g. UUID) of the source record. Similarly, if a stream job transforms
> messages from topic A to topic B, the library could include the source
> message offset in each of the transformed message in the header. Not sure
> how widely useful record-level lineage is though since the overhead could
> be significant.
>
> F. auditing metadata
> We could put things like clientId/host/user in the header in each message
> for auditing. These metadata are really at the producer level though. So, a
> more efficient way is to only include a "producerId" per message and send
> the producerId -> metadata mapping independently. KIP-98 is actually
> proposing including such a producerId natively in the message.
>
> So, overall, I am not sure that I am fully convinced of the strong third-party
> use cases of headers yet. Perhaps we could discuss a bit more to make one
> or two really convincing use cases.
>
> Another orthogonal question is whether headers should be exposed in stream
> processing systems such as Kafka Streams, Samza, and Spark Streaming.
> Currently, those systems just deal with key/value pairs. Should we expose
> headers as a third element there too, or somehow map headers to the key or value?
>
> Thanks,
>
> Jun
>
>
> On Tue, Nov 29, 2016 at 3:35 AM, Michael Pearce 
> wrote:
>
> > I assume that, after a period of a week, there are no concerns now
> > with points 1 and 2, and that we now have agreement that headers are useful and
> > needed in Kafka. As such, if put to a KIP vote, this wouldn't be a reason to
> > reject.
> 

[GitHub] kafka pull request #2199: HOTFIX: Temporary suspension of 2 tests

2016-12-01 Thread enothereska
GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/2199

HOTFIX: Temporary suspension of 2 tests



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka 
hotfix-streams-test-reset-ignore

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2199.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2199


commit 5b48bb5333028285d1ae43117118ef41e2429341
Author: Eno Thereska 
Date:   2016-12-01T18:22:42Z

Temporary suspension of 2 tests




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Kafka JDBC client Phoenix support

2016-12-01 Thread Songo Songo
Hello community, 

Do you know if there is some activity to support Apache Phoenix in Kafka 
Connect JDBC? 

Thanks
Alex




Build failed in Jenkins: kafka-trunk-jdk8 #1071

2016-12-01 Thread Apache Jenkins Server
See 

Changes:

[ismael] KAFKA-4443; Minor comment clean-up

--
[...truncated 7978 lines...]

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow PASSED

kafka.integration.AutoOffsetResetTest > 

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-01 Thread Apurva Mehta
Hi Daniel,

That is a very good point. You are correct in saying that one does not need
a transaction coordinator to get idempotent semantics.

There are, however, three reasons why we chose this route:

   1. The request to find a transaction coordinator is exactly the same as
   the request consumers use to find the group coordinator. So if clients
   already implement the new consumer, you should already have the code you
   need to find the transaction coordinator. I would even go so far as to say
   that the majority of the coordinator discovery code can be effectively shared
   between producers and consumers. Jason should correct me on this, however,
   since he is most familiar with that bit.
   2. With this route, the broker side changes are simpler. In particular,
   we have to implement the InitPIDRequest only in the coordinator.
   3. By always having a transaction coordinator, we can enable
   applications to use transactions even if they don't specify the AppId. The
   only thing you lose is transaction recovery across sessions.

Needless to say, we did debate this point extensively. What swung our
decision ultimately was the following observation: if the user does not
provide a transaction.app.id, the client can generate a UUID and use that
as the appId for the rest of the session. This means that there are no
branches in the client and server code, which is overall simpler to maintain.
All the producer APIs are also available to the user and it would be more
intuitive.
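A minimal sketch of that fallback (the config key name follows this thread and may still change):

{code}
import java.util.Properties;
import java.util.UUID;

public final class AppIdFallback {

    // If the user did not configure transaction.app.id, generate a session-local
    // UUID so the same code path is used either way; the only thing lost is
    // transaction recovery across sessions.
    public static String resolveAppId(Properties config) {
        String appId = config.getProperty("transaction.app.id");
        return (appId != null && !appId.isEmpty()) ? appId : UUID.randomUUID().toString();
    }
}
{code}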

It also means that clients cannot choose idempotence without transactions,
and hence it does place a greater burden on implementors of Kafka clients.
But the cost should be minimal given point 1 above, and was deemed worth it.

Thanks once more for your thoughtful comments. It would be great for other
client implementors to chime in on this.

Regards,
Apurva


On Thu, Dec 1, 2016 at 3:16 AM, Daniel Schierbeck  wrote:

> Hi there,
>
> I'm the author of ruby-kafka, and as such am slightly biased towards
> simplicity of implementation :-)
>
> I like the proposal, and would love to use idempotent producer semantics in
> our projects at Zendesk, but I'm a bit worried about the complexity that
> would go into the clients; specifically: it sounds to me that in order to
> get idempotent producer semantics, I'd have to implement the transaction
> coordinator discovery. I may be wrong, but it would seem that it's not
> strictly necessary if you're not using transactions – we could just use the
> topic partition's leader as the coordinator, avoiding the extra discovery.
> In my experience, most bugs are related to figuring out which broker is the
> leader of which partition/group/whatever, so minimizing the number of
> moving parts would be beneficial to me. I'd also like to point out that I
> would be reluctant to implement the transaction API in the near future, but
> would love to implement the idempotency API soon. The former seems only
> relevant to real stream processing frameworks, which is probably not the
> best use case for ruby-kafka.
>
> Cheers,
> Daniel Schierbeck
>
> On Thu, Dec 1, 2016 at 9:54 AM Jason Gustafson  wrote:
>
> > Hey Neha,
> >
> > Thanks for the thoughtful questions. I'll try to address the first
> question
> > since Apurva addressed the second. Since most readers are probably
> getting
> > up to speed with this large proposal, let me first take a step back and
> > explain why we need the AppID at all. As Confluent tradition demands, I
> > present you a big wall of text:
> >
> > Clearly "exactly once" delivery requires resilience to client failures.
> > When a client crashes or turns into a zombie, another client must
> > eventually be started to resume the work. There are two problems: 1) we
> > need to ensure that the old process is actually dead or at least that it
> > cannot write any more data, and 2) we need to be able to pick up wherever
> > the last process left off. To do either of these, we need some kind of
> > identifier to tie the two instances together.
> >
> > There are only two choices for where this ID comes from: either the user
> > gives it to us or the server generates it. In the latter case, the user
> is
> > responsible for fetching it from the client and persisting it somewhere
> for
> > use after failure. We ultimately felt that the most flexible option is to
> > have the user give it to us. In many applications, there is already a
> > natural identifier which is already used to divide the workload. For
> > example, in Kafka Streams and Kafka Connect, we have a taskId. For
> > applications where there is no natural ID, the user can generate a UUID
> and
> > persist it locally, which is as good as having the server generate it.
> >
> > So the AppID is used to provide continuity between the instances of a
> > producer which are handling a certain workload. One of the early design
> > decisions we made in this work was to make the delivery guarantees 

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-01 Thread Apurva Mehta
Hi Ismael,

That is a good suggestion. We did not plan to move the design to a wiki,
but I think it is valuable to move at least the message format and RPC
changes to the wiki. We shall do so once the design is close to final so
that we do not have to edit multiple places as we iterate.

Thanks,
Apurva

On Thu, Dec 1, 2016 at 3:28 AM, Ismael Juma  wrote:

> Thanks for submitting this KIP as it includes important improvements to
> Kafka's semantics. I will send a follow-up with more detailed feedback, but
> I have a process question in the meantime: is there a plan to move part or
> all of the Google Doc content to the wiki? At least protocol and message
> format changes should be in the wiki, in my opinion (Google Docs are not as
> discoverable, it's harder to track changes, not hosted on Apache Infra,
> etc.).
>
> Thanks,
> Ismael
>
> On Wed, Nov 30, 2016 at 10:19 PM, Guozhang Wang 
> wrote:
>
> > Hi all,
> >
> > I have just created KIP-98 to enhance Kafka with exactly once delivery
> > semantics:
> >
> > *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> >  > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging>*
> >
> > This KIP adds a transactional messaging mechanism along with an
> idempotent
> > producer implementation to make sure that 1) duplicated messages sent
> from
> > the same identified producer can be detected on the broker side, and 2) a
> > group of messages sent within a transaction will atomically be either
> > reflected and fetchable to consumers or not as a whole.
> >
> > The above wiki page provides a high-level view of the proposed changes as
> > well as summarized guarantees. Initial draft of the detailed
> implementation
> > design is described in this Google doc:
> >
> > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> > 0wSw9ra8
> >
> >
> > We would love to hear your comments and suggestions.
> >
> > Thanks,
> >
> > -- Guozhang
> >
>


Re: [VOTE] KIP-84: Support SASL SCRAM mechanisms

2016-12-01 Thread Ismael Juma
Hi Rajini,

Sorry for the delay. For some reason, both of your replies (for this and
KIP-85) were marked as spam by Gmail. Comments inline.

On Mon, Nov 28, 2016 at 3:47 PM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:
>
> 1. I think you had asked earlier for SCRAM-SHA-1 to be removed since it is
> not secure :-) I am happy to add that back in so that clients which don't
> have access to a more secure algorithm can use it. But it would be a shame
> to prevent users who only need Java clients from using more secure
> mechanisms. Since SHA-1 is not secure, you need a secure Zookeeper
> installation (or store your credentials in an alternative secure store).
> By supporting multiple algorithms, we are giving the choice to users. It
> doesn't add much additional code, just the additional tests (one
> integration test per mechanism). As more clients support new mechanisms,
> users can enable these without any changes to Kafka.
>

Yes, I remember that I asked for SCRAM-SHA-1 to be removed. I probably
wasn't clear. My suggestion was not to add that back, but whether we needed
so many variants. For example, we could support SCRAM-SHA-256 and
SCRAM-SHA-512.
Would that be sufficient? It's true that the cost is not that large for us,
but every other client also has to pay that additional cost, and I am
not so sure about the benefit of some of the options.
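For reference, a minimal sketch of what enabling just those two variants might look like; the property names follow the existing SASL configs, and the final list of mechanisms is exactly what is being discussed here:

{code}
import java.util.Properties;

public final class ScramConfigSketch {

    public static Properties brokerSaslProps() {
        Properties props = new Properties();
        // Advertise only the two stronger SCRAM variants.
        props.put("sasl.enabled.mechanisms", "SCRAM-SHA-256,SCRAM-SHA-512");
        return props;
    }

    public static Properties clientSaslProps() {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256"); // a client picks one mechanism
        return props;
    }
}
{code}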

> 3. I am assuming that ZK authentication will be enabled and ZK
> configuration will be done directly using ZK commands. This is true for
> ACLs, quotas etc. as well?
>

Right, I also thought that ACLs was the closest example. However, it seems
like getting read access to the SCRAM DB has potentially worse consequences:

"For a specific secret compromised, if an exchange is obtained from the
wire by some mechanism, this gives sufficient information for an imposter
to pose as the client for that server (but not another one using the same
password). Note that this interception is only useful if the database has
been compromised – SCRAM is safe against replay attack. This is the primary
SCRAM weakness, and why it is important to protect the secret database
carefully and to use TLS."[1]

Also, because we are using fast hashes (instead of slow ones like bcrypt,
scrypt, etc.), we are more susceptible to dictionary attacks (potentially
mitigated by a reasonably large iteration count combined with good quality
passwords).

If nothing else, it may be worth mentioning some of this in the KIP and/or
documentation.

Ismael

[1] http://www.isode.com/whitepapers/scram.html


Re: [VOTE] KIP-85: Dynamic JAAS configuration for Kafka clients

2016-12-01 Thread Ismael Juma
Thanks for explaining your reasoning, Rajini. I do agree with all of it and
that's why I voted +1. :)

The reason for my comment was to highlight some of the areas that can be
improved in case someone has the time and interest. The Kerberos situation
is the obvious one, but one could also imagine flattening the JAAS format
into the properties format. That would make it easier for people who want
to provide a single properties file and it would be more consistent with
other properties. I haven't worked out all the details, so there may be
reasons why it doesn't work, but it's an interesting avenue to explore I
think.

Ismael

On Mon, Nov 28, 2016 at 3:29 PM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> Ismael,
>
> Thank you for reviewing the KIP. I do agree that JAAS config format is not
> ideal. But I wanted to solve the generic configuration issue (need for
> physical file, single static config) for any SASL mechanism in an
> extensible, future-proof way. And that requires the ability to configure
> all the properties currently configured using the JAAS config file - login
> module and all its options. It didn't make sense to define a new format to
> do this when JAAS is supported by Kafka.
>
> Kerberos is a very special case. Unlike other mechanisms, I imagine all
> users of Kerberos use the login module included in the JRE. And these
> modules happen to use different options depending on the vendor. I am not
> very familiar with the Hadoop codebase, but it looks like Hadoop contains
> code that abstracts out Kerberos options so that it works with any JRE.
> This KIP does not preclude better handling for Kerberos in future.
>
> For other mechanisms like PLAIN, we want the login module to be pluggable.
> And that means the options need to be extensible. Here JAAS config enables
> a format that is consistent with the jaas config file, but without the
> current limitations.
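A minimal client-side sketch of what this enables, assuming the new property is exposed as sasl.jaas.config as described in the KIP:

{code}
import java.util.Properties;

public final class DynamicJaasSketch {

    public static Properties clientProps() {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // Login module and its options supplied inline -- no physical jaas.conf file.
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"alice\" password=\"alice-secret\";");
        return props;
    }
}
{code}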
>
>
> On Mon, Nov 28, 2016 at 1:00 PM, Ismael Juma  wrote:
>
> > I'm very late to this, but better late than never, I guess. I am +1 on
> this
> > because it improves on the status quo, satisfies a real need and is
> simple
> > to implement.
> >
> > Having said that, I'd also like to state that it's a bit of a shame that
> we
> > are doubling down on the JAAS config format. It is a peculiar format and
> in
> > the Kerberos case (one of the common usages), it requires users to
> provide
> > different configs depending on the Java implementation being used. It
> would
> > be nice if we looked into abstracting some of this to make users' lives
> > easier. Looking at the Hadoop codebase, it looks like they try to do that
> > although I don't know how well it worked out in practice.
> >
> > Ismael
> >
> > On Tue, Nov 1, 2016 at 1:42 PM, Rajini Sivaram <
> > rajinisiva...@googlemail.com
> > > wrote:
> >
> > > KIP-85 vote has passed with 4 binding (Harsha, Gwen, Jason, Jun) and 4
> > > non-binding (Mickael, Jim, Edo, me) votes.
> > >
> > > Thank you all for your votes and comments. I will update the KIP page
> and
> > > rebase the PR.
> > >
> > > Many thanks,
> > >
> > > Rajini
> > >
> > >
> > >
> > > On Mon, Oct 31, 2016 at 11:29 AM, Edoardo Comar 
> > wrote:
> > >
> > > > +1 great KIP
> > > > --
> > > > Edoardo Comar
> > > > IBM MessageHub
> > > > eco...@uk.ibm.com
> > > > IBM UK Ltd, Hursley Park, SO21 2JN
> > > >
> > > > IBM United Kingdom Limited Registered in England and Wales with
> number
> > > > 741598 Registered office: PO Box 41, North Harbour, Portsmouth,
> Hants.
> > > PO6
> > > > 3AU
> > > >
> > > >
> > > >
> > > > From:   Rajini Sivaram 
> > > > To: dev@kafka.apache.org
> > > > Date:   26/10/2016 16:27
> > > > Subject:[VOTE] KIP-85: Dynamic JAAS configuration for Kafka
> > > > clients
> > > >
> > > >
> > > >
> > > > I would like to initiate the voting process for KIP-85: Dynamic JAAS
> > > > configuration for Kafka Clients:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 85%3A+Dynamic+JAAS+
> > > > configuration+for+Kafka+clients
> > > >
> > > >
> > > > This KIP enables Java clients to connect to Kafka using SASL without
> a
> > > > physical jaas.conf file. This will also be useful to configure
> multiple
> > > > KafkaClient login contexts when multiple users are supported within a
> > > JVM.
> > > >
> > > > Thank you...
> > > >
> > > > Regards,
> > > >
> > > > Rajini
> > > >
> > > >
> > > >
> > > > Unless stated otherwise above:
> > > > IBM United Kingdom Limited - Registered in England and Wales with
> > number
> > > > 741598.
> > > > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
> PO6
> > > 3AU
> > > >
> > >
> > >
> > >
> > > --
> > > Regards,
> > >
> > > Rajini
> > >
> >
>
>
>
> --
> Regards,
>
> Rajini
>


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2016-12-01 Thread Harald Kirsch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15712362#comment-15712362
 ] 

Harald Kirsch commented on KAFKA-1194:
--

[~soumyajitsahu] Here are the server configs as logged:
{code}
[2016-11-29 16:23:13,731] INFO KafkaConfig values: 
advertised.host.name = null
advertised.listeners = null
advertised.port = null
authorizer.class.name = 
auto.create.topics.enable = false
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 0
broker.id.generation.enable = true
broker.rack = null
compression.type = producer
connections.max.idle.ms = 60
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 3
default.replication.factor = 1
delete.topic.enable = false
fetch.purgatory.purge.interval.requests = 1000
group.max.session.timeout.ms = 30
group.min.session.timeout.ms = 6000
host.name = 
inter.broker.protocol.version = 0.10.1-IV2
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listeners = PLAINTEXT://:9092
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 8640
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.min.cleanable.ratio = 0.1
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [compact]
log.dir = /tmp/kafka-logs
log.dirs = d:\Search\kafka
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 6
log.flush.scheduler.interval.ms = 9223372036854775807
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.format.version = 0.10.1-IV2
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 300100
log.retention.hours = 168
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 24
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 200111000
log.segment.delete.delay.ms = 6
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides = 
message.max.bytes = 20999000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 3
min.insync.replicas = 1
num.io.threads = 36
num.network.threads = 36
num.partitions = 1
num.recovery.threads.per.data.dir = 1
num.replica.fetchers = 1
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 60
offsets.retention.minutes = 2147483
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 3
offsets.topic.segment.bytes = 104857600
port = 9092
principal.builder.class = class 
org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
producer.purgatory.purge.interval.requests = 1000
queued.max.requests = 500
quota.consumer.default = 9223372036854775807
quota.producer.default = 9223372036854775807
quota.window.num = 11
quota.window.size.seconds = 1
replica.fetch.backoff.ms = 1000
replica.fetch.max.bytes = 20999000
replica.fetch.min.bytes = 1
replica.fetch.response.max.bytes = 10485760
replica.fetch.wait.max.ms = 500
replica.high.watermark.checkpoint.interval.ms = 5000
replica.lag.time.max.ms = 1
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 3
replication.quota.throttled.rate = 9223372036854775807
replication.quota.window.num = 11
replication.quota.window.size.seconds = 1
request.timeout.ms = 32
reserved.broker.max.id = 1000
sasl.enabled.mechanisms = [GSSAPI]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.principal.to.local.rules = [DEFAULT]
sasl.kerberos.service.name = null

[jira] [Created] (KAFKA-4474) Poor kafka-streams throughput

2016-12-01 Thread Juan Chorro (JIRA)
Juan Chorro created KAFKA-4474:
--

 Summary: Poor kafka-streams throughput
 Key: KAFKA-4474
 URL: https://issues.apache.org/jira/browse/KAFKA-4474
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Juan Chorro


Hi! 

I'm writing because I have a concern about kafka-streams throughput.

I have a single kafka-streams application instance that consumes from the 'input' 
topic, prints to the screen and produces to the 'output' topic. All topics have 4 
partitions. As can be observed, the topology is very simple.

I produce 120K messages/second to the 'input' topic, but when I measure the 'output' 
topic I see that I'm receiving only ~4K messages/second. I had the following 
configuration (remaining parameters at their defaults):

application.id: myApp
bootstrap.servers: localhost:9092
zookeeper.connect: localhost:2181
num.stream.threads: 1

I was doing tests without success, but when I created a new 'input' 
topic with 1 partition (keeping the 'output' topic with 4 partitions) I got 120K 
messages/second in the 'output' topic.

I have been doing some performance tests with the following cases (all topics 
have 4 partitions in all cases):

Case A - 1 Instance:

- With num.stream.threads set to 1 I had ~3785 messages/second
- With num.stream.threads set to 2 I had ~3938 messages/second
- With num.stream.threads set to 4 I had ~120K messages/second

Case B - 2 Instances:

- With num.stream.threads set to 1 I had ~3930 messages/second for each 
instance (and total throughput ~8K messages/second)
- With num.stream.threads set to 2 I had ~3945 messages/second for each 
instance (and more or less the same throughput as with num.stream.threads set to 1)

Case C - 4 Instances

- With num.stream.threads set to 1 I had ~3946 messages/second for each 
instance (and total throughput ~17K messages/second)

As can be observed, I get the best results when num.stream.threads is set to 
#partitions. I then have the following questions:

- Why, when I have a topic with #partitions > 1 and num.stream.threads set to 1, 
do I always get ~4K messages/second?
- In case C, 4 instances with num.stream.threads set to 1 should be better than 
1 instance with num.stream.threads set to 4. Is this supposition correct?

This is the kafka-streams application that I use: 
https://gist.github.com/Chorro/5522ec4acd1a005eb8c9663da86f5a18
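For reference, the topology and configuration described above correspond roughly to the following sketch (0.10.1-era API; serdes and other details are assumptions and may differ from the linked gist):

{code}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class PassthroughApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myApp");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> input = builder.stream("input");
        input.print();      // print to the screen
        input.to("output"); // forward to the output topic

        new KafkaStreams(builder, props).start();
    }
}
{code}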



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4475) Poor kafka-streams throughput

2016-12-01 Thread Juan Chorro (JIRA)
Juan Chorro created KAFKA-4475:
--

 Summary: Poor kafka-streams throughput
 Key: KAFKA-4475
 URL: https://issues.apache.org/jira/browse/KAFKA-4475
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Juan Chorro


Hi! 

I'm writing because I have a concern about kafka-streams throughput.

I have a single kafka-streams application instance that consumes from the 'input' 
topic, prints to the screen and produces to the 'output' topic. All topics have 4 
partitions. As can be observed, the topology is very simple.

I produce 120K messages/second to the 'input' topic, but when I measure the 'output' 
topic I see that I'm receiving only ~4K messages/second. I had the following 
configuration (remaining parameters at their defaults):

application.id: myApp
bootstrap.servers: localhost:9092
zookeeper.connect: localhost:2181
num.stream.threads: 1

I was doing tests without success, but when I created a new 'input' 
topic with 1 partition (keeping the 'output' topic with 4 partitions) I got 120K 
messages/second in the 'output' topic.

I have been doing some performance tests with the following cases (all topics 
have 4 partitions in all cases):

Case A - 1 Instance:

- With num.stream.threads set to 1 I had ~3785 messages/second
- With num.stream.threads set to 2 I had ~3938 messages/second
- With num.stream.threads set to 4 I had ~120K messages/second

Case B - 2 Instances:

- With num.stream.threads set to 1 I had ~3930 messages/second for each 
instance (and total throughput ~8K messages/second)
- With num.stream.threads set to 2 I had ~3945 messages/second for each 
instance (and more or less the same throughput as with num.stream.threads set to 1)

Case C - 4 Instances

- With num.stream.threads set to 1 I had ~3946 messages/second for each 
instance (and total throughput ~17K messages/second)

As can be observed, I get the best results when num.stream.threads is set to 
#partitions. I then have the following questions:

- Why, when I have a topic with #partitions > 1 and num.stream.threads set to 1, 
do I always get ~4K messages/second?
- In case C, 4 instances with num.stream.threads set to 1 should be better than 
1 instance with num.stream.threads set to 4. Is this supposition correct?

This is the kafka-streams application that I use: 
https://gist.github.com/Chorro/5522ec4acd1a005eb8c9663da86f5a18



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4443) Controller should send UpdateMetadataRequest prior to LeaderAndIsrRequest during failover

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15712288#comment-15712288
 ] 

ASF GitHub Bot commented on KAFKA-4443:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2194


> Controller should send UpdateMetadataRequest prior to LeaderAndIsrRequest 
> during failover
> -
>
> Key: KAFKA-4443
> URL: https://issues.apache.org/jira/browse/KAFKA-4443
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Dong Lin
>Assignee: Dong Lin
>  Labels: reliability
> Fix For: 0.10.2.0
>
>
> Currently in onControllerFailover(), the controller will start up the 
> replicaStateMachine and partitionStateMachine before invoking 
> sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq).
>  However, if a broker starts right after controller election, the 
> LeaderAndIsrRequest sent to follower partitions on this broker will all be 
> ignored because the broker doesn't know the leaders are alive. 
> To fix this problem, in onControllerFailover(), the controller should send 
> an UpdateMetadataRequest to brokers after initializeControllerContext() but 
> before it starts the replicaStateMachine and partitionStateMachine. The first 
> UpdateMetadataRequest will include the list of live brokers. Although it will not 
> include partition leader information, it is OK because we will always send 
> an UpdateMetadataRequest again when we send LeaderAndIsrRequest during 
> replicaStateMachine.startup() and partitionStateMachine.startup().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2194: KAFKA-4443: Minor comment clean-up

2016-12-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2194




[jira] [Commented] (KAFKA-4443) Controller should send UpdateMetadataRequest prior to LeaderAndIsrRequest during failover

2016-12-01 Thread Alexandre Vermeerbergen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15712243#comment-15712243
 ] 

Alexandre Vermeerbergen commented on KAFKA-4443:


Hello,

Could this fix be back-ported to Kafka 0.9.0.2 please, or better, as a patch 
for 0.9.0.1 ?
We had repeated occurrences in the past weeks with Kafka 0.9.0.1

Best regards,
Alex


> Controller should send UpdateMetadataRequest prior to LeaderAndIsrRequest 
> during failover
> -
>
> Key: KAFKA-4443
> URL: https://issues.apache.org/jira/browse/KAFKA-4443
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Dong Lin
>Assignee: Dong Lin
>  Labels: reliability
> Fix For: 0.10.2.0
>
>
> Currently in onControllerFailover(), the controller will start up the 
> replicaStateMachine and partitionStateMachine before invoking 
> sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq).
>  However, if a broker starts right after controller election, the 
> LeaderAndIsrRequest sent to follower partitions on this broker will all be 
> ignored because the broker doesn't know the leaders are alive. 
> To fix this problem, in onControllerFailover(), the controller should send 
> an UpdateMetadataRequest to brokers after initializeControllerContext() but 
> before it starts the replicaStateMachine and partitionStateMachine. The first 
> UpdateMetadataRequest will include the list of live brokers. Although it will not 
> include partition leader information, it is OK because we will always send 
> an UpdateMetadataRequest again when we send LeaderAndIsrRequest during 
> replicaStateMachine.startup() and partitionStateMachine.startup().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4391) On Windows, Kafka server stops with uncaught exception after coming back from sleep

2016-12-01 Thread Yiquan Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15712034#comment-15712034
 ] 

Yiquan Zhou commented on KAFKA-4391:


[~huxi_2b] Can you think of any possible workaround to this issue? For example 
if we are using a single Kafka server, is there a way to let Kafka not use this 
replication-offset-checkpoint file? Thanks.

> On Windows, Kafka server stops with uncaught exception after coming back from 
> sleep
> ---
>
> Key: KAFKA-4391
> URL: https://issues.apache.org/jira/browse/KAFKA-4391
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: Windows 10, jdk1.8.0_111
>Reporter: Yiquan Zhou
>
> Steps to reproduce:
> 1. start the zookeeper
> $ bin\windows\zookeeper-server-start.bat config/zookeeper.properties
> 2. start the Kafka server with the default properties
> $ bin\windows\kafka-server-start.bat config/server.properties
> 3. put Windows into sleep mode for 1-2 hours
> 4. activate Windows again, an exception occurs in Kafka server console and 
> the server is stopped:
> {code:title=kafka console log}
> [2016-11-08 21:45:35,185] INFO Client session timed out, have not heard from 
> server in 10081379ms for sessionid 0x1584514da47, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:40,698] INFO zookeeper state changed (Disconnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-11-08 21:45:43,029] INFO Opening socket connection to server 
> 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,044] INFO Socket connection established to 
> 127.0.0.1/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,158] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1584514da47 has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,158] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-11-08 21:45:43,236] INFO Initiating client connection, 
> connectString=localhost:2181 sessionTimeout=6000 
> watcher=org.I0Itec.zkclient.ZkClient@11ca437b (org.apache.zookeeper.ZooKeeper)
> [2016-11-08 21:45:43,280] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> log4j:ERROR Failed to rename [/controller.log] to 
> [/controller.log.2016-11-08-18].
> [2016-11-08 21:45:43,421] INFO Opening socket connection to server 
> 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,483] INFO Socket connection established to 
> 127.0.0.1/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,811] INFO Session establishment complete on server 
> 127.0.0.1/127.0.0.1:2181, sessionid = 0x1584514da470001, negotiated timeout = 
> 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,827] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> log4j:ERROR Failed to rename [/server.log] to [/server.log.2016-11-08-18].
> [2016-11-08 21:45:43,827] INFO Creating /controller (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,014] INFO Result of znode creation is: OK 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,014] INFO 0 successfully elected as leader 
> (kafka.server.ZookeeperLeaderElector)
> log4j:ERROR Failed to rename [/state-change.log] to 
> [/state-change.log.2016-11-08-18].
> [2016-11-08 21:45:44,421] INFO re-registering broker info in ZK for broker 0 
> (kafka.server.KafkaHealthcheck)
> [2016-11-08 21:45:44,436] INFO Creating /brokers/ids/0 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,686] INFO Result of znode creation is: OK 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,686] INFO Registered broker 0 at path /brokers/ids/0 
> with addresses: PLAINTEXT -> EndPoint(192.168.0.15,9092,PLAINTEXT) 
> (kafka.utils.ZkUtils)
> [2016-11-08 21:45:44,686] INFO done re-registering broker 
> (kafka.server.KafkaHealthcheck)
> [2016-11-08 21:45:44,686] INFO Subscribing to /brokers/topics path to watch 
> for new topics (kafka.server.KafkaHealthcheck)
> [2016-11-08 21:45:45,046] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
> [2016-11-08 21:45:45,061] INFO New leader is 0 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2016-11-08 21:45:47,325] ERROR Uncaught exception in scheduled task 
> 'kafka-recovery-point-checkpoint' (kafka.utils.KafkaScheduler)
> java.io.IOException: File rename from 
> 

[GitHub] kafka-site pull request #34: Fix typo on introduction page

2016-12-01 Thread ashishg-qburst
GitHub user ashishg-qburst opened a pull request:

https://github.com/apache/kafka-site/pull/34

Fix typo on introduction page



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ashishg-qburst/kafka-site intro

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka-site/pull/34.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #34


commit 8585544c2c4ba4ea38e8be0e85ddd407b79a4af2
Author: ash 
Date:   2016-12-01T13:31:05Z

Fix typo on introduction page






[jira] [Updated] (KAFKA-4461) When using ProcessorTopologyTestDriver, the combination of map and .groupByKey does not produce any result

2016-12-01 Thread Hamidreza Afzali (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hamidreza Afzali updated KAFKA-4461:

Description: 
*Problem*

When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the 
combination of map and .groupByKey does not produce any result. However, it 
works fine when using KStreamTestDriver.

The topology looks like this:

{code}
builder.stream(Serdes.String, Serdes.Integer, inputTopic)
 .map((k, v) => new KeyValue(fn(k), v))
 .groupByKey(Serdes.String, Serdes.Integer)
 .count(stateStore)
{code}

*Full examples*

Examples for ProcessorTopologyTestDriver and KStreamTestDriver:

https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13

*Additional info*

kafka-users mailing list:

http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%3CCAHwHRrVq1APVkNhP3HVqxujxRJEP9FwHV2NRcvPPsHX7Wujzng%40mail.gmail.com%3E



  was:
*Problem*

When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the 
combination of map and .groupByKey does not produce any result. However, it 
works fine when using KStreamTestDriver.

The topology looks like this:

{code}
builder.stream(Serdes.String, Serdes.Integer, inputTopic)
 .map((k, v) => new KeyValue(fn(k), v))
 .groupByKey(Serdes.String, Serdes.Integer)
 .count(stateStore)
{code}

*Full example*
https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13

*Additional info*

kafka-users mailing list:

http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%3CCAHwHRrVq1APVkNhP3HVqxujxRJEP9FwHV2NRcvPPsHX7Wujzng%40mail.gmail.com%3E




> When using ProcessorTopologyTestDriver, the combination of map and 
> .groupByKey does not produce any result
> --
>
> Key: KAFKA-4461
> URL: https://issues.apache.org/jira/browse/KAFKA-4461
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Hamidreza Afzali
>Priority: Blocker
>
> *Problem*
> When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the 
> combination of map and .groupByKey does not produce any result. However, it 
> works fine when using KStreamTestDriver.
> The topology looks like this:
> {code}
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>  .map((k, v) => new KeyValue(fn(k), v))
>  .groupByKey(Serdes.String, Serdes.Integer)
>  .count(stateStore)
> {code}
> *Full examples*
> Examples for ProcessorTopologyTestDriver and KStreamTestDriver:
> https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13
> *Additional info*
> kafka-users mailing list:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%3CCAHwHRrVq1APVkNhP3HVqxujxRJEP9FwHV2NRcvPPsHX7Wujzng%40mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-96 - Add per partition metrics for in-sync and replica count

2016-12-01 Thread Tom Crayford
+1 (non-binding)

On Thu, Dec 1, 2016 at 12:11 AM, Apurva Mehta  wrote:

> +1 (non-binding)
>
> On Wed, Nov 30, 2016 at 2:00 PM, Jason Gustafson 
> wrote:
>
> > +1. Thanks for the KIP!
> >
> > On Wed, Nov 30, 2016 at 1:47 PM, Gwen Shapira  wrote:
> >
> > > +1 (binding)
> > >
> > > On Wed, Nov 30, 2016 at 1:34 PM, Xavier Léauté 
> > > wrote:
> > > > Based on the feedback KIP-96 seems pretty uncontroversial, so I'd
> like
> > to
> > > > initiate a vote on it.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 96+-+Add+per+partition+metrics+for+in-sync+and+assigned+replica+count
> > > >
> > > > Xavier
> > >
> > >
> > >
> > > --
> > > Gwen Shapira
> > > Product Manager | Confluent
> > > 650.450.2760 | @gwenshap
> > > Follow us: Twitter | blog
> > >
> >
>


[jira] [Created] (KAFKA-4473) KafkaStreams does *not* guarantee at-least-once delivery

2016-12-01 Thread Thomas Schulz (JIRA)
Thomas Schulz created KAFKA-4473:


 Summary: KafkaStreams does *not* guarantee at-least-once delivery
 Key: KAFKA-4473
 URL: https://issues.apache.org/jira/browse/KAFKA-4473
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Thomas Schulz
Priority: Critical


see: https://groups.google.com/forum/#!topic/confluent-platform/DT5bk1oCVk8

There is probably a bug in the RecordCollector as described in my detailed 
Cluster test published in the aforementioned post.

The class RecordCollector has the following behavior:
- if there is no exception, add the message offset to a map
- otherwise, do not add the message offset and instead log the above statement

Is it possible that this offset map contains the latest offset to commit? If 
so, a message that fails might be overridden by a successful (later) message and 
the consumer commits every message up to the latest offset?
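To illustrate the concern, here is a simplified sketch of the suspected behavior (not the actual RecordCollector code):

{code}
import java.util.HashMap;
import java.util.Map;

// Sketch only: offsets are recorded for successful sends, so if offset 10 fails
// but offset 11 later succeeds, the map still advances to 11 and a commit of
// "everything up to 11" silently skips the failed record.
class OffsetTrackerSketch {
    private final Map<Integer, Long> lastAcked = new HashMap<>(); // partition -> last acked offset

    void onSendCompletion(int partition, long offset, Exception exception) {
        if (exception == null) {
            lastAcked.put(partition, offset); // success: remember the offset
        } else {
            System.err.println("Error sending record: " + exception);
            // failure: nothing recorded, but later successes still move the offset forward
        }
    }

    Long offsetToCommit(int partition) {
        return lastAcked.get(partition); // committed as "every message up to here"
    }
}
{code}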



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2198: HOTFIX: Forgot one transition

2016-12-01 Thread enothereska
GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/2198

HOTFIX: Forgot one transition



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka hotfix-stream-states

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2198.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2198


commit 4c3cafc30a94557abffdd44b57d90b8076fc6b33
Author: Eno Thereska 
Date:   2016-12-01T11:52:02Z

Forgot one transition






Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-01 Thread Ismael Juma
Thanks for submitting this KIP as it includes important improvements to
Kafka's semantics. I will send a follow-up with more detailed feedback, but
I have a process question in the meantime: is there a plan to move part or
all of the Google Doc content to the wiki? At least protocol and message
format changes should be in the wiki, in my opinion (Google Docs are not as
discoverable, it's harder to track changes, not hosted on Apache Infra,
etc.).

Thanks,
Ismael

On Wed, Nov 30, 2016 at 10:19 PM, Guozhang Wang  wrote:

> Hi all,
>
> I have just created KIP-98 to enhance Kafka with exactly once delivery
> semantics:
>
> *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>  98+-+Exactly+Once+Delivery+and+Transactional+Messaging>*
>
> This KIP adds a transactional messaging mechanism along with an idempotent
> producer implementation to make sure that 1) duplicated messages sent from
> the same identified producer can be detected on the broker side, and 2) a
> group of messages sent within a transaction will atomically be either
> reflected and fetchable to consumers or not as a whole.
>
> The above wiki page provides a high-level view of the proposed changes as
> well as summarized guarantees. Initial draft of the detailed implementation
> design is described in this Google doc:
>
> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> 0wSw9ra8
>
>
> We would love to hear your comments and suggestions.
>
> Thanks,
>
> -- Guozhang
>


Re: [DISCUSS] KIP-95: Incremental Batch Processing for Kafka Streams

2016-12-01 Thread Michael Noll
> Instead, why would we not have batch.mode=true/false?

Matthias already replied in detail, but let me also throw in my thoughts:

I'd argue that "there is no batch [mode]" (just like "there is no spoon"
for the Matrix fans out there :-P).  What would a batch mode look like?  It
is primarily defined by processing being done on bounded/finite data --
i.e. it's about the nature of the input data, not about the nature of the
execution mode.  The general case is processing on unbounded/infinite data,
i.e. what stream processing is typically associated with.

The question thus becomes how would we let users define which
bounded/finite data should be processed, i.e. how to split a continuous
stream into batch chunks.  The current proposal for incremental processing
does this (by intent) in a limited way, as Matthias outlined.

Does that make sense?




On Thu, Dec 1, 2016 at 1:17 AM, Matthias J. Sax 
wrote:

> Right now, there is only one config value and I am open to better
> suggestions.
>
> We did not go for batch.mode=true/false because we might want to have
> auto stop at specific stop-offsets or stop-timestamp later on. So we can
> extend the parameter with new values like autostop.at=timestamp in
> combination with a new parameter that defines the stop timestamp
> that gets applied over all input partitions.
>
> Of course, there are other ways to do this extension with different
> "where to stop" policies, too. However, only using "batch.mode" as
> parameter name right now also has the disadvantage of being less self
> descriptive compared to "autostop.at=eol" -- it is not immediately clear
> what "batch.mode" means and that it will stop at EOL.
>
> But as I said; I am more than happy to change this to a better name.
>
>
> -Matthias
>
>
> On 11/30/16 3:53 PM, Sriram Subramanian wrote:
> > I agree that the metadata topic is required to build a batching semantic
> > that is intuitive.
> >
> > One question on the config -
> >
> > autostop.at
> >
> > I see one value for it - eol. What other values can be used? Instead, why
> > would we not have batch.mode=true/false?
> >
> > On Wed, Nov 30, 2016 at 1:51 PM, Matthias J. Sax 
> > wrote:
> >
> >> Both types of intermediate topics are handled the exact same way and
> >> both types do connect different subtopologies (even if the user might
> >> not be aware that there are multiple subtopologies in case of internal
> >> data repartitioning). So there is no distinction between user
> >> intermediate topics (via through()) and internal intermediate
> >> repartitioning topics.
> >>
> >> I also do not understand your argument about "coupling instances". The
> >> only "synchronization" is at startup time until the marker is written.
> >> Afterwards all instances just run as always. Furthermore, the metadata
> >> topic will be written within the leader while computing the overall
> >> partition assignment. Thus, the metadata topic will be fully populated
> >> (including the marker) before the individual instances receive their
> >> assignments via the group management protocol. So there is no more
> >> "synchronization" than before, as group management synchronizes
> >> instances anyway at startup.
> >>
> >> About startup failure. Yes, there is the case that the leader could
> >> potentially fail before the marker gets written. For this case, we have
> >> to consider a few things:
> >>
> >> 1. the net effect is that no data will be processed by any instance
> >> (so the application can still start up, because no partition assignment will be
> >> distributed via group management, as the leader failed while computing
> >> the assignment)
> >>
> >> 2. the failure would occur on partition assignment what would be a
> >> severe failure anyway and the application has bigger problems than a
> >> missing marker in the metadata topic (nobody will get partitions
> >> assigned as the leader did not finish the assignment computation)
> >>
> >> 3. if the leader fails, a different application instance will become the leader.
> >>a) thus, if it is a permanent problem, eventually all instances are
> >> going down
> >>b) if the problem is transient, the probability is very high that the
> >> new leader will not fail
> >>
> >>
> >>
> >> -Matthias
> >>
> >> On 11/30/16 1:21 PM, Eno Thereska wrote:
> >>> In the KIP, two types of intermediate topics are described, 1) ones
> that
> >> connect two sub-topologies, and 2) others that are internal
> repartitioning
> >> topics (e.g., for joins).
> >>> I wasn't envisioning stopping the consumption of (2) at the HWM. The
> HWM
> >> can be used for the source topics only (so I agree with your "joins"
> >> scenario, but for a different reason).
> >>>
> >>> The case I am worried about is (1) when there are implicit connections
> >> between application instances where a 2nd instance's source topics
> would be
> >> the 1st instances output topics. In that case I was suggesting not to
> >> couple those 

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-01 Thread Daniel Schierbeck
Hi there,

I'm the author of ruby-kafka, and as such am slightly biased towards
simplicity of implementation :-)

I like the proposal, and would love to use idempotent producer semantics in
our projects at Zendesk, but I'm a bit worried about the complexity that
would go into the clients; specifically: it sounds to me that in order to
get idempotent producer semantics, I'd have to implement the transaction
coordinator discovery. I may be wrong, but it would seem that it's not
strictly necessary if you're not using transactions – we could just use the
topic partition's leader as the coordinator, avoiding the extra discovery.
In my experience, most bugs are related to figuring out which broker is the
leader of which partition/group/whatever, so minimizing the number of
moving parts would be beneficial to me. I'd also like to point out that I
would be reluctant to implement the transaction API in the near future, but
would love to implement the idempotency API soon. The former seems only
relevant to real stream processing frameworks, which is probably not the
best use case for ruby-kafka.

Cheers,
Daniel Schierbeck

On Thu, Dec 1, 2016 at 9:54 AM Jason Gustafson  wrote:

> Hey Neha,
>
> Thanks for the thoughtful questions. I'll try to address the first question
> since Apurva addressed the second. Since most readers are probably getting
> up to speed with this large proposal, let me first take a step back and
> explain why we need the AppID at all. As Confluent tradition demands, I
> present you a big wall of text:
>
> Clearly "exactly once" delivery requires resilience to client failures.
> When a client crashes or turns into a zombie, another client must
> eventually be started to resume the work. There are two problems: 1) we
> need to ensure that the old process is actually dead or at least that it
> cannot write any more data, and 2) we need to be able to pick up wherever
> the last process left off. To do either of these, we need some kind of
> identifier to tie the two instances together.
>
> There are only two choices for where this ID comes from: either the user
> gives it to us or the server generates it. In the latter case, the user is
> responsible for fetching it from the client and persisting it somewhere for
> use after failure. We ultimately felt that the most flexible option is to
> have the user give it to us. In many applications, there is already a
> natural identifier which is already used to divide the workload. For
> example, in Kafka Streams and Kafka Connect, we have a taskId. For
> applications where there is no natural ID, the user can generate a UUID and
> persist it locally, which is as good as having the server generate it.
>
> So the AppID is used to provide continuity between the instances of a
> producer which are handling a certain workload. One of the early design
> decisions we made in this work was to make the delivery guarantees we
> provide agnostic of the workload that the producer is assigned. The
> producer is not in the business of trying to divide up the work among all
> its peers who are participating in the same duty (unlike the consumer, we
> don't know anything about where the data comes from). This has huge
> implications for "exactly-once" delivery because it puts the burden on the
> user to divide the total workload among producer instances and to assign
> AppIDs accordingly.
>
> I've been using the term "workload" loosely, but we usually imagine
> something like Kafka Connect's notion of a "source partition." A source
> partition could be a topic partition if the source is Kafka, or it could be
> a database table, a log file, or whatever makes sense for the source of the
> data. The point is that it's an independent source of data which can be
> assigned to a producer instance.
>
> If the same source partition is always assigned to the producer with the
> same AppID, then Kafka transactions will give you "exactly once"
> delivery without much additional work. On initialization, the producer will
> ensure that 1) any previous producers using that AppID are "fenced" off,
> and 2) that any transaction which had been started by a previous producer
> with that AppID has either completed or aborted.
>
> Based on this, it should be clear that the ideal is to divide the workload
> so that you have a one-to-one mapping from the source partition to the
> AppID. If the source of the data is Kafka, then the source partition is
> just a topic partition, and the AppID can be generated from the name of the
> topic and the partition number.
>
> To finally get back to your auto-scaling question, let's assume for a
> moment the ideal mapping of source partition to AppID. The main question is
> whether the scaling is "horizontal" or "vertical." By horizontal, I mean an
> increase in the number of source partitions. This case is easy. Assign new
> AppIDs based on the new source partitions and you're done.
>
> But if the scaling is vertical (i.e. an 

Build failed in Jenkins: kafka-trunk-jdk7 #1721

2016-12-01 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-3637: Added initial states

--
[...truncated 14223 lines...]

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSource STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSource PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullTopicWhenAddingSink STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullTopicWhenAddingSink PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithSource STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithSource PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldThroughOnUnassignedStateStoreAccess STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldThroughOnUnassignedStateStoreAccess PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameName STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameName PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddMoreThanOnePatternSourceNode STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddMoreThanOnePatternSourceNode PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithSameName STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithSameName PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldSetCorrectSourceNodesWithRegexUpdatedTopics STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldSetCorrectSourceNodesWithRegexUpdatedTopics PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameTopic STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameTopic PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testTopicGroupsByStateStore STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testTopicGroupsByStateStore PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithDuplicates STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithDuplicates PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > testPatternSourceTopic 
STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > testPatternSourceTopic 
PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithWrongParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithWrongParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testPatternMatchesAlreadyProvidedTopicSource STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testPatternMatchesAlreadyProvidedTopicSource PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkConnectedWithMultipleParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkConnectedWithMultipleParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testSubscribeTopicNameAndPattern STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testSubscribeTopicNameAndPattern PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAddNullInternalTopic STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAddNullInternalTopic PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithWrongParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithWrongParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingProcessor STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingProcessor PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > testAddStateStore 
STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > testAddStateStore 
PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkConnectedWithParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkConnectedWithParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 

[jira] [Commented] (KAFKA-4405) Kafka consumer improperly send prefetch request

2016-12-01 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15711481#comment-15711481
 ] 

Eno Thereska commented on KAFKA-4405:
-

[~guozhang] Ignore the previous comment; I was testing by mistake with 
max.poll.records=1000 today. The PR is still showing benefits for 
max.poll.records=1.

> Kafka consumer improperly send prefetch request
> ---
>
> Key: KAFKA-4405
> URL: https://issues.apache.org/jira/browse/KAFKA-4405
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: ysysberserk
>
> The Kafka consumer now supports max.poll.records to limit the number of 
> messages returned by poll().
> According to KIP-41, to implement max.poll.records the prefetch request 
> should only be sent when the total number of retained records is less than 
> max.poll.records.
> But in the 0.10.0.1 code, the consumer sends a prefetch request whenever it 
> has retained any records, and it never checks whether the total number of 
> retained records is less than max.poll.records.
> If max.poll.records is set much lower than the number of messages fetched, 
> the poll() loop will send many more requests than expected, and more and 
> more records will be fetched and buffered in memory before they can be 
> consumed.
> So before sending a prefetch request, the consumer must check that the 
> total number of retained records is less than max.poll.records.
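
To make the proposed fix concrete, here is a minimal sketch of that guard;
the counter and the callback names are hypothetical stand-ins for
illustration, not the actual consumer/Fetcher internals:

    // Sketch only: track how many fetched records are still buffered and
    // gate prefetching on that count, as described in the issue.
    public class PrefetchGuardSketch {
        private final int maxPollRecords;
        private int retainedRecords; // fetched but not yet returned by poll()

        public PrefetchGuardSketch(int maxPollRecords) {
            this.maxPollRecords = maxPollRecords;
        }

        // KIP-41 rule: prefetch only while the retained records cannot
        // satisfy the next poll(max.poll.records) on their own.
        public boolean shouldSendPrefetch() {
            return retainedRecords < maxPollRecords;
        }

        public void onFetchCompleted(int count)        { retainedRecords += count; }
        public void onRecordsReturnedByPoll(int count) { retainedRecords -= count; }
    }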



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-4405) Kafka consumer improperly send prefetch request

2016-12-01 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reassigned KAFKA-4405:
---

Assignee: Eno Thereska




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2193: KAFKA-4405: Check max.poll.records before prefetch...

2016-12-01 Thread enothereska
GitHub user enothereska reopened a pull request:

https://github.com/apache/kafka/pull/2193

KAFKA-4405: Check max.poll.records before prefetching



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka KAFKA-4405-prefetch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2193.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2193


commit 592482e0c41b14f66bd78f2f4f6247a686743280
Author: Eno Thereska 
Date:   2016-11-30T09:29:55Z

Check max.poll.records before prefetching




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4405) Kafka consumer improperly send prefetch request

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15711475#comment-15711475
 ] 

ASF GitHub Bot commented on KAFKA-4405:
---

GitHub user enothereska reopened a pull request:

https://github.com/apache/kafka/pull/2193

KAFKA-4405: Check max.poll.records before prefetching



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka KAFKA-4405-prefetch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2193.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2193


commit 592482e0c41b14f66bd78f2f4f6247a686743280
Author: Eno Thereska 
Date:   2016-11-30T09:29:55Z

Check max.poll.records before prefetching







--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4405) Kafka consumer improperly send prefetch request

2016-12-01 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15711461#comment-15711461
 ] 

Eno Thereska commented on KAFKA-4405:
-

[~guozhang] I tested with the new trunk containing KAFKA-4469 and I don't see 
the performance discrepancy anymore, so I'll close this PR. Thanks.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4405) Kafka consumer improperly send prefetch request

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15711450#comment-15711450
 ] 

ASF GitHub Bot commented on KAFKA-4405:
---

Github user enothereska closed the pull request at:

https://github.com/apache/kafka/pull/2193





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2193: KAFKA-4405: Check max.poll.records before prefetch...

2016-12-01 Thread enothereska
Github user enothereska closed the pull request at:

https://github.com/apache/kafka/pull/2193


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-01 Thread Jason Gustafson
Hey Neha,

Thanks for the thoughtful questions. I'll try to address the first question
since Apurva addressed the second. Since most readers are probably getting
up to speed with this large proposal, let me first take a step back and
explain why we need the AppID at all. As Confluent tradition demands, I
present you a big wall of text:

Clearly "exactly once" delivery requires resilience to client failures.
When a client crashes or turns into a zombie, another client must
eventually be started to resume the work. There are two problems: 1) we
need to ensure that the old process is actually dead or at least that it
cannot write any more data, and 2) we need to be able to pick up wherever
the last process left off. To do either of these, we need some kind of
identifier to tie the two instances together.

There are only two choices for where this ID comes from: either the user
gives it to us or the server generates it. In the latter case, the user is
responsible for fetching it from the client and persisting it somewhere for
use after failure. We ultimately felt that the most flexible option is to
have the user give it to us. In many applications, there is a natural
identifier that is already used to divide the workload. For
example, in Kafka Streams and Kafka Connect, we have a taskId. For
applications where there is no natural ID, the user can generate a UUID and
persist it locally, which is as good as having the server generate it.
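
To make that concrete, here is a rough sketch of the "generate a UUID and
persist it locally" approach; the file location and class name are purely
illustrative and not part of the proposal:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.UUID;

    // Sketch only: create an AppID once and reuse it across restarts by
    // persisting it to a local file, so the restarted producer is tied to
    // the same workload as its predecessor.
    public class LocalAppId {
        public static String loadOrCreate(Path file) throws IOException {
            if (Files.exists(file)) {
                // Reuse the AppID from the previous run.
                return new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            }
            String appId = UUID.randomUUID().toString();
            Files.write(file, appId.getBytes(StandardCharsets.UTF_8));
            return appId;
        }

        public static void main(String[] args) throws IOException {
            System.out.println(loadOrCreate(Paths.get("producer.app.id")));
        }
    }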

So the AppID is used to provide continuity between the instances of a
producer which are handling a certain workload. One of the early design
decisions we made in this work was to make the delivery guarantees we
provide agnostic of the workload that the producer is assigned. The
producer is not in the business of trying to divide up the work among all
its peers who are participating in the same duty (unlike the consumer, we
don't know anything about where the data comes from). This has huge
implications for "exactly-once" delivery because it puts the burden on the
user to divide the total workload among producer instances and to assign
AppIDs accordingly.

I've been using the term "workload" loosely, but we usually imagine
something like Kafka Connect's notion of a "source partition." A source
partition could be a topic partition if the source is Kafka, or it could be
a database table, a log file, or whatever makes sense for the source of the
data. The point is that it's an independent source of data which can be
assigned to a producer instance.

If the same source partition is always assigned to the producer with the
same AppID, then Kafka transactions will give you "exactly once"
delivery without much additional work. On initialization, the producer will
ensure that 1) any previous producers using that AppID are "fenced" off,
and 2) that any transactions which had been started by a previous producer
with that AppID have either completed or aborted.
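
As a rough sketch of what that initialization might look like under the
current draft (the transaction.app.id config and the initTransactions() call
are the names being discussed in this thread and are still subject to
change; the AppID value is just an example):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public class TransactionalInitSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            // Proposed config from this thread; the name is not final.
            props.put("transaction.app.id", "clicks-0");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            // Per the KIP draft: fences off any previous producer with the
            // same AppID and awaits completion (commit or abort) of any
            // transactions it left pending before returning.
            producer.initTransactions();
        }
    }

Until that call returns, the producer should not be considered safe to
resume the workload tied to that AppID.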

Based on this, it should be clear that the ideal is to divide the workload
so that you have a one-to-one mapping from the source partition to the
AppID. If the source of the data is Kafka, then the source partition is
just a topic partition, and the AppID can be generated from the name of the
topic and the partition number.
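
For example, a trivial (and purely illustrative) naming scheme might be:

    import org.apache.kafka.common.TopicPartition;

    public class AppIdScheme {
        // One stable AppID per source partition; the separator is arbitrary.
        static String appIdFor(TopicPartition source) {
            return source.topic() + "-" + source.partition();
        }

        public static void main(String[] args) {
            System.out.println(appIdFor(new TopicPartition("clicks", 0))); // clicks-0
        }
    }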

To finally get back to your auto-scaling question, let's assume for a
moment the ideal mapping of source partition to AppID. The main question is
whether the scaling is "horizontal" or "vertical." By horizontal, I mean an
increase in the number of source partitions. This case is easy. Assign new
AppIDs based on the new source partitions and you're done.

But if the scaling is vertical (i.e. an increase in the load on the source
partitions), there's not much this proposal can do to help. You're going to
have to break the source partition into child partitions, and assign each
of the new partitions a new AppID. To preserve "exactly once" delivery, you
must make sure that the producers using the AppID assigned to the parent
partition have been shut down cleanly. We could provide a way to pass in a
"parent AppID" so that the producer could check the appropriate safety
conditions, but for the first version, we assume that users consider
scaling requirements when dividing the workload into source partitions.

Unfortunately, the real world is always falling short of the ideal, and
it's not always practical to have a one-to-one mapping of source partition
to AppID, since that also implies a one-to-one mapping of source partition
to producer instance. If I were a user, I'd push this limit as far as is
reasonable, but with enough source partitions, it eventually breaks down.
At some point, you need a producer to handle the load of more than one
source partition. This is fine in itself if the assignment is sticky: that
is, if we can ensure that the same source partition is assigned to the
producer using a certain AppID. If not, then the user is responsible for
ensuring a clean hand-off. The producer reading 

Build failed in Jenkins: kafka-trunk-jdk8 #1070

2016-12-01 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-3637: Added initial states

--
[...truncated 14449 lines...]

org.apache.kafka.streams.KafkaStreamsTest > 
shouldReturnFalseOnCloseWhenThreadsHaventTerminated PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetAllTasksWithStoreWhenNotRunning STARTED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetAllTasksWithStoreWhenNotRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartOnceClosed STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartOnceClosed PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCleanup STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCleanup PASSED

org.apache.kafka.streams.KafkaStreamsTest > testStartAndClose STARTED

org.apache.kafka.streams.KafkaStreamsTest > testStartAndClose PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCloseIsIdempotent STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCloseIsIdempotent PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotCleanupWhileRunning 
STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotCleanupWhileRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartTwice STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartTwice PASSED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[0] STARTED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[0] PASSED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[1] STARTED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[1] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[0] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[0] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance[0] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses[0] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[1] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[1] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[1] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[1] FAILED
java.lang.AssertionError: Condition not met within timeout 3. waiting 
for store count-by-key
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:279)
at 
org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable(QueryableStateIntegrationTest.java:490)

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance[1] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance[1] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses[1] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses[1] PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testShouldReadFromRegexAndNamedTopics STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testShouldReadFromRegexAndNamedTopics PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenCreated STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenCreated PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testMultipleConsumersCanReadFromPartitionedTopic STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testMultipleConsumersCanReadFromPartitionedTopic PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenDeleted STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenDeleted PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testNoMessagesSentExceptionFromOverlappingPatterns STARTED