[GitHub] kafka pull request #2620: MINOR: Doc change related to ZK sasl configs

2017-02-28 Thread omkreddy
GitHub user omkreddy opened a pull request:

https://github.com/apache/kafka/pull/2620

MINOR: Doc change related to ZK sasl configs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/omkreddy/kafka MINOR-ZK-CHANGE

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2620.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2620


commit ae0eec2d3528bd83d8c36009d7d16c7ef636c6ca
Author: Manikumar Reddy O 
Date:   2017-03-01T07:19:31Z

MINOR: Doc change related to ZK sasl configs




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-4822) Kafka producer implementation without additional threads, similar to sync producer of 0.8.

2017-02-28 Thread Giri (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Giri updated KAFKA-4822:

Affects Version/s: 0.9.0.0
   0.9.0.1
   0.10.0.0
   0.10.0.1
   0.10.1.0
   0.10.1.1

> Kafka producer implementation without additional threads, similar to sync 
> producer of 0.8.
> --
>
> Key: KAFKA-4822
> URL: https://issues.apache.org/jira/browse/KAFKA-4822
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>Reporter: Giri
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-128: Add ByteArrayConverter for Kafka Connect

2017-02-28 Thread Guozhang Wang
Generally I'd prefer not to duplicate functional logic, since you may forget to
sync one copy when you change the other. I understand that for this specific
case such a scenario may never happen, as the logic is quite simple and static,
but it still sounds like good coding practice to me?


Guozhang


On Tue, Feb 28, 2017 at 4:31 PM, Ewen Cheslack-Postava 
wrote:

> Guozhang,
>
> Did you look at the PR? I'm fine with doing that if we really think it's
> better, but since this is a config-less passthrough, it's actually just
> adding overhead to do that...
>
> -Ewen
>
> On Mon, Feb 27, 2017 at 11:47 AM, Guozhang Wang 
> wrote:
>
> > Thanks Ewen,
> >
> > "use the corresponding serializer internally and just add in the extra
> > conversion
> > steps for the data API" sounds good to me.
> >
> > Guozhang
> >
> >
> > On Mon, Feb 27, 2017 at 8:24 AM, Ewen Cheslack-Postava <
> e...@confluent.io>
> > wrote:
> >
> > > It's a different interface that's being implemented. The functionality
> is
> > > the same (since it's just a simple pass through), but we intentionally
> > > named Converters differently than Serializers since they do more work
> > than
> > > Serializers (besides the normal serialization they also need to convert
> > > between byte[] and the Connect Data API).
> > >
> > > We could certainly reuse/extend that class instead, though I'm not sure
> > > there's much benefit in that and since they implement different
> > interfaces
> > > and this is Connect-specific, it will probably be clearer to have it
> > under
> > > a Connect package. Note that for other Converters the pattern we've
> used
> > is
> > > to use the corresponding serializer internally and just add in the
> extra
> > > conversion steps for the data API.
> > >
> > > -Ewen
> > >
> > > On Sat, Feb 25, 2017 at 6:52 PM, Guozhang Wang 
> > wrote:
> > >
> > > > I'm wondering why we can't just use ByteArraySerde in o.a.k.common?
> > > >
> > > > Guozhang
> > > >
> > > > On Sat, Feb 25, 2017 at 2:25 PM, Ewen Cheslack-Postava <
> > > e...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I've added a pretty trivial KIP for adding a pass-through Converter
> > for
> > > > > Kafka Connect, similar to ByteArraySerializer/Deserializer.
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 128%3A+Add+ByteArrayConverter+for+Kafka+Connect
> > > > >
> > > > > This wasn't added with the framework originally because the idea
> was
> > to
> > > > > deal with structured data for the most part. However, we've seen a
> > > couple
> > > > > of use cases arise as the framework got more traction and I think
> it
> > > > makes
> > > > > sense to provide this out of the box now so people stop reinventing
> > the
> > > > > wheel (and using a different fully-qualified class name) for each
> > > > connector
> > > > > that needs this functionality.
> > > > >
> > > > > I imagine this will be a rather uncontroversial addition, so if I
> > don't
> > > > see
> > > > > any comments in the next day or two I'll just start the vote
> thread.
> > > > >
> > > > > -Ewen
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
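
A minimal sketch of what such a pass-through Converter could look like, assuming the
Connect Converter interface (configure/fromConnectData/toConnectData); the class that
eventually lands with the KIP may differ in detail:

{code}
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;

// Pass-through converter: treats the payload as an opaque byte[] in both directions.
public class ByteArrayConverter implements Converter {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed for a pure pass-through.
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        if (schema != null && schema.type() != Schema.Type.BYTES)
            throw new DataException("Invalid schema type for ByteArrayConverter: " + schema.type());
        if (value != null && !(value instanceof byte[]))
            throw new DataException("ByteArrayConverter is not compatible with objects of type " + value.getClass());
        return (byte[]) value;
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value);
    }
}
{code}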


[jira] [Created] (KAFKA-4822) Kafka producer implementation without additional threads, similar to sync producer of 0.8.

2017-02-28 Thread Giri (JIRA)
Giri created KAFKA-4822:
---

 Summary: Kafka producer implementation without additional threads, 
similar to sync producer of 0.8.
 Key: KAFKA-4822
 URL: https://issues.apache.org/jira/browse/KAFKA-4822
 Project: Kafka
  Issue Type: New Feature
  Components: producer 
Reporter: Giri






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-82 - Add Record Headers

2017-02-28 Thread Michael Pearce
Sent too early:


Hi Radai:

RE:

Header header(String key) - returns JUST ONE (the very last) value given a
key
Iterable<Header> headers(String key) - returns ALL under a key
Iterable<Header> headers() - returns all, period. maybe allow null as key
to prev method instead?
void add(Header header) - appends a header (key inside).
void remove(String key) - removes ALL HEADERS under a key.

I don't think this one is needed:

"Iterable headers() - returns all, period. maybe allow null as key to 
prev method instead"


The class Headers implements Iterable<Header> anyhow.

On another note, about the return types for add and remove: I note you put void. Any
particular reason?

For add, I would normally expect either a boolean response indicating whether it
succeeded (the classical case) or the instance object returned, as in the "with"
pattern. In the read-only case this avoids needing to throw an exception; we can
simply return false (in the classical case).

Likewise for remove, I would normally expect a boolean or the previous object
returned, so if the operation didn't succeed I either get false or a null object.

Any objections if I make the responses for these booleans, to denote whether the
operation succeeded?

Cheers
Mike
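
A minimal sketch of the interface shape under discussion, with the boolean return
types proposed above (illustrative only, not the final KIP-82 API):

{code}
// Illustrative sketch only -- names and semantics follow the discussion above,
// not a committed KIP-82 interface.
public interface Headers extends Iterable<Header> {

    // Returns the last header added under this key, or null if none exists.
    Header header(String key);

    // Returns all headers under this key, in insertion order.
    Iterable<Header> headers(String key);

    // Appends a header; returns false (instead of throwing) if the
    // collection has been made read-only.
    boolean add(Header header);

    // Removes all headers under this key; returns false if nothing was removed
    // or the collection is read-only.
    boolean remove(String key);
}

// Assumed shape of the key/value pair carried with a record.
interface Header {
    String key();
    byte[] value();
}
{code}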



From: Michael Pearce
Sent: Wednesday, March 1, 2017 5:55 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-82 - Add Record Headers

Hi Radai:

RE:

Header header(String key) - returns JUST ONE (the very last) value given a
key
Iterable<Header> headers(String key) - returns ALL under a key

void add(Header header) - appends a header (key inside).
void remove(String key) - removes ALL HEADERS under a key.

I don't think this one is needed:




From: Becket Qin 
Sent: Wednesday, March 1, 2017 2:54 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-82 - Add Record Headers

Hi Ismael,

Yes, there is a difference between Batch and Headers. I was just trying to
see if that would work. Good point about sending the same ProducerRecord
twice, but in fact in that case any reuse of objects would cause problems.

As you can imagine, if the ProducerRecord has a value that is a List, the
Interceptor.onSend() can actually add an element to the List. If
producer.send() is called on the same ProducerRecord again, the value list
would have one more element than the previously sent ProducerRecord, even
though the ProducerRecord itself is not mutable, right? The same thing can
apply to any modifiable class type.

From this standpoint, allowing headers to be mutable doesn't really weaken
the mutability we already have. Admittedly, a mutable header is kind of
guiding users towards changing the headers in the existing object instead
of creating a new one. But I think reusing an object while it is possible
to be modified by user code is a risk that users themselves are willing to
take, and we do not really protect against that. But there still seems to be
value in allowing users to not pay the overhead of creating tons of
objects if they do not reuse an object to send it twice, which is probably
the more common case.

Thanks,

Jiangjie (Becket) Qin


On Tue, Feb 28, 2017 at 12:43 PM, radai  wrote:

> I will settle for any API really, but just wanted to point out that as it
> stands right now the API targets the most "advanced" (hence obscure and
> rare) use cases, at the expense of the simple and common ones. i'd suggest
> (as the minimal set):
>
> Header header(String key) - returns JUST ONE (the very last) value given a
> key
> Iterable<Header> headers(String key) - returns ALL under a key
> Iterable<Header> headers() - returns all, period. maybe allow null as key
> to prev method instead?
> void add(Header header) - appends a header (key inside).
> void remove(String key) - removes ALL HEADERS under a key.
>
> this way naive get/set semantics map to header(key)/add(Header) cleanly and
> simply while preserving the ability to handle more advanced use cases.
> we can always add more convenience methods (like those dealing with lists -
> addAll etc) but i think the 5 (potentially 4) above are sufficient for
> basically everything.
>
> On Tue, Feb 28, 2017 at 4:08 AM, Ismael Juma  wrote:
>
> > Hi Becket,
> >
> > Comments inline.
> >
> > On Sat, Feb 25, 2017 at 10:33 PM, Becket Qin 
> wrote:
> > >
> > > 1. Regarding the mutability.
> > >
> > > I think it would be a big convenience to have headers mutable during
> > > certain stage in the message life cycle for the use cases you
> mentioned.
> > I
> > > agree there is a material benefit especially given that we may have to
> > > modify the headers for each message.
> > >
> > > That said, I also think it is fair to say that in the producer, in
> order
> > to
> > > guarantee the correctness of the entire logic, it is necessary that at
> > some
> > > point we need to make producer record immutable. For example we
> probably
> > > don't want to see that users 

Build failed in Jenkins: kafka-trunk-jdk8 #1311

2017-02-28 Thread Apache Jenkins Server
See 


Changes:

[ismael] KAFKA-4773; The Kafka build should run findbugs

--
[...truncated 159.37 KB...]

kafka.utils.timer.TimerTest > testTaskExpiration STARTED

kafka.utils.timer.TimerTest > testTaskExpiration PASSED

kafka.utils.timer.TimerTaskListTest > testAll STARTED

kafka.utils.timer.TimerTaskListTest > testAll PASSED

kafka.utils.CommandLineUtilsTest > testParseEmptyArg STARTED

kafka.utils.CommandLineUtilsTest > testParseEmptyArg PASSED

kafka.utils.CommandLineUtilsTest > testParseSingleArg STARTED

kafka.utils.CommandLineUtilsTest > testParseSingleArg PASSED

kafka.utils.CommandLineUtilsTest > testParseArgs STARTED

kafka.utils.CommandLineUtilsTest > testParseArgs PASSED

kafka.utils.CommandLineUtilsTest > testParseEmptyArgAsValid STARTED

kafka.utils.CommandLineUtilsTest > testParseEmptyArgAsValid PASSED

kafka.utils.ReplicationUtilsTest > testUpdateLeaderAndIsr STARTED

kafka.utils.ReplicationUtilsTest > testUpdateLeaderAndIsr PASSED

kafka.utils.ReplicationUtilsTest > testGetLeaderIsrAndEpochForPartition STARTED

kafka.utils.ReplicationUtilsTest > testGetLeaderIsrAndEpochForPartition PASSED

kafka.utils.JsonTest > testJsonEncoding STARTED

kafka.utils.JsonTest > testJsonEncoding PASSED

kafka.utils.ShutdownableThreadTest > testShutdownWhenCalledAfterThreadStart 
STARTED

kafka.utils.ShutdownableThreadTest > testShutdownWhenCalledAfterThreadStart 
PASSED

kafka.utils.SchedulerTest > testMockSchedulerNonPeriodicTask STARTED

kafka.utils.SchedulerTest > testMockSchedulerNonPeriodicTask PASSED

kafka.utils.SchedulerTest > testMockSchedulerPeriodicTask STARTED

kafka.utils.SchedulerTest > testMockSchedulerPeriodicTask PASSED

kafka.utils.SchedulerTest > testNonPeriodicTask STARTED

kafka.utils.SchedulerTest > testNonPeriodicTask PASSED

kafka.utils.SchedulerTest > testRestart STARTED

kafka.utils.SchedulerTest > testRestart PASSED

kafka.utils.SchedulerTest > testReentrantTaskInMockScheduler STARTED

kafka.utils.SchedulerTest > testReentrantTaskInMockScheduler PASSED

kafka.utils.SchedulerTest > testPeriodicTask STARTED

kafka.utils.SchedulerTest > testPeriodicTask PASSED

kafka.utils.ZkUtilsTest > testAbortedConditionalDeletePath STARTED

kafka.utils.ZkUtilsTest > testAbortedConditionalDeletePath PASSED

kafka.utils.ZkUtilsTest > testSuccessfulConditionalDeletePath STARTED

kafka.utils.ZkUtilsTest > testSuccessfulConditionalDeletePath PASSED

kafka.utils.ZkUtilsTest > testClusterIdentifierJsonParsing STARTED

kafka.utils.ZkUtilsTest > testClusterIdentifierJsonParsing PASSED

kafka.utils.IteratorTemplateTest > testIterator STARTED

kafka.utils.IteratorTemplateTest > testIterator PASSED

kafka.utils.UtilsTest > testGenerateUuidAsBase64 STARTED

kafka.utils.UtilsTest > testGenerateUuidAsBase64 PASSED

kafka.utils.UtilsTest > testAbs STARTED

kafka.utils.UtilsTest > testAbs PASSED

kafka.utils.UtilsTest > testReplaceSuffix STARTED

kafka.utils.UtilsTest > testReplaceSuffix PASSED

kafka.utils.UtilsTest > testCircularIterator STARTED

kafka.utils.UtilsTest > testCircularIterator PASSED

kafka.utils.UtilsTest > testReadBytes STARTED

kafka.utils.UtilsTest > testReadBytes PASSED

kafka.utils.UtilsTest > testCsvList STARTED

kafka.utils.UtilsTest > testCsvList PASSED

kafka.utils.UtilsTest > testReadInt STARTED

kafka.utils.UtilsTest > testReadInt PASSED

kafka.utils.UtilsTest > testUrlSafeBase64EncodeUUID STARTED

kafka.utils.UtilsTest > testUrlSafeBase64EncodeUUID PASSED

kafka.utils.UtilsTest > testCsvMap STARTED

kafka.utils.UtilsTest > testCsvMap PASSED

kafka.utils.UtilsTest > testInLock STARTED

kafka.utils.UtilsTest > testInLock PASSED

kafka.utils.UtilsTest > testSwallow STARTED

kafka.utils.UtilsTest > testSwallow PASSED

kafka.producer.AsyncProducerTest > testFailedSendRetryLogic STARTED

kafka.producer.AsyncProducerTest > testFailedSendRetryLogic PASSED

kafka.producer.AsyncProducerTest > testQueueTimeExpired STARTED

kafka.producer.AsyncProducerTest > testQueueTimeExpired PASSED

kafka.producer.AsyncProducerTest > testPartitionAndCollateEvents STARTED

kafka.producer.AsyncProducerTest > testPartitionAndCollateEvents PASSED

kafka.producer.AsyncProducerTest > testBatchSize STARTED

kafka.producer.AsyncProducerTest > testBatchSize PASSED

kafka.producer.AsyncProducerTest > testSerializeEvents STARTED

kafka.producer.AsyncProducerTest > testSerializeEvents PASSED

kafka.producer.AsyncProducerTest > testProducerQueueSize STARTED

kafka.producer.AsyncProducerTest > testProducerQueueSize PASSED

kafka.producer.AsyncProducerTest > testRandomPartitioner STARTED

kafka.producer.AsyncProducerTest > testRandomPartitioner PASSED

kafka.producer.AsyncProducerTest > testInvalidConfiguration STARTED

kafka.producer.AsyncProducerTest > testInvalidConfiguration PASSED

kafka.producer.AsyncProducerTest > testInvalidPartition STARTED

kafka.producer.AsyncProducerTest > 

Re: [DISCUSS] KIP-82 - Add Record Headers

2017-02-28 Thread Michael Pearce
Hi Radai:

RE:

Header header(String key) - returns JUST ONE (the very last) value given a
key
Iterable<Header> headers(String key) - returns ALL under a key

void add(Header header) - appends a header (key inside).
void remove(String key) - removes ALL HEADERS under a key.

I don't think this one is needed:




From: Becket Qin 
Sent: Wednesday, March 1, 2017 2:54 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-82 - Add Record Headers

Hi Ismael,

Yes, there is a difference between Batch and Headers. I was just trying to
see if that would work. Good point about sending the same ProducerRecord
twice, but in fact in that case any reuse of objects would cause problems.

As you can imagine, if the ProducerRecord has a value that is a List, the
Interceptor.onSend() can actually add an element to the List. If
producer.send() is called on the same ProducerRecord again, the value list
would have one more element than the previously sent ProducerRecord, even
though the ProducerRecord itself is not mutable, right? The same thing can
apply to any modifiable class type.

From this standpoint, allowing headers to be mutable doesn't really weaken
the mutability we already have. Admittedly, a mutable header is kind of
guiding users towards changing the headers in the existing object instead
of creating a new one. But I think reusing an object while it is possible
to be modified by user code is a risk that users themselves are willing to
take, and we do not really protect against that. But there still seems to be
value in allowing users to not pay the overhead of creating tons of
objects if they do not reuse an object to send it twice, which is probably
the more common case.

Thanks,

Jiangjie (Becket) Qin


On Tue, Feb 28, 2017 at 12:43 PM, radai  wrote:

> I will settle for any API really, but just wanted to point out that as it
> stands right now the API targets the most "advanced" (hence obscure and
> rare) use cases, at the expense of the simple and common ones. i'd suggest
> (as the minimal set):
>
> Header header(String key) - returns JUST ONE (the very last) value given a
> key
> Iterable<Header> headers(String key) - returns ALL under a key
> Iterable<Header> headers() - returns all, period. maybe allow null as key
> to prev method instead?
> void add(Header header) - appends a header (key inside).
> void remove(String key) - removes ALL HEADERS under a key.
>
> this way naive get/set semantics map to header(key)/add(Header) cleanly and
> simply while preserving the ability to handle more advanced use cases.
> we can always add more convenience methods (like those dealing with lists -
> addAll etc) but i think the 5 (potentially 4) above are sufficient for
> basically everything.
>
> On Tue, Feb 28, 2017 at 4:08 AM, Ismael Juma  wrote:
>
> > Hi Becket,
> >
> > Comments inline.
> >
> > On Sat, Feb 25, 2017 at 10:33 PM, Becket Qin 
> wrote:
> > >
> > > 1. Regarding the mutability.
> > >
> > > I think it would be a big convenience to have headers mutable during
> > > certain stage in the message life cycle for the use cases you
> mentioned.
> > I
> > > agree there is a material benefit especially given that we may have to
> > > modify the headers for each message.
> > >
> > > That said, I also think it is fair to say that in the producer, in
> order
> > to
> > > guarantee the correctness of the entire logic, it is necessary that at
> > some
> > > point we need to make producer record immutable. For example we
> probably
> > > don't want to see that users accidentally updated the headers when the
> > > producer is doing the serialization or compression.
> > >
> > > Given that, would it be possible to make Headers to be able to switch
> > from
> > > mutable to immutable? We have done this for the Batch in the producer.
> > For
> > > example, initially the headers are mutable, but after it has gone
> through
> > > all the interceptors, we can call Headers.close() to make it immutable
> > > afterwards.
> > >
> >
> > The difference is that the batch is an internal class that is not exposed
> > to users. Can you please explain what happens if a user tries to send the
> > same ProducerRecord twice? Would an interceptor fail when trying to
> mutate
> > the header that is now closed? Or did you have something else in mind?
> >
> > Thanks,
> > Ismael
> >
>

[jira] [Commented] (KAFKA-4722) Add application.id to StreamThread name

2017-02-28 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889540#comment-15889540
 ] 

Matthias J. Sax commented on KAFKA-4722:


Thanks for rebasing. I left a comment on the PR. Did you get an 
email/notification for this? It seems you only get notifications from JIRA -- I 
would recommend enabling GitHub notifications.

> Add application.id to StreamThread name
> ---
>
> Key: KAFKA-4722
> URL: https://issues.apache.org/jira/browse/KAFKA-4722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>Assignee: Sharad
>Priority: Minor
>  Labels: beginner, easyfix, newbie
>
> StreamThread currently sets its name thusly:
> {code}
> super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
> {code}
> If you have multiple {{KafkaStreams}} instances within a single application, 
> it would help to add the application ID to the {{StreamThread}} name to identify 
> which thread belongs to which {{KafkaStreams}} instance.
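
For illustration, the requested change amounts to something along these lines (a
sketch only; `applicationId` is an assumed local/field name and the exact name
format is up to the PR):

{code}
// Current behaviour (as quoted above): the thread name carries only a global sequence number.
super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());

// Proposed direction: prefix the name with the application.id so threads from
// different KafkaStreams instances in one JVM can be told apart, e.g.
super(applicationId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
{code}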



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4821) 9244L

2017-02-28 Thread Vamsi Jakkula (JIRA)
Vamsi Jakkula created KAFKA-4821:


 Summary: 9244L
 Key: KAFKA-4821
 URL: https://issues.apache.org/jira/browse/KAFKA-4821
 Project: Kafka
  Issue Type: Task
Reporter: Vamsi Jakkula


Creating of an issue using project keys and issue type names using the REST API



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Build failed in Jenkins: kafka-trunk-jdk8 #1310

2017-02-28 Thread Apache Jenkins Server
See 


Changes:

[ismael] MINOR: Add code quality checks (and suppressions) to checkstyle.xml

--
[...truncated 693.73 KB...]

org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStoreTest > 
shouldDelegateToUnderlyingStoreWhenFetching STARTED

org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStoreTest > 
shouldDelegateToUnderlyingStoreWhenFetching PASSED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > 
shouldPutAllKeyValuePairs STARTED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > 
shouldPutAllKeyValuePairs PASSED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > testEvict 
STARTED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > testEvict 
PASSED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > 
shouldUpdateValuesForExistingKeysOnPutAll STARTED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > 
shouldUpdateValuesForExistingKeysOnPutAll PASSED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > testSize 
STARTED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > testSize 
PASSED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > 
testPutIfAbsent STARTED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > 
testPutIfAbsent PASSED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > 
testRestoreWithDefaultSerdes STARTED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > 
testRestoreWithDefaultSerdes PASSED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > 
testRestore STARTED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > 
testRestore PASSED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > 
testPutGetRange STARTED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > 
testPutGetRange PASSED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > 
testPutGetRangeWithDefaultSerdes STARTED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > 
testPutGetRangeWithDefaultSerdes PASSED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
shouldUseCustomRocksDbConfigSetter STARTED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
shouldUseCustomRocksDbConfigSetter PASSED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
shouldPerformAllQueriesWithCachingDisabled STARTED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
shouldPerformAllQueriesWithCachingDisabled PASSED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
shouldPerformRangeQueriesWithCachingDisabled STARTED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
shouldPerformRangeQueriesWithCachingDisabled PASSED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
shouldCloseOpenIteratorsWhenStoreClosedAndThrowInvalidStateStoreOnHasNextAndNext
 STARTED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
shouldCloseOpenIteratorsWhenStoreClosedAndThrowInvalidStateStoreOnHasNextAndNext
 PASSED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > testSize 
STARTED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > testSize 
PASSED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
testPutIfAbsent STARTED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
testPutIfAbsent PASSED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
testRestoreWithDefaultSerdes STARTED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
testRestoreWithDefaultSerdes PASSED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > testRestore 
STARTED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > testRestore 
PASSED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
testPutGetRange STARTED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
testPutGetRange PASSED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
testPutGetRangeWithDefaultSerdes STARTED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
testPutGetRangeWithDefaultSerdes PASSED

org.apache.kafka.streams.KeyValueTest > shouldHaveSaneEqualsAndHashCode STARTED

org.apache.kafka.streams.KeyValueTest > shouldHaveSaneEqualsAndHashCode PASSED

org.apache.kafka.streams.KafkaStreamsTest > testIllegalMetricsConfig STARTED

org.apache.kafka.streams.KafkaStreamsTest > testIllegalMetricsConfig PASSED

org.apache.kafka.streams.KafkaStreamsTest > shouldNotGetAllTasksWhenNotRunning 
STARTED

org.apache.kafka.streams.KafkaStreamsTest > shouldNotGetAllTasksWhenNotRunning 
PASSED


[jira] [Commented] (KAFKA-4722) Add application.id to StreamThread name

2017-02-28 Thread Sharad (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889483#comment-15889483
 ] 

Sharad commented on KAFKA-4722:
---

PR submitted:
https://github.com/apache/kafka/pull/2617

This has just one commit.

> Add application.id to StreamThread name
> ---
>
> Key: KAFKA-4722
> URL: https://issues.apache.org/jira/browse/KAFKA-4722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>Assignee: Sharad
>Priority: Minor
>  Labels: beginner, easyfix, newbie
>
> StreamThread currently sets its name thusly:
> {code}
> super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
> {code}
> If you have multiple {{KafkaStreams}} instances within a single application, 
> it would help to add the application ID to the {{StreamThread}} name to identify 
> which thread belongs to which {{KafkaStreams}} instance.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Issue Comment Deleted] (KAFKA-4722) Add application.id to StreamThread name

2017-02-28 Thread Sharad (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sharad updated KAFKA-4722:
--
Comment: was deleted

(was: PR submitted:
https://github.com/apache/kafka/pull/2487)

> Add application.id to StreamThread name
> ---
>
> Key: KAFKA-4722
> URL: https://issues.apache.org/jira/browse/KAFKA-4722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>Assignee: Sharad
>Priority: Minor
>  Labels: beginner, easyfix, newbie
>
> StreamThread currently sets its name thusly:
> {code}
> super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
> {code}
> If you have multiple {{KafkaStreams}} instances within a single application, 
> it would help to add the application ID to the {{StreamThread}} name to identify 
> which thread belongs to which {{KafkaStreams}} instance.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4820) ConsumerNetworkClient.send() should not require global lock

2017-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889476#comment-15889476
 ] 

ASF GitHub Bot commented on KAFKA-4820:
---

GitHub user lindong28 opened a pull request:

https://github.com/apache/kafka/pull/2619

KAFKA-4820; ConsumerNetworkClient.send() should not require global lock



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lindong28/kafka KAFKA-4820

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2619.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2619


commit 6a06f25748e7ff47eae27a5e40e74b2d9ac746e9
Author: Dong Lin 
Date:   2017-03-01T04:15:11Z

KAFKA-4820; ConsumerNetworkClient.send() should not require global lock




> ConsumerNetworkClient.send() should not require global lock
> ---
>
> Key: KAFKA-4820
> URL: https://issues.apache.org/jira/browse/KAFKA-4820
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>
> Currently `ConsumerNetworkClient.send()` needs to acquire the global lock of 
> `ConsumerNetworkClient` in order to enqueue requests. If another thread has 
> called `ConsumerNetworkClient.poll(..)`, that thread may be holding the lock 
> while blocked on `nioSelector.select(ms)`. This causes a problem because the 
> user thread which calls `ConsumerNetworkClient.send()` will also block 
> waiting for that `nioSelector.select(ms)` to finish.
> One way to address this problem is to use `synchronized (unsent)` to protect 
> access to `ConsumerNetworkClient.unsent` instead of protecting it with the 
> global lock, so that the user thread can enqueue requests 
> immediately while another thread is blocked on `nioSelector.select(ms)`.
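
Roughly, the locking change being described looks like this (an illustrative sketch
of the pattern only, not the actual ConsumerNetworkClient code; the selector wakeup
is elided):

{code}
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch of the locking pattern described above -- not the real
// ConsumerNetworkClient implementation.
public class ClientLockingSketch {
    private final Object globalLock = new Object();
    // Pending requests, guarded by their own monitor instead of the global lock.
    private final Queue<Object> unsent = new ArrayDeque<>();

    // poll() still takes the global lock and may block for up to timeoutMs
    // (in the real client, inside nioSelector.select(ms)).
    public void poll(long timeoutMs) throws InterruptedException {
        synchronized (globalLock) {
            globalLock.wait(timeoutMs); // stand-in for the blocking select()
        }
    }

    // send() only needs to enqueue, so it synchronizes on the queue itself and
    // no longer has to wait for a poll() that is sitting in select().
    public void send(Object request) {
        synchronized (unsent) {
            unsent.add(request);
        }
        // ... in the real client, wake up the selector so the request is sent promptly ...
    }
}
{code}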



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2619: KAFKA-4820; ConsumerNetworkClient.send() should no...

2017-02-28 Thread lindong28
GitHub user lindong28 opened a pull request:

https://github.com/apache/kafka/pull/2619

KAFKA-4820; ConsumerNetworkClient.send() should not require global lock



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lindong28/kafka KAFKA-4820

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2619.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2619


commit 6a06f25748e7ff47eae27a5e40e74b2d9ac746e9
Author: Dong Lin 
Date:   2017-03-01T04:15:11Z

KAFKA-4820; ConsumerNetworkClient.send() should not require global lock




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-4820) ConsumerNetworkClient.send() should not require global lock

2017-02-28 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-4820:

Description: 
Currently `ConsumerNetworkClient.send()` needs to acquire the global lock of 
`ConsumerNetworkClient` in order to enqueue requests. If another thread has 
called `ConsumerNetworkClient.poll(..)`, that thread may be holding the lock 
while blocked on `nioSelector.select(ms)`. This causes a problem because the user 
thread which calls `ConsumerNetworkClient.send()` will also block waiting for 
that `nioSelector.select(ms)` to finish.

One way to address this problem is to use `synchronized (unsent)` to protect 
access to `ConsumerNetworkClient.unsent` instead of protecting it with the global 
lock, so that the user thread can enqueue requests immediately while 
another thread is blocked on `nioSelector.select(ms)`.


  was:
Currently `ConsumerNetworkClient.send()` needs to acquire global lock of 
`ConumserNetworkClient` in order to enqueue requests. If another thread has 
called `ConsumerNetworkClient.poll(..)`, that thread may be holding the lock 
while blocked on `nioSelector.select(ms)`. This causes problem because the user 
thread which calls `ConsumerNetworkClient.send()` will also block waiting for 
that `nioSelector.select(ms)` to finish.

One way to address this problem is to use thread-safe classes for the variable 
`ConsumeNetworkClient.unsent` so that it is protected by its own lock. Then 
`ConsumerNetworkClient.send()` will no longer require global lock in order to 
enqueue request.




> ConsumerNetworkClient.send() should not require global lock
> ---
>
> Key: KAFKA-4820
> URL: https://issues.apache.org/jira/browse/KAFKA-4820
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>
> Currently `ConsumerNetworkClient.send()` needs to acquire the global lock of 
> `ConsumerNetworkClient` in order to enqueue requests. If another thread has 
> called `ConsumerNetworkClient.poll(..)`, that thread may be holding the lock 
> while blocked on `nioSelector.select(ms)`. This causes a problem because the 
> user thread which calls `ConsumerNetworkClient.send()` will also block 
> waiting for that `nioSelector.select(ms)` to finish.
> One way to address this problem is to use `synchronized (unsent)` to protect 
> access to `ConsumerNetworkClient.unsent` instead of protecting it with the 
> global lock, so that the user thread can enqueue requests 
> immediately while another thread is blocked on `nioSelector.select(ms)`.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4820) ConsumerNetworkClient.send() should not require global lock

2017-02-28 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-4820:
---

 Summary: ConsumerNetworkClient.send() should not require global 
lock
 Key: KAFKA-4820
 URL: https://issues.apache.org/jira/browse/KAFKA-4820
 Project: Kafka
  Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin


Currently `ConsumerNetworkClient.send()` needs to acquire the global lock of 
`ConsumerNetworkClient` in order to enqueue requests. If another thread has 
called `ConsumerNetworkClient.poll(..)`, that thread may be holding the lock 
while blocked on `nioSelector.select(ms)`. This causes a problem because the user 
thread which calls `ConsumerNetworkClient.send()` will also block waiting for 
that `nioSelector.select(ms)` to finish.

One way to address this problem is to use thread-safe classes for the variable 
`ConsumerNetworkClient.unsent` so that it is protected by its own lock. Then 
`ConsumerNetworkClient.send()` will no longer require the global lock in order to 
enqueue requests.





--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4738) Remove generic type of class ClientState

2017-02-28 Thread Sharad (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889417#comment-15889417
 ] 

Sharad commented on KAFKA-4738:
---

PR submitted:
https://github.com/apache/kafka/pull/2616 

This PR just has one commit for the fix.

> Remove generic type of class ClientState
> 
>
> Key: KAFKA-4738
> URL: https://issues.apache.org/jira/browse/KAFKA-4738
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sharad
>Priority: Minor
>  Labels: beginner, newbie
>
> Currently, class 
> {{org.apache.kafka.streams.processor.internals.assignment.ClientState}} 
> uses a generic type. However, within the actual Streams code base the type will 
> always be {{TaskId}} (from package {{org.apache.kafka.streams.processor}}).
> Thus, this ticket is about removing the generic type and replacing it with 
> {{TaskId}}, to simplify the code base.
> There are some tests that use {{ClientState<Integer>}} (which allows for a 
> slightly simplified test setup). Those tests need to be updated to work 
> properly using {{TaskId}} instead of {{Integer}}.
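
A sketch of what the change amounts to (illustrative fragments only; see the linked
PR for the actual edit):

{code}
// Before (sketch): the class carries a type parameter, even though callers
// always instantiate it as ClientState<TaskId>.
public class ClientState<T> {
    private final Set<T> activeTasks = new HashSet<>();
    // ...
}

// After (sketch): the type parameter is dropped and TaskId is used directly.
public class ClientState {
    private final Set<TaskId> activeTasks = new HashSet<>();
    // ...
}
{code}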



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Issue Comment Deleted] (KAFKA-4738) Remove generic type of class ClientState

2017-02-28 Thread Sharad (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sharad updated KAFKA-4738:
--
Comment: was deleted

(was: Yes, its done.

PR submitted:
https://github.com/apache/kafka/pull/2605)

> Remove generic type of class ClientState
> 
>
> Key: KAFKA-4738
> URL: https://issues.apache.org/jira/browse/KAFKA-4738
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sharad
>Priority: Minor
>  Labels: beginner, newbie
>
> Currently, class 
> {{org.apache.kafka.streams.processor.internals.assignment.ClientState}} 
> uses a generic type. However, within actual Streams code base the type will 
> always be {{TaskId}} (from package {{org.apache.kafka.streams.processor}}).
> Thus, this ticket is about removing the generic type and replace it with 
> {{TaskId}}, to simplify the code base.
> There are some tests, that use {{ClientState}} (what allows for a 
> slightly simplified test setup).  Those tests need to be updated to work 
> properly using {{TaskId}} instead of {{Integer}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2616: KAFKA:4738 - Remove generic type of class ClientSt...

2017-02-28 Thread sharad-develop
Github user sharad-develop closed the pull request at:

https://github.com/apache/kafka/pull/2616


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-82 - Add Record Headers

2017-02-28 Thread Becket Qin
Hi Ismael,

Yes, there is a difference between Batch and Headers. I was just trying to
see if that would work. Good point about sending the same ProducerRecord
twice, but in fact in that case any reuse of objects would cause problems.

As you can imagine, if the ProducerRecord has a value that is a List, the
Interceptor.onSend() can actually add an element to the List. If
producer.send() is called on the same ProducerRecord again, the value list
would have one more element than the previously sent ProducerRecord, even
though the ProducerRecord itself is not mutable, right? The same thing can
apply to any modifiable class type.

From this standpoint, allowing headers to be mutable doesn't really weaken
the mutability we already have. Admittedly, a mutable header is kind of
guiding users towards changing the headers in the existing object instead
of creating a new one. But I think reusing an object while it is possible
to be modified by user code is a risk that users themselves are willing to
take, and we do not really protect against that. But there still seems to be
value in allowing users to not pay the overhead of creating tons of
objects if they do not reuse an object to send it twice, which is probably
the more common case.

Thanks,

Jiangjie (Becket) Qin


On Tue, Feb 28, 2017 at 12:43 PM, radai  wrote:

> I will settle for any API really, but just wanted to point out that as it
> stands right now the API targets the most "advanced" (hence obscure and
> rare) use cases, at the expense of the simple and common ones. i'd suggest
> (as the minimal set):
>
> Header header(String key) - returns JUST ONE (the very last) value given a
> key
> Iterable<Header> headers(String key) - returns ALL under a key
> Iterable<Header> headers() - returns all, period. maybe allow null as key
> to prev method instead?
> void add(Header header) - appends a header (key inside).
> void remove(String key) - removes ALL HEADERS under a key.
>
> this way naive get/set semantics map to header(key)/add(Header) cleanly and
> simply while preserving the ability to handle more advanced use cases.
> we can always add more convenience methods (like those dealing with lists -
> addAll etc) but i think the 5 (potentially 4) above are sufficient for
> basically everything.
>
> On Tue, Feb 28, 2017 at 4:08 AM, Ismael Juma  wrote:
>
> > Hi Becket,
> >
> > Comments inline.
> >
> > On Sat, Feb 25, 2017 at 10:33 PM, Becket Qin 
> wrote:
> > >
> > > 1. Regarding the mutability.
> > >
> > > I think it would be a big convenience to have headers mutable during
> > > certain stage in the message life cycle for the use cases you
> mentioned.
> > I
> > > agree there is a material benefit especially given that we may have to
> > > modify the headers for each message.
> > >
> > > That said, I also think it is fair to say that in the producer, in
> order
> > to
> > > guarantee the correctness of the entire logic, it is necessary that at
> > some
> > > point we need to make producer record immutable. For example we
> probably
> > > don't want to see that users accidentally updated the headers when the
> > > producer is doing the serialization or compression.
> > >
> > > Given that, would it be possible to make Headers to be able to switch
> > from
> > > mutable to immutable? We have done this for the Batch in the producer.
> > For
> > > example, initially the headers are mutable, but after it has gone
> through
> > > all the interceptors, we can call Headers.close() to make it immutable
> > > afterwards.
> > >
> >
> > The difference is that the batch is an internal class that is not exposed
> > to users. Can you please explain what happens if a user tries to send the
> > same ProducerRecord twice? Would an interceptor fail when trying to
> mutate
> > the header that is now closed? Or did you have something else in mind?
> >
> > Thanks,
> > Ismael
> >
>
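
A sketch of the mutable-then-immutable idea discussed above (illustrative only;
Header here is the simple key/value pair from the discussion, and none of this is
a committed KIP-82 design):

{code}
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch: headers start out mutable and are switched to read-only
// once the record has passed through all interceptors.
public class RecordHeadersSketch implements Iterable<Header> {
    private final List<Header> headers = new ArrayList<>();
    private volatile boolean closed = false;

    public boolean add(Header header) {
        if (closed)
            return false; // or throw, depending on the agreed semantics
        headers.add(header);
        return true;
    }

    // Called after interceptors have run, so serialization/compression can rely
    // on the headers no longer changing.
    public void close() {
        closed = true;
    }

    @Override
    public Iterator<Header> iterator() {
        return Collections.unmodifiableList(headers).iterator();
    }
}

// Assumed shape of Header for this sketch.
interface Header {
    String key();
    byte[] value();
}
{code}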


[jira] [Updated] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-28 Thread Bill Bejeck (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-4791:
---
Status: Patch Available  (was: In Progress)

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with the following 
> components:
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set<String> findSourceTopicsForProcessorParents(String[] 
> parents) {
> final Set<String> sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...
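
Purely as an illustration of the shape a guard could take (the accessor and helper
names below are hypothetical, and this is not necessarily what the fix in PR 2618
does):

{code}
// Hypothetical sketch only -- getPattern() and topicsMatching() are invented names.
for (String parent : parents) {
    NodeFactory nodeFactory = nodeFactories.get(parent);
    if (nodeFactory instanceof SourceNodeFactory) {
        SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactory;
        String[] topics = sourceNode.getTopics();
        if (topics != null) {
            sourceTopics.addAll(Arrays.asList(topics));
        } else {
            // Pattern-defined source: resolve topics from the pattern instead of
            // dereferencing the null topics array.
            sourceTopics.addAll(topicsMatching(sourceNode.getPattern()));
        }
    } else if (nodeFactory instanceof ProcessorNodeFactory) {
        sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents));
    }
}
{code}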



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889382#comment-15889382
 ] 

ASF GitHub Bot commented on KAFKA-4791:
---

GitHub user bbejeck opened a pull request:

https://github.com/apache/kafka/pull/2618

KAFKA-4791: unable to add state store with regex matched topics

Fix for adding state stores with regex defined sources

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbejeck/kafka 
KAFKA-4791_unable_to_add_statestore_regex_topics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2618.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2618


commit 828928905e393113e2a4f0148b8f1edc48d6e077
Author: bbejeck 
Date:   2017-03-01T02:27:50Z

KAFKA-4791: unable to add state store with regex matched topics




> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with the following 
> components:
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set<String> findSourceTopicsForProcessorParents(String[] 
> parents) {
> final Set<String> sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2618: KAFKA-4791: unable to add state store with regex m...

2017-02-28 Thread bbejeck
GitHub user bbejeck opened a pull request:

https://github.com/apache/kafka/pull/2618

KAFKA-4791: unable to add state store with regex matched topics

Fix for adding state stores with regex defined sources

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbejeck/kafka 
KAFKA-4791_unable_to_add_statestore_regex_topics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2618.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2618


commit 828928905e393113e2a4f0148b8f1edc48d6e077
Author: bbejeck 
Date:   2017-03-01T02:27:50Z

KAFKA-4791: unable to add state store with regex matched topics




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka-site pull request #49: Fix bad kafka stream link on use cases page

2017-02-28 Thread haoch
GitHub user haoch opened a pull request:

https://github.com/apache/kafka-site/pull/49

Fix bad kafka stream link on use cases page

When clicking [Kafka 
Streams](https://kafka.apache.org/%7B%7Bversion%7D%7D/documentation/streams) 
link on [Use Cases](https://kafka.apache.org/uses) page, redirected to 
https://kafka.apache.org/%7B%7Bversion%7D%7D/documentation/streams) and got:

404 Not Found: The requested URL /{{version}}/documentation/streams was 
not found on this server.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haoch/kafka-site patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka-site/pull/49.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #49






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2617: KAFKA:4722 - Add application.id to StreamThread na...

2017-02-28 Thread sharad-develop
GitHub user sharad-develop opened a pull request:

https://github.com/apache/kafka/pull/2617

KAFKA:4722 - Add application.id to StreamThread name

Add application.id to StreamThread name

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sharad-develop/kafka KAFKA-4722

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2617.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2617


commit 5132da450bdf407f1202d83ed687c7bc31191397
Author: sharad.develop 
Date:   2017-03-01T01:41:06Z

KAFKA:4722 - Add application.id to StreamThread name




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2487: Kafka-4722 : Add application.id to StreamThread na...

2017-02-28 Thread sharad-develop
Github user sharad-develop closed the pull request at:

https://github.com/apache/kafka/pull/2487


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2616: KAFKA:4738 - Remove generic type of class ClientSt...

2017-02-28 Thread sharad-develop
GitHub user sharad-develop opened a pull request:

https://github.com/apache/kafka/pull/2616

KAFKA:4738 - Remove generic type of class ClientState

Remove the generic type of class ClientState and the generic T in TaskAssignor.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sharad-develop/kafka KAFKA-4738

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2616.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2616


commit 3e06cc8dcdef3afaaed71576d66dc14f61da6d7a
Author: sharad.develop 
Date:   2017-03-01T00:29:10Z

KAFKA:4738 - Remove generic type of class ClientState




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2605: Kafka 4738:Remove generic type of class ClientStat...

2017-02-28 Thread sharad-develop
Github user sharad-develop closed the pull request at:

https://github.com/apache/kafka/pull/2605


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-128: Add ByteArrayConverter for Kafka Connect

2017-02-28 Thread Ewen Cheslack-Postava
Guozhang,

Did you look at the PR? I'm fine with doing that if we really think it's
better, but since this is a config-less passthrough, it's actually just
adding overhead to do that...

-Ewen

On Mon, Feb 27, 2017 at 11:47 AM, Guozhang Wang  wrote:

> Thanks Ewen,
>
> "use the corresponding serializer internally and just add in the extra
> conversion
> steps for the data API" sounds good to me.
>
> Guozhang
>
>
> On Mon, Feb 27, 2017 at 8:24 AM, Ewen Cheslack-Postava 
> wrote:
>
> > It's a different interface that's being implemented. The functionality is
> > the same (since it's just a simple pass through), but we intentionally
> > named Converters differently than Serializers since they do more work
> than
> > Serializers (besides the normal serialization they also need to convert
> > between byte[] and the Connect Data API).
> >
> > We could certainly reuse/extend that class instead, though I'm not sure
> > there's much benefit in that and since they implement different
> interfaces
> > and this is Connect-specific, it will probably be clearer to have it
> under
> > a Connect package. Note that for other Converters the pattern we've used
> is
> > to use the corresponding serializer internally and just add in the extra
> > conversion steps for the data API.
> >
> > -Ewen
> >
> > On Sat, Feb 25, 2017 at 6:52 PM, Guozhang Wang 
> wrote:
> >
> > > I'm wondering why we can't just use ByteArarySerde in o.a.k.common?
> > >
> > > Guozhang
> > >
> > > On Sat, Feb 25, 2017 at 2:25 PM, Ewen Cheslack-Postava <
> > e...@confluent.io>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I've added a pretty trivial KIP for adding a pass-through Converter
> for
> > > > Kafka Connect, similar to ByteArraySerializer/Deserializer.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 128%3A+Add+ByteArrayConverter+for+Kafka+Connect
> > > >
> > > > This wasn't added with the framework originally because the idea was
> to
> > > > deal with structured data for the most part. However, we've seen a
> > couple
> > > > of use cases arise as the framework got more traction and I think it
> > > makes
> > > > sense to provide this out of the box now so people stop reinventing
> the
> > > > wheel (and using a different fully-qualified class name) for each
> > > connector
> > > > that needs this functionality.
> > > >
> > > > I imagine this will be a rather uncontroversial addition, so if I
> don't
> > > see
> > > > any comments in the next day or two I'll just start the vote thread.
> > > >
> > > > -Ewen
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
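
For readers skimming the archive: a minimal sketch of what such a pass-through Converter could look like against the Connect Converter interface (illustrative only, written for this digest; the class name is an assumption rather than whatever the final PR ships):

{code}
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;

/** Pass-through converter: the value is already a byte[], so no real (de)serialization happens. */
public class ByteArrayConverterSketch implements Converter {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Nothing to configure for a pure pass-through.
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        if (value != null && !(value instanceof byte[]))
            throw new DataException("Only byte[] values are supported, got " + value.getClass());
        return (byte[]) value;
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        // Hand the raw bytes to the framework with an optional BYTES schema.
        return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value);
    }
}
{code}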


[GitHub] kafka pull request #2615: Add a consumer offset migration tool

2017-02-28 Thread jeffwidman
GitHub user jeffwidman opened a pull request:

https://github.com/apache/kafka/pull/2615

Add a consumer offset migration tool

Extends #1715 to support renaming the consumer group as part of the 
migration. 

I tested this pretty thoroughly and it seems to be working perfectly.

This is particularly useful if your consumers aren't Java-based, as many 
3rd-party clients don't support `dual.commit`, forcing the use of a migration 
script like this in order to migrate.
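
For anyone evaluating the approach, the commit step of such a migration boils down to writing the old offsets under a new group.id with the Java consumer. A minimal sketch (topic, offset and group names below are made up; it assumes the ZooKeeper-stored offsets have already been read into a map):

{code}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetMigrationSketch {
    public static void main(String[] args) {
        // Offsets previously read from ZooKeeper for the old group (illustrative values).
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("my-topic", 0), new OffsetAndMetadata(42L));

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Committing under a *new* group.id is what effectively renames the group.
        props.put("group.id", "my-renamed-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.commitSync(offsets); // written to __consumer_offsets for the new group
        }
    }
}
{code}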

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jeffwidman/kafka offset-migrator-and-rename

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2615.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2615


commit 8ee1c59fcff3638479a74fdf14f0c56606a3a5a7
Author: Grant Henke 
Date:   2016-08-09T17:00:38Z

WIP: Add a consumer offset migration tool




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-1712) Excessive storage usage on newly added node

2017-02-28 Thread Alan Braithwaite (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889137#comment-15889137
 ] 

Alan Braithwaite commented on KAFKA-1712:
-

Has this been looked at recently?  We've found it's an issue for nodes which 
are coming back into a cluster as well.

> Excessive storage usage on newly added node
> ---
>
> Key: KAFKA-1712
> URL: https://issues.apache.org/jira/browse/KAFKA-1712
> Project: Kafka
>  Issue Type: Bug
>Reporter: Oleg Golovin
>
> When a new node is added to the cluster, data starts replicating into it. The 
> mtime of the newly created segments will be set by the last message written to 
> them. Though the replication is a prolonged process, let's assume (for 
> simplicity of explanation) that their mtime is very close to the time when 
> the new node was added.
> After the replication is done, new data will start to flow into this new 
> node. After `log.retention.hours` the amount of data will be 2 * 
> daily_amount_of_data_in_kafka_node (first one is the replicated data from 
> other nodes when the node was added (let us call it `t1`) and the second is 
> the amount of replicated data from other nodes which happened from `t1` to 
> `t1 + log.retention.hours`). So by that time the node will have twice as much 
> data as the other nodes.
> This poses a big problem to us as our storage is chosen to fit normal amount 
> of data (not twice this amount).
> In our particular case it poses another problem. We have an emergency segment 
> cleaner which runs in case storage is nearly full (>90%). We try to balance 
> the amount of data so that it does not run and we can rely solely on Kafka's 
> internal log deletion, but sometimes the emergency cleaner runs.
> It works this way:
> - it gets all kafka segments for the volume
> - it filters out last segments of each partition (just to avoid unnecessary 
> recreation of last small-size segments)
> - it sorts them by segment mtime
> it changes the mtime of the first N segments (with the lowest mtime) to 1, so 
> they become really really old. The number N is chosen to free a specified 
> percentage of the volume (3% in our case). Kafka deletes these segments later 
> (as they are very old).
> Emergency cleaner works very well. Except for the case when the data is 
> replicated to the newly added node. 
> In this case segment mtime is the time the segment was replicated and does 
> not reflect the real creation time of original data stored in this segment.
> So in this case kafka emergency cleaner will delete segments with the lowest 
> mtime, which may hold the data which is much more recent than the data in 
> other segments.
> This is not a big problem until we delete the data which hasn't been fully 
> consumed.
> In this case we lose data, and this makes it a big problem.
> Is it possible to retain segment mtime during initial replication on a new 
> node?
> This will help avoid loading the new node with twice as much 
> data as the other nodes have.
> Or maybe there are other ways to sort segments by data creation time (or 
> something close to data creation time)? (for example, if this ticket is implemented 
> https://issues.apache.org/jira/browse/KAFKA-1403, we may take the time of the 
> first message from the .index). In our case it will help with the Kafka emergency 
> cleaner, which will then really be deleting the oldest data.
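
For illustration only, the mtime-rewriting step of the emergency cleaner described above could look roughly like the following sketch (it assumes the segment files have already been collected and the last segment of each partition filtered out; it deliberately ignores the replication-mtime pitfall the report is about):

{code}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.Comparator;
import java.util.List;

public class EmergencyCleanerSketch {
    /** Make the N oldest-looking segments ancient so retention deletes them first. */
    static void ageOldestSegments(List<Path> segments, int n) throws IOException {
        segments.sort(Comparator.comparing((Path p) -> {
            try {
                return Files.getLastModifiedTime(p);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }));
        for (Path segment : segments.subList(0, Math.min(n, segments.size()))) {
            // mtime of 1 ms since the epoch, i.e. "really really old".
            Files.setLastModifiedTime(segment, FileTime.fromMillis(1L));
        }
    }
}
{code}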



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4816) Message format changes for idempotent/transactional producer

2017-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889133#comment-15889133
 ] 

ASF GitHub Bot commented on KAFKA-4816:
---

GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/2614

KAFKA-4816: Message format changes for idempotent/transactional producer



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/confluentinc/kafka exactly-once-message-format

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2614.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2614


commit dbedb762dd04784c410b51a38413210da0cf9529
Author: Jason Gustafson 
Date:   2016-11-18T16:41:27Z

EoS Message Format Changes

commit 780880f24e31fe0b16a71be563b7914fc66dc58d
Author: hachikuji 
Date:   2017-02-15T05:09:44Z

Support variable length integer types in the new message format (#127)

commit 67b72a788003260265e87813ea08908ae163485a
Author: hachikuji 
Date:   2017-02-19T19:27:46Z

Add relative timestamps to new message format (#128)

commit 155e2e6b6c8c3d03187e3fd828fd04c194401ca1
Author: hachikuji 
Date:   2017-02-22T23:00:22Z

Implement full (stubbed out) fetch and produce request schema (#130)

commit 3e92c0c54a0334d73a9876c8c49a7ac4e1a77e87
Author: hachikuji 
Date:   2017-02-23T23:25:28Z

Add basic support for control messages (#132)

commit ac20e0630a3738ee86f5a2b84fc7624fad7c8345
Author: hachikuji 
Date:   2017-02-27T19:32:13Z

Add transactional flag support (#135)

commit 04909edd7079e026663a4da937aae4fb61741b27
Author: hachikuji 
Date:   2017-02-27T22:29:04Z

Add leader epoch for KIP-101 and no longer use attributes for null 
key/value (#136)

commit d31596034f331547f8967b565129957f5a8d5da1
Author: hachikuji 
Date:   2017-02-28T00:50:20Z

Support old produce request versions which use older magic values (#131)

commit a5681ac1db5a72184fcc53b2e777cb03152d2f70
Author: hachikuji 
Date:   2017-02-28T20:48:56Z

Add a few missing test cases (#137)

commit a4bd61da3ca052c99290feb61d8a1bb4e0b5995e
Author: Jason Gustafson 
Date:   2017-02-28T23:43:39Z

Some minor tweaks from looking over the code




> Message format changes for idempotent/transactional producer
> 
>
> Key: KAFKA-4816
> URL: https://issues.apache.org/jira/browse/KAFKA-4816
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
> Fix For: 0.11.0.0
>
>
> This task is for the implementation of the message format changes documented 
> here: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-MessageFormat.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2614: KAFKA-4816: Message format changes for idempotent/...

2017-02-28 Thread hachikuji
GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/2614

KAFKA-4816: Message format changes for idempotent/transactional producer



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/confluentinc/kafka exactly-once-message-format

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2614.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2614


commit dbedb762dd04784c410b51a38413210da0cf9529
Author: Jason Gustafson 
Date:   2016-11-18T16:41:27Z

EoS Message Format Changes

commit 780880f24e31fe0b16a71be563b7914fc66dc58d
Author: hachikuji 
Date:   2017-02-15T05:09:44Z

Support variable length integer types in the new message format (#127)

commit 67b72a788003260265e87813ea08908ae163485a
Author: hachikuji 
Date:   2017-02-19T19:27:46Z

Add relative timestamps to new message format (#128)

commit 155e2e6b6c8c3d03187e3fd828fd04c194401ca1
Author: hachikuji 
Date:   2017-02-22T23:00:22Z

Implement full (stubbed out) fetch and produce request schema (#130)

commit 3e92c0c54a0334d73a9876c8c49a7ac4e1a77e87
Author: hachikuji 
Date:   2017-02-23T23:25:28Z

Add basic support for control messages (#132)

commit ac20e0630a3738ee86f5a2b84fc7624fad7c8345
Author: hachikuji 
Date:   2017-02-27T19:32:13Z

Add transactional flag support (#135)

commit 04909edd7079e026663a4da937aae4fb61741b27
Author: hachikuji 
Date:   2017-02-27T22:29:04Z

Add leader epoch for KIP-101 and no longer use attributes for null 
key/value (#136)

commit d31596034f331547f8967b565129957f5a8d5da1
Author: hachikuji 
Date:   2017-02-28T00:50:20Z

Support old produce request versions which use older magic values (#131)

commit a5681ac1db5a72184fcc53b2e777cb03152d2f70
Author: hachikuji 
Date:   2017-02-28T20:48:56Z

Add a few missing test cases (#137)

commit a4bd61da3ca052c99290feb61d8a1bb4e0b5995e
Author: Jason Gustafson 
Date:   2017-02-28T23:43:39Z

Some minor tweaks from looking over the code




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4095) When a topic is deleted and then created with the same name, 'committed' offsets are not reset

2017-02-28 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889127#comment-15889127
 ] 

Vahid Hashemian commented on KAFKA-4095:


[~jeffwidman] This JIRA applies to the old (ZooKeeper based) consumer, as you 
mentioned. But the old consumer is not deprecated yet. It is expected to 
officially become deprecated in the next release though 
([KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-109%3A+Old+Consumer+Deprecation]).

> When a topic is deleted and then created with the same name, 'committed' 
> offsets are not reset
> --
>
> Key: KAFKA-4095
> URL: https://issues.apache.org/jira/browse/KAFKA-4095
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Alex Glikson
>Assignee: Vahid Hashemian
>
> I encountered a very strange behavior of Kafka, which seems to be a bug.
> After deleting a topic and re-creating it with the same name, I produced 
> a certain amount of new messages, and then opened a consumer with the same ID 
> that I used before re-creating the topic (with auto.commit=false, 
> auto.offset.reset=earliest). While the latest offsets seemed up to date, the 
> *committed* offset (returned by the committed() method) was an *old* offset, from 
> the time before the topic has been deleted and created.
> I would have assumed that when a topic is deleted, all the associated 
> topic-partitions and consumer groups are recycled too.
> I am using the Java client version 0.9, with Kafka server 0.10.
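
As a side note for readers hitting the same thing: with the 0.10.1+ Java consumer the stale committed offset can be detected and reset defensively on startup, roughly like this sketch (names are illustrative; beginningOffsets/endOffsets are not available on the 0.9 client the reporter used):

{code}
import java.util.Collections;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class StaleOffsetCheckSketch {
    static void resetIfStale(KafkaConsumer<String, String> consumer, TopicPartition tp) {
        consumer.assign(Collections.singletonList(tp));
        OffsetAndMetadata committed = consumer.committed(tp);
        long earliest = consumer.beginningOffsets(Collections.singletonList(tp)).get(tp);
        long latest = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
        // A committed offset outside [earliest, latest] is left over from the deleted
        // incarnation of the topic; start over from the beginning instead.
        if (committed != null && (committed.offset() < earliest || committed.offset() > latest)) {
            consumer.seekToBeginning(Collections.singletonList(tp));
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(earliest)));
        }
    }
}
{code}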



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4773) The Kafka build should run findbugs

2017-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889094#comment-15889094
 ] 

ASF GitHub Bot commented on KAFKA-4773:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2557


> The Kafka build should run findbugs
> ---
>
> Key: KAFKA-4773
> URL: https://issues.apache.org/jira/browse/KAFKA-4773
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
> Fix For: 0.10.3.0
>
>
> The Kafka build should run findbugs to find issues that can easily be caught 
> by static analysis.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-4773) The Kafka build should run findbugs

2017-02-28 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-4773.

   Resolution: Fixed
Fix Version/s: 0.10.3.0

Issue resolved by pull request 2557
[https://github.com/apache/kafka/pull/2557]

> The Kafka build should run findbugs
> ---
>
> Key: KAFKA-4773
> URL: https://issues.apache.org/jira/browse/KAFKA-4773
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
> Fix For: 0.10.3.0
>
>
> The Kafka build should run findbugs to find issues that can easily be caught 
> by static analysis.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4095) When a topic is deleted and then created with the same name, 'committed' offsets are not reset

2017-02-28 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889093#comment-15889093
 ] 

Jeff Widman commented on KAFKA-4095:


Wasn't this solved by KAFKA-2000?

At least for the new consumers which store their offsets in Kafka... if they 
store offsets in ZooKeeper that isn't updated, but that's been deprecated since 
Kafka 0.8.2, so it's probably not worth fixing at this point.

> When a topic is deleted and then created with the same name, 'committed' 
> offsets are not reset
> --
>
> Key: KAFKA-4095
> URL: https://issues.apache.org/jira/browse/KAFKA-4095
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Alex Glikson
>Assignee: Vahid Hashemian
>
> I encountered a very strange behavior of Kafka, which seems to be a bug.
> After deleting a topic and re-creating it with the same name, I produced 
> a certain amount of new messages, and then opened a consumer with the same ID 
> that I used before re-creating the topic (with auto.commit=false, 
> auto.offset.reset=earliest). While the latest offsets seemed up to date, the 
> *committed* offset (returned by the committed() method) was an *old* offset, from 
> the time before the topic has been deleted and created.
> I would have assumed that when a topic is deleted, all the associated 
> topic-partitions and consumer groups are recycled too.
> I am using the Java client version 0.9, with Kafka server 0.10.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2557: KAFKA-4773: The Kafka build should run findbugs

2017-02-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2557


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2594: MINOR: add code quality checks to checkstyle.xml. ...

2017-02-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2594


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (KAFKA-4819) Expose states of active tasks to public API

2017-02-28 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-4819:
--

Assignee: Florian Hussonnois

> Expose states of active tasks to public API
> ---
>
> Key: KAFKA-4819
> URL: https://issues.apache.org/jira/browse/KAFKA-4819
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Assignee: Florian Hussonnois
>Priority: Minor
>  Labels: needs-kip
>
> In Kafka 0.10.1.0 the toString method of the KafkaStreams class has been 
> implemented mainly to ease topology debugging. Also, the streams Metrics 
> have been exposed to the public API.
> But currently there is no way to monitor kstreams task states, assignments 
> or consumed offsets.
> I propose to expose the states of active tasks to the public API KafkaStreams.
> For instance, an application can expose a REST API to get the global state of 
> a kstreams topology.
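
Until such an API exists, the REST-endpoint pattern mentioned in the description can already be approximated with what is public today, e.g. by serving KafkaStreams#toString() (a sketch using the JDK's embedded HTTP server purely to stay self-contained):

{code}
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

import com.sun.net.httpserver.HttpServer;
import org.apache.kafka.streams.KafkaStreams;

public class StreamsStateEndpointSketch {
    static void expose(KafkaStreams streams, int port) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/state", exchange -> {
            // toString() carries the topology/task debug description mentioned in the JIRA.
            byte[] body = streams.toString().getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
    }
}
{code}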



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4819) Expose states of active tasks to public API

2017-02-28 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889006#comment-15889006
 ] 

Matthias J. Sax commented on KAFKA-4819:


[~fhussonnois] Thanks for the JIRA and PR. All public API changes require a KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

> Expose states of active tasks to public API
> ---
>
> Key: KAFKA-4819
> URL: https://issues.apache.org/jira/browse/KAFKA-4819
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>  Labels: needs-kip
>
> In Kafka 0.10.1.0 the toString method of the KafkaStreams class has been 
> implemented mainly to ease topology debugging. Also, the streams Metrics 
> have been exposed to the public API.
> But currently there is no way to monitor kstreams task states, assignments 
> or consumed offsets.
> I propose to expose the states of active tasks to the public API KafkaStreams.
> For instance, an application can expose a REST API to get the global state of 
> a kstreams topology.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4819) Expose states of active tasks to public API

2017-02-28 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4819:
---
Labels: needs-kip  (was: )

> Expose states of active tasks to public API
> ---
>
> Key: KAFKA-4819
> URL: https://issues.apache.org/jira/browse/KAFKA-4819
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>  Labels: needs-kip
>
> In Kafka 0.10.1.0 the toString method of the KafkaStreams class has been 
> implemented mainly to ease topology debugging. Also, the streams Metrics 
> have been exposed to the public API.
> But currently there is no way to monitor kstreams task states, assignments 
> or consumed offsets.
> I propose to expose the states of active tasks to the public API KafkaStreams.
> For instance, an application can expose a REST API to get the global state of 
> a kstreams topology.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-28 Thread Dong Lin
Hey Jun,

I just realized that StopReplicaRequest itself doesn't specify the
replicaId in the wire protocol, so the controller already needs to log the
brokerId alongside each StopReplicaRequest. It may therefore be
reasonable for the controller to do the same with LeaderAndIsrRequest and only
specify the isNewReplica for the broker that receives the LeaderAndIsrRequest.

Thanks,
Dong

On Tue, Feb 28, 2017 at 2:14 PM, Dong Lin  wrote:

> Hi Jun,
>
> Yeah there is tradeoff between controller's implementation complexity vs.
> wire-protocol complexity. I personally think it is more important to keep
> wire-protocol concise and only add information in wire-protocol if
> necessary. It seems fine to add a little bit complexity to controller's
> implementation, e.g. log destination broker per LeaderAndIsrRequest. Becket
> also shares this opinion with me. Is the only purpose of doing so to make
> controller log simpler?
>
> And certainly, I have added Todd's comment in the wiki.
>
> Thanks,
> Dong
>
>
> On Tue, Feb 28, 2017 at 1:37 PM, Jun Rao  wrote:
>
>> Hi, Dong,
>>
>> 52. What you suggested would work. However, I am thinking that it's
>> probably simpler to just set isNewReplica at the replica level. That way,
>> the LeaderAndIsrRequest can be created a bit simpler. When reading a
>> LeaderAndIsrRequest in the controller log, it's easier to see which
>> replicas are new without looking at which broker the request is intended
>> for.
>>
>> Could you also add those additional points from Todd's on 1 broker per
>> disk
>> vs JBOD vs RAID5/6 to the KIP?
>>
>> Thanks,
>>
>> Hi, Todd,
>>
>> Thanks for the feedback. That's very useful.
>>
>> Jun
>>
>> On Tue, Feb 28, 2017 at 10:25 AM, Dong Lin  wrote:
>>
>> > Hey Jun,
>> >
>> > Certainly, I have added Todd to reply to the thread. And I have updated
>> the
>> > item to in the wiki.
>> >
>> > 50. The full statement is "Broker assumes a log directory to be good
>> after
>> > it starts, and mark log directory as bad once there is IOException when
>> > broker attempts to access (i.e. read or write) the log directory". This
>> > statement seems reasonable, right? If a log directory is actually bad,
>> then
>> > the broker will first assume it is OK, try to read logs on this log
>> > directory, encounter IOException, and then mark it as bad.
>> >
>> > 51. My bad. I thought I removed it but I didn't. It is removed now.
>> >
>> > 52. I don't think so.. The isNewReplica field in the
>> LeaderAndIsrRequest is
>> > only relevant to the replica (i.e. broker) that receives the
>> > LeaderAndIsrRequest. There is no need to specify whether each replica is
>> > new inside LeaderAndIsrRequest. In other words, if a broker sends
>> > LeaderAndIsrRequest to three different replicas of a given partition,
>> the
>> > isNewReplica field can be different across these three requests.
>> >
>> > Yeah, I would definitely want to start discussion on KIP-113 after we
>> have
>> > reached agreement on KIP-112. I have actually opened KIP-113 discussion
>> > thread on 1/12 together with this thread. I have yet to add the ability
>> to
>> > list offline directories in KIP-113 which we discussed in this thread.
>> >
>> > Thanks for all your reviews! Is there further concern with the latest
>> KIP?
>> >
>> > Thanks!
>> > Dong
>> >
>> > On Tue, Feb 28, 2017 at 9:40 AM, Jun Rao  wrote:
>> >
>> > > Hi, Dong,
>> > >
>> > > RAID6 is an improvement over RAID5 and can tolerate 2 disks failure.
>> > Eno's
>> > > point is that the rebuild of RAID5/RAID6 requires reading more data
>> > > compared with RAID10, which increases the probability of error during
>> > > rebuild. This makes sense. In any case, do you think you could ask the
>> > SREs
>> > > at LinkedIn to share their opinions on RAID5/RAID6?
>> > >
>> > > Yes, when a replica is offline due to a bad disk, it makes sense to
>> > handle
>> > > it immediately as if a StopReplicaRequest is received (i.e., replica
>> is
>> > no
>> > > longer considered a leader and is removed from any replica fetcher
>> > thread).
>> > > Could you add that detail in item 2. in the wiki?
>> > >
>> > > 50. The wiki says "Broker assumes a log directory to be good after it
>> > > starts" : A log directory actually could be bad during startup.
>> > >
>> > > 51. In item 4, the wiki says "The controller watches the path
> > > /log_dir_event_notification for new znode.". This doesn't seem to be
>> needed
>> > > now?
>> > >
>> > > 52. The isNewReplica field in LeaderAndIsrRequest should be for each
>> > > replica inside the replicas field, right?
>> > >
>> > > Other than those, the current KIP looks good to me. Do you want to
>> start
>> > a
>> > > separate discussion thread on KIP-113? I do have some comments there.
>> > >
>> > > Thanks for working on this!
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Mon, Feb 27, 2017 at 5:51 PM, Dong Lin 
>> wrote:
>> > >
>> > > > Hi Jun,
>> > > >
>> > > > In addition to 

Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-28 Thread Mayuresh Gharat
Hi Jun,

Sure.
I had an offline discussion with Joel on how we can deprecate the
KafkaPrincipal from  Session and Authorizer.
I will update the KIP to see if we can address all the concerns here. If
not we can keep the KafkaPrincipal.

Thanks,

Mayuresh

On Tue, Feb 28, 2017 at 1:53 PM, Jun Rao  wrote:

> Hi, Joel,
>
> Good point on the getAcls() method. KafkaPrincipal is also tied to ACL,
> which is used in pretty much every method in Authorizer. Now, I am not sure
> if it's easy to deprecate KafkaPrincipal.
>
> Hi, Mayuresh,
>
> Given the above, it seems that the easiest thing is to add a new Principal
> field in Session. We want to make it clear that it's ignored in the default
> implementation, but a custom authorizer could take advantage of that.
>
> Thanks,
>
> Jun
>
> On Tue, Feb 28, 2017 at 10:52 AM, Joel Koshy  wrote:
>
> > If we deprecate KafkaPrincipal, then the Authorizer interface will also
> > need to change - i.e., deprecate the getAcls(KafkaPrincipal) method.
> >
> > On Tue, Feb 28, 2017 at 10:11 AM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> > > Hi Jun/Ismael,
> > >
> > > Thanks for the comments.
> > >
> > > I agree.
> > > What I was thinking was, we get the KIP passed now and wait till a major
> > > Kafka version release. We can then make this change, but for now we can
> > > wait. Does that work?
> > >
> > > If there are concerns, we can make the addition of extra field of type
> > > Principal to Session and then deprecate the KafkaPrincipal later.
> > >
> > > I am fine either way. What do you think?
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Tue, Feb 28, 2017 at 9:53 AM, Jun Rao  wrote:
> > >
> > > > Hi, Ismael,
> > > >
> > > > Good point on compatibility.
> > > >
> > > > Hi, Mayuresh,
> > > >
> > > > Given that, it seems that it's better to just add the raw principal
> as
> > a
> > > > new field in Session for now and deprecate the KafkaPrincipal field
> in
> > > the
> > > > future if needed?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Mon, Feb 27, 2017 at 5:05 PM, Ismael Juma 
> > wrote:
> > > >
> > > > > Breaking clients without a deprecation period is something we only
> do
> > > as
> > > > a
> > > > > last resort. Is there strong justification for doing it here?
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Mon, Feb 27, 2017 at 11:28 PM, Mayuresh Gharat <
> > > > > gharatmayures...@gmail.com> wrote:
> > > > >
> > > > > > Hi Ismael,
> > > > > >
> > > > > > Yeah. I agree that it might break the clients if the user is
> using
> > > the
> > > > > > kafkaPrincipal directly. But since KafkaPrincipal is also a Java
> > > > > Principal
> > > > > > and I think it would be the right thing to replace the
> > > kafkaPrincipal
> > > > > > with the Java Principal at this stage rather than later.
> > > > > >
> > > > > > We can mention in the KIP, that it would break the clients that
> are
> > > > using
> > > > > > the KafkaPrincipal directly and they will have to use the
> > > PrincipalType
> > > > > > directly, if they are using it as its only one value and use the
> > name
> > > > > from
> > > > > > the Principal directly or create a KafkaPrincipal from Java
> > Principal
> > > > as
> > > > > we
> > > > > > are doing in SimpleAclAuthorizer with this KIP.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Mayuresh
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Feb 27, 2017 at 10:56 AM, Ismael Juma  >
> > > > wrote:
> > > > > >
> > > > > > > Hi Mayuresh,
> > > > > > >
> > > > > > > Sorry for the delay. The updated KIP states that there is no
> > > > > > compatibility
> > > > > > > impact, but that doesn't seem right. The fact that we changed
> the
> > > > type
> > > > > of
> > > > > > > Session.principal to `Principal` means that any code that
> expects
> > > it
> > > > to
> > > > > > be
> > > > > > > `KafkaPrincipal` will break. Either because of declared types
> > > > (likely)
> > > > > or
> > > > > > > if it accesses `getPrincipalType` (unlikely since the value is
> > > always
> > > > > the
> > > > > > > same). It's a bit annoying, but we should add a new field to
> > > > `Session`
> > > > > > with
> > > > > > > the original principal. We can potentially deprecate the
> existing
> > > > one,
> > > > > if
> > > > > > > we're sure we don't need it (or we can leave it for now).
> > > > > > >
> > > > > > > Ismael
> > > > > > >
> > > > > > > On Mon, Feb 27, 2017 at 6:40 PM, Mayuresh Gharat <
> > > > > > > gharatmayures...@gmail.com
> > > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Ismael, Joel, Becket
> > > > > > > >
> > > > > > > > Would you mind taking a look at this. We require 2 more
> binding
> > > > votes
> > > > > > for
> > > > > > > > the KIP to pass.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Mayuresh
> > > > > > > >
> > > > > > > > On Thu, Feb 23, 2017 at 10:57 AM, Dong Lin <
> > 
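
For context on why the original object matters: a custom PrincipalBuilder can return a Principal subtype that carries extra attributes, and those attributes only survive to authorization time if the Session keeps the original instance instead of flattening it into a KafkaPrincipal. A minimal, purely hypothetical illustration of such a principal type:

{code}
import java.security.Principal;
import java.util.Collections;
import java.util.Set;

/** Hypothetical principal a custom PrincipalBuilder might produce. */
public class GroupAwarePrincipal implements Principal {
    private final String name;
    private final Set<String> groups;

    public GroupAwarePrincipal(String name, Set<String> groups) {
        this.name = name;
        this.groups = Collections.unmodifiableSet(groups);
    }

    @Override
    public String getName() {
        return name;
    }

    /**
     * Extra attribute a custom Authorizer could consult, e.g. by down-casting
     * the Principal exposed on the Session once KIP-111 preserves it.
     */
    public Set<String> getGroups() {
        return groups;
    }
}
{code}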

[GitHub] kafka pull request #2613: MINOR. Fix tests/docker/Dockerfile

2017-02-28 Thread cmccabe
GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/2613

MINOR. Fix tests/docker/Dockerfile

Fix tests/docker/Dockerfile to put the old Kafka distributions in the
correct spot for tests.  Also, run_tests.sh should exit with an error
code if image rebuilding fails, rather than silently falling back to an
older image.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka dockerfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2613.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2613


commit 8d66d7cdeb9130bf7728a6321eaeb018f916a3aa
Author: Colin P. Mccabe 
Date:   2017-02-28T22:29:06Z

MINOR. Fix tests/docker/Dockerfile

Fix tests/docker/Dockerfile to put the old Kafka distributions in the
correct spot for tests.  Also, run_tests.sh should exit with an error
code if image rebuilding fails, rather than silently falling back to an
older image.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-28 Thread Dong Lin
Hi Jun,

Yeah there is tradeoff between controller's implementation complexity vs.
wire-protocol complexity. I personally think it is more important to keep
wire-protocol concise and only add information in wire-protocol if
necessary. It seems fine to add a little bit complexity to controller's
implementation, e.g. log destination broker per LeaderAndIsrRequest. Becket
also shares this opinion with me. Is the only purpose of doing so to make
controller log simpler?

And certainly, I have added Todd's comment in the wiki.

Thanks,
Dong


On Tue, Feb 28, 2017 at 1:37 PM, Jun Rao  wrote:

> Hi, Dong,
>
> 52. What you suggested would work. However, I am thinking that it's
> probably simpler to just set isNewReplica at the replica level. That way,
> the LeaderAndIsrRequest can be created a bit simpler. When reading a
> LeaderAndIsrRequest in the controller log, it's easier to see which
> replicas are new without looking at which broker the request is intended
> for.
>
> Could you also add those additional points from Todd's on 1 broker per disk
> vs JBOD vs RAID5/6 to the KIP?
>
> Thanks,
>
> Hi, Todd,
>
> Thanks for the feedback. That's very useful.
>
> Jun
>
> On Tue, Feb 28, 2017 at 10:25 AM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Certainly, I have added Todd to reply to the thread. And I have updated
> the
> > item to in the wiki.
> >
> > 50. The full statement is "Broker assumes a log directory to be good
> after
> > it starts, and mark log directory as bad once there is IOException when
> > broker attempts to access (i.e. read or write) the log directory". This
> > statement seems reasonable, right? If a log directory is actually bad,
> then
> > the broker will first assume it is OK, try to read logs on this log
> > directory, encounter IOException, and then mark it as bad.
> >
> > 51. My bad. I thought I removed it but I didn't. It is removed now.
> >
> > 52. I don't think so.. The isNewReplica field in the LeaderAndIsrRequest
> is
> > only relevant to the replica (i.e. broker) that receives the
> > LeaderAndIsrRequest. There is no need to specify whether each replica is
> > new inside LeaderAndIsrRequest. In other words, if a broker sends
> > LeaderAndIsrRequest to three different replicas of a given partition, the
> > isNewReplica field can be different across these three requests.
> >
> > Yeah, I would definitely want to start discussion on KIP-113 after we
> have
> > reached agreement on KIP-112. I have actually opened KIP-113 discussion
> > thread on 1/12 together with this thread. I have yet to add the ability
> to
> > list offline directories in KIP-113 which we discussed in this thread.
> >
> > Thanks for all your reviews! Is there further concern with the latest
> KIP?
> >
> > Thanks!
> > Dong
> >
> > On Tue, Feb 28, 2017 at 9:40 AM, Jun Rao  wrote:
> >
> > > Hi, Dong,
> > >
> > > RAID6 is an improvement over RAID5 and can tolerate 2 disks failure.
> > Eno's
> > > point is that the rebuild of RAID5/RAID6 requires reading more data
> > > compared with RAID10, which increases the probability of error during
> > > rebuild. This makes sense. In any case, do you think you could ask the
> > SREs
> > > at LinkedIn to share their opinions on RAID5/RAID6?
> > >
> > > Yes, when a replica is offline due to a bad disk, it makes sense to
> > handle
> > > it immediately as if a StopReplicaRequest is received (i.e., replica is
> > no
> > > longer considered a leader and is removed from any replica fetcher
> > thread).
> > > Could you add that detail in item 2. in the wiki?
> > >
> > > 50. The wiki says "Broker assumes a log directory to be good after it
> > > starts" : A log directory actually could be bad during startup.
> > >
> > > 51. In item 4, the wiki says "The controller watches the path
> > > /log_dir_event_notification for new znode.". This doesn't seem to be
> needed
> > > now?
> > >
> > > 52. The isNewReplica field in LeaderAndIsrRequest should be for each
> > > replica inside the replicas field, right?
> > >
> > > Other than those, the current KIP looks good to me. Do you want to
> start
> > a
> > > separate discussion thread on KIP-113? I do have some comments there.
> > >
> > > Thanks for working on this!
> > >
> > > Jun
> > >
> > >
> > > On Mon, Feb 27, 2017 at 5:51 PM, Dong Lin  wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > In addition to Eno's reference of why rebuild time with RAID-5 is
> > > more
> > > > expensive, another concern is that RAID-5 will fail if more than one
> > disk
> > > > fails. JBOD still works with 1+ disk failure and has better
> > > performance
> > > > with one disk failure. These seem like good arguments for using JBOD
> > > > instead of RAID-5.
> > > >
> > > > If a leader replica goes offline, the broker should first take all
> > > actions
> > > > (i.e. remove the partition from fetcher thread) as if it has received
> > > > StopReplicaRequest for this partition because the replica can 

[jira] [Commented] (KAFKA-4738) Remove generic type of class ClientState

2017-02-28 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888950#comment-15888950
 ] 

Matthias J. Sax commented on KAFKA-4738:


[~sharad.develop] Your PR shows 81 commits. We cannot review/merge it like 
this. Please rebase your PR to current trunk to get rid of all the duplicate 
commits.

> Remove generic type of class ClientState
> 
>
> Key: KAFKA-4738
> URL: https://issues.apache.org/jira/browse/KAFKA-4738
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sharad
>Priority: Minor
>  Labels: beginner, newbie
>
> Currently, class 
> {{org.apache.kafka.streams.processor.internals.assignment.ClientState}} 
> uses a generic type. However, within actual Streams code base the type will 
> always be {{TaskId}} (from package {{org.apache.kafka.streams.processor}}).
> Thus, this ticket is about removing the generic type and replace it with 
> {{TaskId}}, to simplify the code base.
> There are some tests that use {{ClientState}} (which allows for a 
> slightly simplified test setup). Those tests need to be updated to work 
> properly using {{TaskId}} instead of {{Integer}}.
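
For anyone picking this up, the intended shape of the change is roughly the following (a heavily trimmed sketch; the real ClientState has more fields and methods, this only shows the type parameter being replaced by TaskId):

{code}
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.streams.processor.TaskId;

// Before: class ClientState<T> kept Set<T> collections and was instantiated
// with TaskId in production code and Integer in some tests.
// After: the type parameter is gone and TaskId is used directly.
public class ClientState {
    private final Set<TaskId> activeTasks = new HashSet<>();
    private final Set<TaskId> assignedTasks = new HashSet<>();

    public void assign(TaskId taskId, boolean active) {
        assignedTasks.add(taskId);
        if (active)
            activeTasks.add(taskId);
    }

    public Set<TaskId> activeTasks() {
        return activeTasks;
    }

    public Set<TaskId> assignedTasks() {
        return assignedTasks;
    }
}
{code}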



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics

2017-02-28 Thread Hamidreza Afzali (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888941#comment-15888941
 ] 

Hamidreza Afzali commented on KAFKA-4789:
-

Thanks!

> ProcessorTopologyTestDriver does not forward extracted timestamps to internal 
> topics
> 
>
> Key: KAFKA-4789
> URL: https://issues.apache.org/jira/browse/KAFKA-4789
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Hamidreza Afzali
>Assignee: Hamidreza Afzali
>  Labels: unit-test
> Fix For: 0.10.3.0
>
>
> *Problem:*
> When using ProcessorTopologyTestDriver, the extracted timestamp is not 
> forwarded with the produced record to the internal topics.
> *Example:*
> {code}
> object Topology1 {
>   def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val outputTopic = "output"
> val stateStore = "count"
> val inputs = Seq[(String, Integer)](("A@145000", 1), ("B@145000", 
> 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
> classOf[MyTimestampExtractor].getName)
> val windowedStringSerde = Serdes.serdeFrom(new 
> WindowedSerializer(Serdes.String.serializer),
>   new WindowedDeserializer(Serdes.String.deserializer))
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>   .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
>   .groupByKey(Serdes.String, Serdes.Integer)
>   .count(TimeWindows.of(1000L), stateStore)
>   .to(windowedStringSerde, Serdes.Long, outputTopic)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
> builder, stateStore)
> inputs.foreach {
>   case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer, 
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic, 
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
>   }
> }
>   }
> }
> {code}
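
The MyTimestampExtractor referenced above is not included in the report; a minimal version consistent with the "key@timestamp" inputs could look like this (a sketch against the single-argument extract() of the 0.10.2 TimestampExtractor interface; later versions add a second parameter):

{code}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class MyTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        // Keys look like "A@1450...", so the event time is embedded after the '@'.
        String key = (String) record.key();
        return Long.parseLong(key.substring(key.indexOf('@') + 1));
    }
}
{code}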



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

2017-02-28 Thread Guozhang Wang
Hi all,

I have just created KIP-129 to leverage KIP-98 in Kafka Streams and provide
exactly-once processing semantics:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics

This KIP enables Streams users to optionally turn on exactly-once
processing semantics without changing their app code at all by leveraging
the transactional messaging features provided in KIP-98.

The above wiki page provides a high-level view of the proposed changes,
while detailed implementation design can be found in this Google doc:

https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c

We would love to hear your comments and suggestions.

Thanks,
-- Guozhang
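
Assuming the proposal lands as a single StreamsConfig switch (the design doc calls it processing.guarantee; the name and value are still up for discussion and not in any released version), turning it on would look roughly like:

{code}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Proposed by KIP-129; the rest of the application code stays unchanged.
        props.put("processing.guarantee", "exactly_once");
        // new KafkaStreams(builder, new StreamsConfig(props)).start();
    }
}
{code}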


Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-28 Thread Jun Rao
Hi, Joel,

Good point on the getAcls() method. KafkaPrincipal is also tied to ACL,
which is used in pretty much every method in Authorizer. Now, I am not sure
if it's easy to deprecate KafkaPrincipal.

Hi, Mayuresh,

Given the above, it seems that the easiest thing is to add a new Principal
field in Session. We want to make it clear that it's ignored in the default
implementation, but a custom authorizer could take advantage of that.

Thanks,

Jun

On Tue, Feb 28, 2017 at 10:52 AM, Joel Koshy  wrote:

> If we deprecate KafkaPrincipal, then the Authorizer interface will also
> need to change - i.e., deprecate the getAcls(KafkaPrincipal) method.
>
> On Tue, Feb 28, 2017 at 10:11 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
> > Hi Jun/Ismael,
> >
> > Thanks for the comments.
> >
> > I agree.
> > What I was thinking was, we get the KIP passed now and wait till a major
> > Kafka version release. We can then make this change, but for now we can
> > wait. Does that work?
> >
> > If there are concerns, we can make the addition of extra field of type
> > Principal to Session and then deprecate the KafkaPrincipal later.
> >
> > I am fine either way. What do you think?
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Tue, Feb 28, 2017 at 9:53 AM, Jun Rao  wrote:
> >
> > > Hi, Ismael,
> > >
> > > Good point on compatibility.
> > >
> > > Hi, Mayuresh,
> > >
> > > Given that, it seems that it's better to just add the raw principal as
> a
> > > new field in Session for now and deprecate the KafkaPrincipal field in
> > the
> > > future if needed?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Feb 27, 2017 at 5:05 PM, Ismael Juma 
> wrote:
> > >
> > > > Breaking clients without a deprecation period is something we only do
> > as
> > > a
> > > > last resort. Is there strong justification for doing it here?
> > > >
> > > > Ismael
> > > >
> > > > On Mon, Feb 27, 2017 at 11:28 PM, Mayuresh Gharat <
> > > > gharatmayures...@gmail.com> wrote:
> > > >
> > > > > Hi Ismael,
> > > > >
> > > > > Yeah. I agree that it might break the clients if the user is using
> > the
> > > > > kafkaPrincipal directly. But since KafkaPrincipal is also a Java
> > > > Principal
> > > > > and I think, it would be a right thing to do replace the
> > kafkaPrincipal
> > > > > with Java Principal at this stage than later.
> > > > >
> > > > > We can mention in the KIP, that it would break the clients that are
> > > using
> > > > > the KafkaPrincipal directly and they will have to use the
> > PrincipalType
> > > > > directly, if they are using it as its only one value and use the
> name
> > > > from
> > > > > the Principal directly or create a KafkaPrincipal from Java
> Principal
> > > as
> > > > we
> > > > > are doing in SimpleAclAuthorizer with this KIP.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Mayuresh
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Feb 27, 2017 at 10:56 AM, Ismael Juma 
> > > wrote:
> > > > >
> > > > > > Hi Mayuresh,
> > > > > >
> > > > > > Sorry for the delay. The updated KIP states that there is no
> > > > > compatibility
> > > > > > impact, but that doesn't seem right. The fact that we changed the
> > > type
> > > > of
> > > > > > Session.principal to `Principal` means that any code that expects
> > it
> > > to
> > > > > be
> > > > > > `KafkaPrincipal` will break. Either because of declared types
> > > (likely)
> > > > or
> > > > > > if it accesses `getPrincipalType` (unlikely since the value is
> > always
> > > > the
> > > > > > same). It's a bit annoying, but we should add a new field to
> > > `Session`
> > > > > with
> > > > > > the original principal. We can potentially deprecate the existing
> > > one,
> > > > if
> > > > > > we're sure we don't need it (or we can leave it for now).
> > > > > >
> > > > > > Ismael
> > > > > >
> > > > > > On Mon, Feb 27, 2017 at 6:40 PM, Mayuresh Gharat <
> > > > > > gharatmayures...@gmail.com
> > > > > > > wrote:
> > > > > >
> > > > > > > Hi Ismael, Joel, Becket
> > > > > > >
> > > > > > > Would you mind taking a look at this. We require 2 more binding
> > > votes
> > > > > for
> > > > > > > the KIP to pass.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Mayuresh
> > > > > > >
> > > > > > > On Thu, Feb 23, 2017 at 10:57 AM, Dong Lin <
> lindon...@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > +1 (non-binding)
> > > > > > > >
> > > > > > > > On Wed, Feb 22, 2017 at 10:52 PM, Manikumar <
> > > > > manikumar.re...@gmail.com
> > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > +1 (non-binding)
> > > > > > > > >
> > > > > > > > > On Thu, Feb 23, 2017 at 3:27 AM, Mayuresh Gharat <
> > > > > > > > > gharatmayures...@gmail.com
> > > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Jun,
> > > > > > > > > >
> > > > > > > > > > Thanks a lot for the comments and reviews.
> > > > > > > > > > I agree we should log the username.
> > > > > > > > > > 

[jira] [Commented] (KAFKA-4819) Expose states of active tasks to public API

2017-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888916#comment-15888916
 ] 

ASF GitHub Bot commented on KAFKA-4819:
---

GitHub user fhussonnois opened a pull request:

https://github.com/apache/kafka/pull/2612

KAFKA-4819: Expose states for active tasks to public API

Simple implementation of the feature : 
[KAFKA-4819](https://issues.apache.org/jira/browse/KAFKA-4819)
 KAFKA-4819

This PR adds a new method `threadStates` to the public API of `KafkaStreams`, 
which returns the current states of all running threads and active tasks.

Below is an example for a simple topology consuming from topics test-p2 and 
test-p4.

[{"name":"StreamThread-1","state":"RUNNING","activeTasks":[{"id":"0_0", 
"assignments":["test-p4-0","test-p2-0"], 
"consumedOffsetsByPartition":[{"topicPartition":"test-p2-0","offset":"test-p2-0"}]},
 {"id":"0_2", "assignments":["test-p4-2"], "consumedOffsetsByPartition":[]}]}, 
{"name":"StreamThread-2","state":"RUNNING","activeTasks":[{"id":"0_1", 
"assignments":["test-p4-1","test-p2-1"], 
"consumedOffsetsByPartition":[{"topicPartition":"test-p2-1","offset":"test-p2-1"}]},
 {"id":"0_3", "assignments":["test-p4-3"], "consumedOffsetsByPartition":[]}]}]

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhussonnois/kafka KAFKA-4819

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2612.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2612


commit 0f8b8123cabdbfcfb44fe59b9be20e13ac253c95
Author: Florian Hussonnois 
Date:   2017-02-23T22:08:01Z

KAFKA-4819: Expose states for active tasks to public API




> Expose states of active tasks to public API
> ---
>
> Key: KAFKA-4819
> URL: https://issues.apache.org/jira/browse/KAFKA-4819
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>
> In Kafka 0.10.1.0 the toString method of the KafkaStreams class has been 
> implemented mainly to ease topology debugging. Also, the streams Metrics 
> have been exposed to the public API.
> But currently there is no way to monitor kstreams task states, assignments 
> or consumed offsets.
> I propose to expose the states of active tasks to the public API KafkaStreams.
> For instance, an application can expose a REST API to get the global state of 
> a kstreams topology.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2612: KAFKA-4819: Expose states for active tasks to publ...

2017-02-28 Thread fhussonnois
GitHub user fhussonnois opened a pull request:

https://github.com/apache/kafka/pull/2612

KAFKA-4819: Expose states for active tasks to public API

Simple implementation of the feature : 
[KAFKA-4819](https://issues.apache.org/jira/browse/KAFKA-4819)
 KAFKA-4819

This PR adds a new method `threadStates` to the public API of `KafkaStreams`, 
which returns the current states of all running threads and active tasks.

Below is an example for a simple topology consuming from topics test-p2 and 
test-p4.

[{"name":"StreamThread-1","state":"RUNNING","activeTasks":[{"id":"0_0", 
"assignments":["test-p4-0","test-p2-0"], 
"consumedOffsetsByPartition":[{"topicPartition":"test-p2-0","offset":"test-p2-0"}]},
 {"id":"0_2", "assignments":["test-p4-2"], "consumedOffsetsByPartition":[]}]}, 
{"name":"StreamThread-2","state":"RUNNING","activeTasks":[{"id":"0_1", 
"assignments":["test-p4-1","test-p2-1"], 
"consumedOffsetsByPartition":[{"topicPartition":"test-p2-1","offset":"test-p2-1"}]},
 {"id":"0_3", "assignments":["test-p4-3"], "consumedOffsetsByPartition":[]}]}]

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhussonnois/kafka KAFKA-4819

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2612.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2612


commit 0f8b8123cabdbfcfb44fe59b9be20e13ac253c95
Author: Florian Hussonnois 
Date:   2017-02-23T22:08:01Z

KAFKA-4819: Expose states for active tasks to public API




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-28 Thread Jun Rao
Hi, Dong,

52. What you suggested would work. However, I am thinking that it's
probably simpler to just set isNewReplica at the replica level. That way,
the LeaderAndIsrRequest can be created a bit simpler. When reading a
LeaderAndIsrRequest in the controller log, it's easier to see which
replicas are new without looking at which broker the request is intended
for.

Could you also add those additional points from Todd's on 1 broker per disk
vs JBOD vs RAID5/6 to the KIP?

Thanks,

Hi, Todd,

Thanks for the feedback. That's very useful.

Jun

On Tue, Feb 28, 2017 at 10:25 AM, Dong Lin  wrote:

> Hey Jun,
>
> Certainly, I have added Todd to reply to the thread. And I have updated the
> item to in the wiki.
>
> 50. The full statement is "Broker assumes a log directory to be good after
> it starts, and mark log directory as bad once there is IOException when
> broker attempts to access (i.e. read or write) the log directory". This
> statement seems reasonable, right? If a log directory is actually bad, then
> the broker will first assume it is OK, try to read logs on this log
> directory, encounter IOException, and then mark it as bad.
>
> 51. My bad. I thought I removed it but I didn't. It is removed now.
>
> 52. I don't think so.. The isNewReplica field in the LeaderAndIsrRequest is
> only relevant to the replica (i.e. broker) that receives the
> LeaderAndIsrRequest. There is no need to specify whether each replica is
> new inside LeaderAndIsrRequest. In other words, if a broker sends
> LeaderAndIsrRequest to three different replicas of a given partition, the
> isNewReplica field can be different across these three requests.
>
> Yeah, I would definitely want to start discussion on KIP-113 after we have
> reached agreement on KIP-112. I have actually opened KIP-113 discussion
> thread on 1/12 together with this thread. I have yet to add the ability to
> list offline directories in KIP-113 which we discussed in this thread.
>
> Thanks for all your reviews! Is there further concern with the latest KIP?
>
> Thanks!
> Dong
>
> On Tue, Feb 28, 2017 at 9:40 AM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > RAID6 is an improvement over RAID5 and can tolerate 2 disks failure.
> Eno's
> > point is that the rebuild of RAID5/RAID6 requires reading more data
> > compared with RAID10, which increases the probability of error during
> > rebuild. This makes sense. In any case, do you think you could ask the
> SREs
> > at LinkedIn to share their opinions on RAID5/RAID6?
> >
> > Yes, when a replica is offline due to a bad disk, it makes sense to
> handle
> > it immediately as if a StopReplicaRequest is received (i.e., replica is
> no
> > longer considered a leader and is removed from any replica fetcher
> thread).
> > Could you add that detail in item 2. in the wiki?
> >
> > 50. The wiki says "Broker assumes a log directory to be good after it
> > starts" : A log directory actually could be bad during startup.
> >
> > 51. In item 4, the wiki says "The controller watches the path
> > /log_dir_event_notification for new znode.". This doesn't seem to be needed
> > now?
> >
> > 52. The isNewReplica field in LeaderAndIsrRequest should be for each
> > replica inside the replicas field, right?
> >
> > Other than those, the current KIP looks good to me. Do you want to start
> a
> > separate discussion thread on KIP-113? I do have some comments there.
> >
> > Thanks for working on this!
> >
> > Jun
> >
> >
> > On Mon, Feb 27, 2017 at 5:51 PM, Dong Lin  wrote:
> >
> > > Hi Jun,
> > >
> > > In addition to Eno's reference of why rebuild time with RAID-5 is
> > more
> > > expensive, another concern is that RAID-5 will fail if more than one
> disk
> > > fails. JBOD still works with 1+ disk failure and has better
> > performance
> > > with one disk failure. These seem like good arguments for using JBOD
> > > instead of RAID-5.
> > >
> > > If a leader replica goes offline, the broker should first take all actions
> > > (i.e. remove the partition from the fetcher thread) as if it had received a
> > > StopReplicaRequest for this partition, because the replica can no longer
> > > work anyway. It will also respond with an error to any ProduceRequest and
> > > FetchRequest for the partition. The broker notifies the controller by writing
> > > a notification znode in ZK. The controller learns of the disk failure event
> > > from ZK, sends a LeaderAndIsrRequest and receives the LeaderAndIsrResponse to
> > > learn that the replica is offline. The controller will then elect a new leader
> > > for this partition and send LeaderAndIsrRequest/MetadataUpdateRequest to the
> > > relevant brokers. The broker should stop adjusting the ISR for this
> > > partition as if the broker were already offline. I am not sure there is any
> > > inconsistency in the broker's behavior when it is leader or follower. Is there
> > > any concern with this approach?
> > >
> > > Thanks for catching this. I have removed that reference 

[jira] [Commented] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics

2017-02-28 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888903#comment-15888903
 ] 

Guozhang Wang commented on KAFKA-4789:
--

Thanks [~hrafzali] for the patch! I have added you to the contributor list so 
you can assign JIRAs to yourself in the future.

> ProcessorTopologyTestDriver does not forward extracted timestamps to internal 
> topics
> 
>
> Key: KAFKA-4789
> URL: https://issues.apache.org/jira/browse/KAFKA-4789
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Hamidreza Afzali
>Assignee: Hamidreza Afzali
>  Labels: unit-test
> Fix For: 0.10.3.0
>
>
> *Problem:*
> When using ProcessorTopologyTestDriver, the extracted timestamp is not 
> forwarded with the produced record to the internal topics.
> *Example:*
> {code}
> object Topology1 {
>   def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val outputTopic = "output"
> val stateStore = "count"
> val inputs = Seq[(String, Integer)](("A@145000", 1), ("B@145000", 
> 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
> classOf[MyTimestampExtractor].getName)
> val windowedStringSerde = Serdes.serdeFrom(new 
> WindowedSerializer(Serdes.String.serializer),
>   new WindowedDeserializer(Serdes.String.deserializer))
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>   .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
>   .groupByKey(Serdes.String, Serdes.Integer)
>   .count(TimeWindows.of(1000L), stateStore)
>   .to(windowedStringSerde, Serdes.Long, outputTopic)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
> builder, stateStore)
> inputs.foreach {
>   case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer, 
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic, 
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
>   }
> }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics

2017-02-28 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-4789:


Assignee: Hamidreza Afzali

> ProcessorTopologyTestDriver does not forward extracted timestamps to internal 
> topics
> 
>
> Key: KAFKA-4789
> URL: https://issues.apache.org/jira/browse/KAFKA-4789
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Hamidreza Afzali
>Assignee: Hamidreza Afzali
>  Labels: unit-test
> Fix For: 0.10.3.0
>
>
> *Problem:*
> When using ProcessorTopologyTestDriver, the extracted timestamp is not 
> forwarded with the produced record to the internal topics.
> *Example:*
> {code}
> object Topology1 {
>   def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val outputTopic = "output"
> val stateStore = "count"
> val inputs = Seq[(String, Integer)](("A@145000", 1), ("B@145000", 
> 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
> classOf[MyTimestampExtractor].getName)
> val windowedStringSerde = Serdes.serdeFrom(new 
> WindowedSerializer(Serdes.String.serializer),
>   new WindowedDeserializer(Serdes.String.deserializer))
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>   .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
>   .groupByKey(Serdes.String, Serdes.Integer)
>   .count(TimeWindows.of(1000L), stateStore)
>   .to(windowedStringSerde, Serdes.Long, outputTopic)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
> builder, stateStore)
> inputs.foreach {
>   case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer, 
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic, 
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
>   }
> }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics

2017-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888901#comment-15888901
 ] 

ASF GitHub Bot commented on KAFKA-4789:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2590


> ProcessorTopologyTestDriver does not forward extracted timestamps to internal 
> topics
> 
>
> Key: KAFKA-4789
> URL: https://issues.apache.org/jira/browse/KAFKA-4789
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Hamidreza Afzali
>  Labels: unit-test
> Fix For: 0.10.3.0
>
>
> *Problem:*
> When using ProcessorTopologyTestDriver, the extracted timestamp is not 
> forwarded with the produced record to the internal topics.
> *Example:*
> {code}
> object Topology1 {
>   def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val outputTopic = "output"
> val stateStore = "count"
> val inputs = Seq[(String, Integer)](("A@145000", 1), ("B@145000", 
> 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
> classOf[MyTimestampExtractor].getName)
> val windowedStringSerde = Serdes.serdeFrom(new 
> WindowedSerializer(Serdes.String.serializer),
>   new WindowedDeserializer(Serdes.String.deserializer))
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>   .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
>   .groupByKey(Serdes.String, Serdes.Integer)
>   .count(TimeWindows.of(1000L), stateStore)
>   .to(windowedStringSerde, Serdes.Long, outputTopic)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
> builder, stateStore)
> inputs.foreach {
>   case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer, 
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic, 
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
>   }
> }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics

2017-02-28 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4789.
--
   Resolution: Fixed
Fix Version/s: 0.10.3.0

Issue resolved by pull request 2590
[https://github.com/apache/kafka/pull/2590]

> ProcessorTopologyTestDriver does not forward extracted timestamps to internal 
> topics
> 
>
> Key: KAFKA-4789
> URL: https://issues.apache.org/jira/browse/KAFKA-4789
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Hamidreza Afzali
>  Labels: unit-test
> Fix For: 0.10.3.0
>
>
> *Problem:*
> When using ProcessorTopologyTestDriver, the extracted timestamp is not 
> forwarded with the produced record to the internal topics.
> *Example:*
> {code}
> object Topology1 {
>   def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val outputTopic = "output"
> val stateStore = "count"
> val inputs = Seq[(String, Integer)](("A@145000", 1), ("B@145000", 
> 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
> classOf[MyTimestampExtractor].getName)
> val windowedStringSerde = Serdes.serdeFrom(new 
> WindowedSerializer(Serdes.String.serializer),
>   new WindowedDeserializer(Serdes.String.deserializer))
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>   .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
>   .groupByKey(Serdes.String, Serdes.Integer)
>   .count(TimeWindows.of(1000L), stateStore)
>   .to(windowedStringSerde, Serdes.Long, outputTopic)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
> builder, stateStore)
> inputs.foreach {
>   case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer, 
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic, 
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
>   }
> }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2590: KAFKA-4789: Added support to ProcessorTopologyTest...

2017-02-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2590


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2607: MINOR: Fix typo in javadoc of `flatMapValues`

2017-02-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2607


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2303: MINOR: improve license header check by providing h...

2017-02-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2303


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-4819) Expose states of active tasks to public API

2017-02-28 Thread Florian Hussonnois (JIRA)
Florian Hussonnois created KAFKA-4819:
-

 Summary: Expose states of active tasks to public API
 Key: KAFKA-4819
 URL: https://issues.apache.org/jira/browse/KAFKA-4819
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Florian Hussonnois
Priority: Minor


In Kafka 0.10.1.0 the toString() method of the KafkaStreams class was implemented 
mainly to ease debugging of topologies. Also, the streams metrics have been 
exposed through the public API.

But currently there is no way to monitor Kafka Streams task states, assignments or 
consumed offsets.

I propose to expose the states of active tasks through the public KafkaStreams API.

For instance, an application could expose a REST API to get the global state of a 
Kafka Streams topology.
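As an illustration of that last point (not part of the proposal itself), a minimal sketch that only uses what is public today — KafkaStreams#toString() — served over the JDK's built-in HTTP server; the class name, path and port handling below are assumptions:

{code}
import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.streams.KafkaStreams;

public class StreamsStateEndpoint {
    // Serves the topology/task description that KafkaStreams#toString() already
    // provides; a structured per-task state view is what this ticket proposes to add.
    public static void serve(KafkaStreams streams, int port) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/streams/state", exchange -> {
            byte[] body = streams.toString().getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
    }
}
{code}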



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2487: Kafka-4722 : Add application.id to StreamThread na...

2017-02-28 Thread sharad-develop
GitHub user sharad-develop reopened a pull request:

https://github.com/apache/kafka/pull/2487

Kafka-4722 : Add application.id to StreamThread name

Kafka-4722 : Add application.id to StreamThread name

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sharad-develop/kafka KAFKA-4722

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2487


commit 7946b2f35c0e3811ba7d706fc9c761d73ece47bb
Author: Damian Guy 
Date:   2017-01-16T19:40:47Z

MINOR: Remove unused constructor param from ProcessorStateManager

Remove applicationId parameter as it is no longer used.

Author: Damian Guy 

Reviewers: Guozhang Wang 

Closes #2385 from dguy/minor-remove-unused-param

commit 621dff22e79dc64b9a8748186dd985774044f91a
Author: Rajini Sivaram 
Date:   2017-01-17T11:16:29Z

KAFKA-4363; Documentation for sasl.jaas.config property

Author: Rajini Sivaram 

Reviewers: Ismael Juma 

Closes #2316 from rajinisivaram/KAFKA-4363

commit e3f4cdd0e249f78a7f4e8f064533bcd15eb11cbf
Author: Rajini Sivaram 
Date:   2017-01-17T12:55:07Z

KAFKA-4590; SASL/SCRAM system tests

Runs sanity test and one replication test using SASL/SCRAM.

Author: Rajini Sivaram 

Reviewers: Ewen Cheslack-Postava , Ismael Juma 


Closes #2355 from rajinisivaram/KAFKA-4590

commit 2b19ad9d8c47fb0f78a6e90d2f5711df6110bf1f
Author: Rajini Sivaram 
Date:   2017-01-17T18:42:55Z

KAFKA-4580; Use sasl.jaas.config for some system tests

Switched console_consumer, verifiable_consumer and verifiable_producer to 
use new sasl.jaas_config property instead of static JAAS configuration file 
when used with SASL_PLAINTEXT.

Author: Rajini Sivaram 

Reviewers: Ewen Cheslack-Postava , Ismael Juma 


Closes #2323 from rajinisivaram/KAFKA-4580

(cherry picked from commit 3f6c4f63c9c17424cf717ca76c74554bcf3b2e9a)
Signed-off-by: Ismael Juma 

commit 60d759a227087079e6fd270c68fd9e38441cb34a
Author: Jason Gustafson 
Date:   2017-01-17T18:42:05Z

MINOR: Some cleanups and additional testing for KIP-88

Author: Jason Gustafson 

Reviewers: Vahid Hashemian , Ismael Juma 


Closes #2383 from hachikuji/minor-cleanup-kip-88

commit c9b9acf6a8b542c2d0d825c17a4a20cf3fa5
Author: Damian Guy 
Date:   2017-01-17T20:33:11Z

KAFKA-4588: Wait for topics to be created in 
QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable

After debugging this i can see the times that it fails there is a race 
between when the topic is actually created/ready on the broker and when the 
assignment happens. When it fails `StreamPartitionAssignor.assign(..)` gets 
called with a `Cluster` with no topics. Hence the test hangs as no tasks get 
assigned. To fix this I added a `waitForTopics` method to 
`EmbeddedKafkaCluster`. This will wait until the topics have been created.

Author: Damian Guy 

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2371 from dguy/integration-test-fix

(cherry picked from commit 825f225bc5706b16af8ec44ca47ee1452c11e6f3)
Signed-off-by: Guozhang Wang 

commit 6f72a5a53c444278187fa6be58031168bcaffb26
Author: Damian Guy 
Date:   2017-01-17T22:13:46Z

KAFKA-3452 Follow-up: Refactoring StateStore hierarchies

This is a follow up of https://github.com/apache/kafka/pull/2166 - 
refactoring the store hierarchies as requested

Author: Damian Guy 

Reviewers: Guozhang Wang 

Closes #2360 from dguy/state-store-refactor

(cherry picked from commit 73b7ae0019d387407375f3865e263225c986a6ce)
Signed-off-by: Guozhang Wang 

commit eb62e5695506ae13bd37102c3c08e8a067eca0c8
Author: Ismael Juma 
Date:   2017-01-18T02:43:10Z

KAFKA-4591; Create Topic Policy follow-up

1. Added javadoc to public classes
2. Removed `s` from config name for consistency with interface name
3. The policy interface now implements Configurable and AutoCloseable as 
per the KIP
4. Use `null` instead of `-1` in `RequestMetadata`
5. 

Build failed in Jenkins: kafka-trunk-jdk8 #1308

2017-02-28 Thread Apache Jenkins Server
See 


Changes:

[me] KAFKA-4809: docker/run_tests.sh should set up /opt/kafka-dev to be the

[me] MINOR: improve license header check by providing head file instead of

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on ubuntu-us1 (ubuntu) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url 
 > https://git-wip-us.apache.org/repos/asf/kafka.git # timeout=10
Fetching upstream changes from https://git-wip-us.apache.org/repos/asf/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress 
 > https://git-wip-us.apache.org/repos/asf/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision d0e436c471ba4122ddcc0f7a1624546f97c4a517 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f d0e436c471ba4122ddcc0f7a1624546f97c4a517
 > git rev-list 8e6fbe8fed592e7cc15731a0827c350794413767 # timeout=10
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
[kafka-trunk-jdk8] $ /bin/bash -xe /tmp/hudson845165826670901739.sh
+ rm -rf 
+ 
/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2/bin/gradle
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
http://gradle.org/docs/2.4-rc-2/userguide/gradle_daemon.html.
Building project 'core' with Scala version 2.10.6
:downloadWrapper

BUILD SUCCESSFUL

Total time: 22.699 secs
Setting 
GRADLE_2_4_RC_2_HOME=/home/jenkins/jenkins-slave/tools/hudson.plugins.gradle.GradleInstallation/Gradle_2.4-rc-2
[kafka-trunk-jdk8] $ /bin/bash -xe /tmp/hudson7415865503829777297.sh
+ export GRADLE_OPTS=-Xmx1024m
+ GRADLE_OPTS=-Xmx1024m
+ ./gradlew --no-daemon -Dorg.gradle.project.maxParallelForks=1 
-Dorg.gradle.project.testLoggingEvents=started,passed,skipped,failed clean 
testAll
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
https://docs.gradle.org/3.2.1/userguide/gradle_daemon.html.
Daemon will be stopped at the end of the build stopping after processing
Building project 'core' with Scala version 2.10.6
:clean UP-TO-DATE
:clients:clean
:connect:clean UP-TO-DATE
:core:clean
:examples:clean UP-TO-DATE
:log4j-appender:clean UP-TO-DATE
:streams:clean UP-TO-DATE
:tools:clean UP-TO-DATE
:connect:api:clean UP-TO-DATE
:connect:file:clean UP-TO-DATE
:connect:json:clean UP-TO-DATE
:connect:runtime:clean UP-TO-DATE
:connect:transforms:clean UP-TO-DATE
:streams:examples:clean UP-TO-DATE
:test_core_2_10
Building project 'core' with Scala version 2.10.6
:kafka-trunk-jdk8:clients:compileJavawarning: [options] bootstrap class path 
not set in conjunction with -source 1.7
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
1 warning

:kafka-trunk-jdk8:clients:processResources UP-TO-DATE
:kafka-trunk-jdk8:clients:classes
:kafka-trunk-jdk8:clients:determineCommitId UP-TO-DATE
:kafka-trunk-jdk8:clients:createVersionFile
:kafka-trunk-jdk8:clients:jar
:kafka-trunk-jdk8:clients:compileTestJavawarning: [options] bootstrap class 
path not set in conjunction with -source 1.7
1 warning

:kafka-trunk-jdk8:clients:processTestResources
:kafka-trunk-jdk8:clients:testClasses
:kafka-trunk-jdk8:core:compileJava UP-TO-DATE
:kafka-trunk-jdk8:core:compileScala
:79:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.

org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
 ^
:36:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 commitTimestamp: Long = 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,

  ^
:37:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 expireTimestamp: Long = 

[jira] [Assigned] (KAFKA-4815) Idempotent/transactional Producer Checklist (KIP-98)

2017-02-28 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-4815:
--

Assignee: Jason Gustafson

> Idempotent/transactional Producer Checklist (KIP-98)
> 
>
> Key: KAFKA-4815
> URL: https://issues.apache.org/jira/browse/KAFKA-4815
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, core, producer 
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> This issue tracks implementation progress for KIP-98: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4815) Idempotent/transactional Producer Checklist (KIP-98)

2017-02-28 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4815:
---
Labels: kip  (was: )

> Idempotent/transactional Producer Checklist (KIP-98)
> 
>
> Key: KAFKA-4815
> URL: https://issues.apache.org/jira/browse/KAFKA-4815
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, core, producer 
>Reporter: Jason Gustafson
>Assignee: Matthias J. Sax
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> This issue tracks implementation progress for KIP-98: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4815) Idempotent/transactional Producer Checklist (KIP-98)

2017-02-28 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-4815:
--

Assignee: (was: Matthias J. Sax)

> Idempotent/transactional Producer Checklist (KIP-98)
> 
>
> Key: KAFKA-4815
> URL: https://issues.apache.org/jira/browse/KAFKA-4815
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, core, producer 
>Reporter: Jason Gustafson
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> This issue tracks implementation progress for KIP-98: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4815) Idempotent/transactional Producer Checklist (KIP-98)

2017-02-28 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-4815:
--

Assignee: Matthias J. Sax  (was: Jason Gustafson)

> Idempotent/transactional Producer Checklist (KIP-98)
> 
>
> Key: KAFKA-4815
> URL: https://issues.apache.org/jira/browse/KAFKA-4815
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, core, producer 
>Reporter: Jason Gustafson
>Assignee: Matthias J. Sax
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> This issue tracks implementation progress for KIP-98: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4817) Implement idempotent producer

2017-02-28 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-4817:
---
Summary: Implement idempotent producer  (was: Basic idempotent producer 
implementation)

> Implement idempotent producer
> -
>
> Key: KAFKA-4817
> URL: https://issues.apache.org/jira/browse/KAFKA-4817
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Jason Gustafson
>Assignee: Apurva Mehta
> Fix For: 0.11.0.0
>
>
> This task covers the implementation of the idempotent producer for KIP-98. 
> This covers both the necessary server-side and client-side changes.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4818) Implement transactional producer

2017-02-28 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-4818:
--

 Summary: Implement transactional producer
 Key: KAFKA-4818
 URL: https://issues.apache.org/jira/browse/KAFKA-4818
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Guozhang Wang


This covers the implementation of the transaction coordinator and the changes 
to the producer and consumer to support transactions. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2487: Kafka-4722 : Add application.id to StreamThread na...

2017-02-28 Thread sharad-develop
Github user sharad-develop closed the pull request at:

https://github.com/apache/kafka/pull/2487


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4738) Remove generic type of class ClientState

2017-02-28 Thread Sharad (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1561#comment-1561
 ] 

Sharad commented on KAFKA-4738:
---

Yes, it's done.

PR submitted:
https://github.com/apache/kafka/pull/2605

> Remove generic type of class ClientState
> 
>
> Key: KAFKA-4738
> URL: https://issues.apache.org/jira/browse/KAFKA-4738
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sharad
>Priority: Minor
>  Labels: beginner, newbie
>
> Currently, class 
> {{org.apache.kafka.streams.processor.internals.assignment.ClientState}} 
> uses a generic type. However, within actual Streams code base the type will 
> always be {{TaskId}} (from package {{org.apache.kafka.streams.processor}}).
> Thus, this ticket is about removing the generic type and replace it with 
> {{TaskId}}, to simplify the code base.
> There are some tests that use {{ClientState}} (which allows for a 
> slightly simplified test setup). Those tests need to be updated to work 
> properly using {{TaskId}} instead of {{Integer}}.
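(Illustration only, not the actual Streams code — the field names below are placeholders — of what the simplification looks like:)

{code}
import java.util.HashSet;
import java.util.Set;

// Stand-in for org.apache.kafka.streams.processor.TaskId, only to keep the sketch self-contained.
class TaskId {
    final int topicGroupId;
    final int partition;
    TaskId(int topicGroupId, int partition) { this.topicGroupId = topicGroupId; this.partition = partition; }
}

// Before: class ClientState<T> { Set<T> activeTasks; ... } with T always bound to TaskId.
// After the change described above, the type parameter simply disappears:
class ClientState {
    final Set<TaskId> activeTasks = new HashSet<>();
    final Set<TaskId> standbyTasks = new HashSet<>();
}
{code}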



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4817) Basic idempotent producer implementation

2017-02-28 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-4817:
--

 Summary: Basic idempotent producer implementation
 Key: KAFKA-4817
 URL: https://issues.apache.org/jira/browse/KAFKA-4817
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Apurva Mehta


This task covers the implementation of the idempotent producer for KIP-98. This 
covers both the necessary server-side and client-side changes.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4816) Message format changes for idempotent/transactional producer

2017-02-28 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-4816:
--

 Summary: Message format changes for idempotent/transactional 
producer
 Key: KAFKA-4816
 URL: https://issues.apache.org/jira/browse/KAFKA-4816
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Jason Gustafson


This task is for the implementation of the message format changes documented 
here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-MessageFormat.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Improve License Header Check

2017-02-28 Thread Matthias J. Sax
Just a reminder. This PR got merged today.


-Matthias

On 1/20/17 9:02 AM, Matthias J. Sax wrote:
> Hi,
> 
> I opened a PR to improve the check for file license headers (the check
> is currently quite weak and it's possible to have files with an invalid
> header).
> 
> https://github.com/apache/kafka/pull/2303/
> 
> As some people have IDE settings for adding a header automatically, we
> wanted to give a heads up that you will need to update your IDE settings.
> 
> 
> 
> -Matthias
> 
> 



signature.asc
Description: OpenPGP digital signature


[jira] [Created] (KAFKA-4815) Idempotent/transactional Producer Checklist (KIP-98)

2017-02-28 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-4815:
--

 Summary: Idempotent/transactional Producer Checklist (KIP-98)
 Key: KAFKA-4815
 URL: https://issues.apache.org/jira/browse/KAFKA-4815
 Project: Kafka
  Issue Type: New Feature
  Components: clients, core, producer 
Reporter: Jason Gustafson
Assignee: Jason Gustafson
 Fix For: 0.11.0.0


This issue tracks implementation progress for KIP-98: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-82 - Add Record Headers

2017-02-28 Thread radai
I will settle for any API really, but just wanted to point out that as it
stands right now the API targets the most "advanced" (hence obscure and
rare) use cases, at the expense of the simple and common ones. I'd suggest
(as the minimal set):

Header header(String key) - returns JUST ONE (the very last) value given a
key
Iterable<Header> headers(String key) - returns ALL headers under a key
Iterable<Header> headers() - returns all headers, period. Maybe allow null as
the key to the previous method instead?
void add(Header header) - appends a header (key inside).
void remove(String key) - removes ALL headers under a key.

This way naive get/set semantics map to header(key)/add(Header) cleanly and
simply while preserving the ability to handle more advanced use cases.
We can always add more convenience methods (like those dealing with lists -
addAll etc.) but I think the 5 (potentially 4) above are sufficient for
basically everything.
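To make that shape concrete, here is a rough Java sketch of the minimal surface above (the list-backed implementation is illustrative only and ignores the mutability question raised elsewhere in this thread; the "returns all, period" accessor is covered by extending Iterable<Header>):

{code}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

interface Header {
    String key();
    byte[] value();
}

interface Headers extends Iterable<Header> {
    Header header(String key);            // just the last value for a key
    Iterable<Header> headers(String key); // all values for a key
    void add(Header header);              // append (key carried inside the header)
    void remove(String key);              // drop all headers for a key
}

// Naive list-backed sketch, for illustration only.
class SimpleHeaders implements Headers {
    private final List<Header> entries = new ArrayList<>();

    @Override public Header header(String key) {
        Header last = null;
        for (Header h : entries)
            if (h.key().equals(key)) last = h;
        return last;
    }
    @Override public Iterable<Header> headers(String key) {
        List<Header> matches = new ArrayList<>();
        for (Header h : entries)
            if (h.key().equals(key)) matches.add(h);
        return matches;
    }
    @Override public void add(Header header) { entries.add(header); }
    @Override public void remove(String key) { entries.removeIf(h -> h.key().equals(key)); }
    @Override public Iterator<Header> iterator() { return entries.iterator(); }
}
{code}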

On Tue, Feb 28, 2017 at 4:08 AM, Ismael Juma  wrote:

> Hi Becket,
>
> Comments inline.
>
> On Sat, Feb 25, 2017 at 10:33 PM, Becket Qin  wrote:
> >
> > 1. Regarding the mutability.
> >
> > I think it would be a big convenience to have headers mutable during
> > certain stage in the message life cycle for the use cases you mentioned.
> I
> > agree there is a material benefit especially given that we may have to
> > modify the headers for each message.
> >
> > That said, I also think it is fair to say that in the producer, in order
> to
> > guarantee the correctness of the entire logic, it is necessary that at
> some
> > point we need to make producer record immutable. For example we probably
> > don't want to see that users accidentally updated the headers when the
> > producer is doing the serialization or compression.
> >
> > Given that, would it be possible to make Headers to be able to switch
> from
> > mutable to immutable? We have done this for the Batch in the producer.
> For
> > example, initially the headers are mutable, but after it has gone through
> > all the interceptors, we can call Headers.close() to make it immutable
> > afterwards.
> >
>
> The difference is that the batch is an internal class that is not exposed
> to users. Can you please explain what happens if a user tries to send the
> same ProducerRecord twice? Would an interceptor fail when trying to mutate
> the header that is now closed? Or did you have something else in mind?
>
> Thanks,
> Ismael
>


[jira] [Resolved] (KAFKA-4809) docker/run_tests.sh should set up /opt/kafka-dev to be the source directory

2017-02-28 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-4809.
--
   Resolution: Fixed
 Reviewer: Ewen Cheslack-Postava
Fix Version/s: 0.10.2.1
   0.10.3.0

> docker/run_tests.sh should set up /opt/kafka-dev to be the source directory
> ---
>
> Key: KAFKA-4809
> URL: https://issues.apache.org/jira/browse/KAFKA-4809
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
> Fix For: 0.10.3.0, 0.10.2.1
>
>
> {{docker/run_tests.sh }} should be able to run 
> {{sanity_checks/test_verifiable_producer.py}} against version 0.8.2.2.  
> Currently, it can't do that, because the Docker image configures the contents 
> of {{/opt/kafka-dev}} differently than the Vagrant images are configured.  
> Currently, {{run_tests.sh}} decompresses the {{releaseTarGz}} in that 
> directory.  But it should simply be the source directory.  This would also 
> make it unnecessary to copy the {{releaseTarGz}} around.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2602: KAFKA-4809: docker/run_tests.sh should set up /opt...

2017-02-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2602


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4809) docker/run_tests.sh should set up /opt/kafka-dev to be the source directory

2017-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1509#comment-1509
 ] 

ASF GitHub Bot commented on KAFKA-4809:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2602


> docker/run_tests.sh should set up /opt/kafka-dev to be the source directory
> ---
>
> Key: KAFKA-4809
> URL: https://issues.apache.org/jira/browse/KAFKA-4809
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>
> {{docker/run_tests.sh }} should be able to run 
> {{sanity_checks/test_verifiable_producer.py}} against version 0.8.2.2.  
> Currently, it can't do that, because the Docker image configures the contents 
> of {{/opt/kafka-dev}} differently than the Vagrant images are configured.  
> Currently, {{run_tests.sh}} decompresses the {{releaseTarGz}} in that 
> directory.  But it should simply be the source directory.  This would also 
> make it unnecessary to copy the {{releaseTarGz}} around.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Build failed in Jenkins: kafka-trunk-jdk8 #1307

2017-02-28 Thread Apache Jenkins Server
See 


Changes:

[me] MINOR: Make asJsonSchema() and asConnectSchema() methods public

--
[...truncated 7.43 KB...]
:328:
 value timestamp in class PartitionData is deprecated: see corresponding 
Javadoc for more information.
offsetRetention + partitionData.timestamp
^
:575:
 method offsetData in class ListOffsetRequest is deprecated: see corresponding 
Javadoc for more information.
val (authorizedRequestInfo, unauthorizedRequestInfo) = 
offsetRequest.offsetData.asScala.partition {
 ^
:575:
 class PartitionData in object ListOffsetRequest is deprecated: see 
corresponding Javadoc for more information.
val (authorizedRequestInfo, unauthorizedRequestInfo) = 
offsetRequest.offsetData.asScala.partition {

  ^
:580:
 constructor PartitionData in class PartitionData is deprecated: see 
corresponding Javadoc for more information.
  new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
List[JLong]().asJava)
  ^
:605:
 constructor PartitionData in class PartitionData is deprecated: see 
corresponding Javadoc for more information.
(topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, 
offsets.map(new JLong(_)).asJava))
 ^
:612:
 constructor PartitionData in class PartitionData is deprecated: see 
corresponding Javadoc for more information.
  (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
   ^
:615:
 constructor PartitionData in class PartitionData is deprecated: see 
corresponding Javadoc for more information.
  (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
   ^
:269:
 class PartitionData in object ListOffsetRequest is deprecated: see 
corresponding Javadoc for more information.
val partitions = Map(topicPartition -> new 
ListOffsetRequest.PartitionData(earliestOrLatest, 1))
 ^
:280:
 value offsets in class PartitionData is deprecated: see corresponding Javadoc 
for more information.
  partitionData.offsets.get(0)
^
:45:
 class OldProducer in package producer is deprecated: This class has been 
deprecated and will be removed in a future release. Please use 
org.apache.kafka.clients.producer.KafkaProducer instead.
new OldProducer(getOldProducerProps(config))
^
:47:
 class NewShinyProducer in package producer is deprecated: This class has been 
deprecated and will be removed in a future release. Please use 
org.apache.kafka.clients.producer.KafkaProducer instead.
new NewShinyProducer(getNewProducerProps(config))
^
21 warnings found
warning: [options] bootstrap class path not set in conjunction with -source 1.7
1 warning
:kafka-trunk-jdk8:core:processResources UP-TO-DATE
:kafka-trunk-jdk8:core:classes
:kafka-trunk-jdk8:core:checkstyleMain
:kafka-trunk-jdk8:core:compileTestJava UP-TO-DATE
:kafka-trunk-jdk8:core:compileTestScala
:88:
 method createAndShutdownStep in class MetricsTest is deprecated: This test has 
been deprecated and it will be removed in a future release
createAndShutdownStep("group0", "consumer0", "producer0")
^
one warning found
:kafka-trunk-jdk8:core:processTestResources
:kafka-trunk-jdk8:core:testClasses

[jira] [Commented] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup

2017-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888757#comment-15888757
 ] 

ASF GitHub Bot commented on KAFKA-3959:
---

Github user ewencp closed the pull request at:

https://github.com/apache/kafka/pull/2484


> __consumer_offsets wrong number of replicas at startup
> --
>
> Key: KAFKA-3959
> URL: https://issues.apache.org/jira/browse/KAFKA-3959
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager, replication
>Affects Versions: 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.0.2, 0.10.1.0, 
> 0.10.1.1, 0.10.1.2
> Environment: Brokers of 3 kafka nodes running Red Hat Enterprise 
> Linux Server release 7.2 (Maipo)
>Reporter: Alban Hurtaud
>Assignee: Onur Karaman
>Priority: Blocker
>  Labels: needs-kip, reliability
> Fix For: 0.10.3.0
>
>
> When creating a stack of 3 Kafka brokers, the consumer starts faster than 
> the Kafka nodes, and when it tries to read a topic only one Kafka node is 
> available.
> So the __consumer_offsets topic is created with a replication factor of 1 
> (instead of the configured 3):
> offsets.topic.replication.factor=3
> default.replication.factor=3
> min.insync.replicas=2
> Then the other Kafka nodes come up and we get exceptions because the replica 
> count for __consumer_offsets is 1 while min insync replicas is 2.
> What I missed is: why is __consumer_offsets created with replication factor 
> 1 (when 1 broker is running) whereas in server.properties it is set to 3?
> To reproduce : 
> - Prepare 3 kafka nodes with the 3 lines above added to servers.properties.
> - Run one kafka,
> - Run one consumer (the __consumer_offsets is created with replicas =1)
> - Run 2 more kafka nodes



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2484: KAFKA-3959: Follow-up; move upgrade notes to 0.10....

2017-02-28 Thread ewencp
Github user ewencp closed the pull request at:

https://github.com/apache/kafka/pull/2484


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2610: MINOR: Make asJsonSchema() and asConnectSchema() m...

2017-02-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2610


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] 0.10.3.0/0.11.0.0 release planning

2017-02-28 Thread Apurva Mehta
+1 (non-binding) for 0.11.0

I do agree with Ismael's point that exactly-once should go through one
release of stabilization before bumping the version to 1.0.

Thanks,
Apurva

On Mon, Feb 27, 2017 at 7:47 PM, Ismael Juma  wrote:

> Hi all,
>
> With 0.10.2.0 out of the way, I would like to volunteer to be the release
> manager for our next time-based release. See
> https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan
> if you missed previous communication on time-based releases or need a reminder.
>
> I put together a draft release plan with June 2017 as the release month (as
> previously agreed) and a list of KIPs that have already been voted:
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68716876
>
> I haven't set exact dates for the various stages (feature freeze, code
> freeze, etc.) for now as Ewen is going to send out an email with some
> suggested tweaks based on his experience as release manager for 0.10.2.0.
> We can set the exact dates after that discussion.
>
> As we are starting the process early this time, we should expect the number
> of KIPs in the plan to grow (so don't worry if your KIP is not there yet),
> but it's good to see that we already have 10 (including 2 merged and 2 with
> PR reviews in progress).
>
> Out of the KIPs listed, KIP-98 (Exactly-once and Transactions) and KIP-101
> (Leader Generation in Replication) require message format changes, which
> typically imply a major version bump (i.e. 0.11.0.0). If we do that, then
> it makes sense to also include KIP-106 (Unclean leader election should be
> false by default) and KIP-118 (Drop support for Java 7). We would also take
> the chance to remove deprecated code, in that case.
>
> Given the above, how do people feel about 0.11.0.0 as the next Kafka
> version? Please share your thoughts.
>
> Thanks,
> Ismael
>


Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-28 Thread Joel Koshy
If we deprecate KafkaPrincipal, then the Authorizer interface will also
need to change - i.e., deprecate the getAcls(KafkaPrincipal) method.
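For illustration (the real Session and Authorizer live in the Scala core module; the names below are assumptions, not the KIP's wording), the "add a new field now, deprecate later" direction discussed below would look roughly like:

{code}
import java.net.InetAddress;
import java.security.Principal;
import java.util.Set;

// Stand-ins so the sketch is self-contained; the real types are in kafka core (Scala).
interface Acl {}

class KafkaPrincipal implements Principal {
    private final String principalType;
    private final String name;
    KafkaPrincipal(String principalType, String name) { this.principalType = principalType; this.name = name; }
    public String getPrincipalType() { return principalType; }
    @Override public String getName() { return name; }
}

class Session {
    @Deprecated
    final KafkaPrincipal principal;    // existing field, kept for compatibility
    final Principal channelPrincipal;  // hypothetical new field carrying the PrincipalBuilder output
    final InetAddress clientAddress;

    Session(KafkaPrincipal principal, Principal channelPrincipal, InetAddress clientAddress) {
        this.principal = principal;
        this.channelPrincipal = channelPrincipal;
        this.clientAddress = clientAddress;
    }
}

interface Authorizer {
    @Deprecated
    Set<Acl> getAcls(KafkaPrincipal principal); // would need its own deprecation cycle, as noted above

    Set<Acl> getAcls(Principal principal);      // hypothetical replacement accepting any Principal
}
{code}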

On Tue, Feb 28, 2017 at 10:11 AM, Mayuresh Gharat <
gharatmayures...@gmail.com> wrote:

> Hi Jun/Ismael,
>
> Thanks for the comments.
>
> I agree.
> What I was thinking was, we get the KIP passed now and wait till a major
> Kafka version release. We can then make this change, but for now we can
> wait. Does that work?
>
> If there are concerns, we can make the addition of extra field of type
> Principal to Session and then deprecate the KafkaPrincipal later.
>
> I am fine either ways. What do you think?
>
> Thanks,
>
> Mayuresh
>
> On Tue, Feb 28, 2017 at 9:53 AM, Jun Rao  wrote:
>
> > Hi, Ismael,
> >
> > Good point on compatibility.
> >
> > Hi, Mayuresh,
> >
> > Given that, it seems that it's better to just add the raw principal as a
> > new field in Session for now and deprecate the KafkaPrincipal field in
> the
> > future if needed?
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Feb 27, 2017 at 5:05 PM, Ismael Juma  wrote:
> >
> > > Breaking clients without a deprecation period is something we only do
> as
> > a
> > > last resort. Is there strong justification for doing it here?
> > >
> > > Ismael
> > >
> > > On Mon, Feb 27, 2017 at 11:28 PM, Mayuresh Gharat <
> > > gharatmayures...@gmail.com> wrote:
> > >
> > > > Hi Ismael,
> > > >
> > > > Yeah. I agree that it might break the clients if the user is using the
> > > > KafkaPrincipal directly. But since KafkaPrincipal is also a Java Principal,
> > > > I think it would be the right thing to replace the KafkaPrincipal
> > > > with the Java Principal at this stage rather than later.
> > > >
> > > > We can mention in the KIP that it would break clients that are using
> > > > the KafkaPrincipal directly: they would have to use the PrincipalType
> > > > directly (if they are using it, it is only ever one value), use the name
> > > > from the Principal directly, or create a KafkaPrincipal from the Java
> > > > Principal as we are doing in SimpleAclAuthorizer with this KIP.
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
> > > >
> > > >
> > > >
> > > > On Mon, Feb 27, 2017 at 10:56 AM, Ismael Juma 
> > wrote:
> > > >
> > > > > Hi Mayuresh,
> > > > >
> > > > > Sorry for the delay. The updated KIP states that there is no
> > > > compatibility
> > > > > impact, but that doesn't seem right. The fact that we changed the
> > type
> > > of
> > > > > Session.principal to `Principal` means that any code that expects
> it
> > to
> > > > be
> > > > > `KafkaPrincipal` will break. Either because of declared types
> > (likely)
> > > or
> > > > > if it accesses `getPrincipalType` (unlikely since the value is
> always
> > > the
> > > > > same). It's a bit annoying, but we should add a new field to
> > `Session`
> > > > with
> > > > > the original principal. We can potentially deprecate the existing
> > one,
> > > if
> > > > > we're sure we don't need it (or we can leave it for now).
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Mon, Feb 27, 2017 at 6:40 PM, Mayuresh Gharat <
> > > > > gharatmayures...@gmail.com
> > > > > > wrote:
> > > > >
> > > > > > Hi Ismael, Joel, Becket
> > > > > >
> > > > > > Would you mind taking a look at this. We require 2 more binding
> > votes
> > > > for
> > > > > > the KIP to pass.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Mayuresh
> > > > > >
> > > > > > On Thu, Feb 23, 2017 at 10:57 AM, Dong Lin 
> > > > wrote:
> > > > > >
> > > > > > > +1 (non-binding)
> > > > > > >
> > > > > > > On Wed, Feb 22, 2017 at 10:52 PM, Manikumar <
> > > > manikumar.re...@gmail.com
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > +1 (non-binding)
> > > > > > > >
> > > > > > > > On Thu, Feb 23, 2017 at 3:27 AM, Mayuresh Gharat <
> > > > > > > > gharatmayures...@gmail.com
> > > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Jun,
> > > > > > > > >
> > > > > > > > > Thanks a lot for the comments and reviews.
> > > > > > > > > I agree we should log the username.
> > > > > > > > > What I meant by creating KafkaPrincipal was, after this KIP
> > we
> > > > > would
> > > > > > > not
> > > > > > > > be
> > > > > > > > > required to create KafkaPrincipal and if we want to
> maintain
> > > the
> > > > > old
> > > > > > > > > logging, we will have to create it as we do today.
> > > > > > > > > I will take care that we specify the Principal name in the
> > log.
> > > > > > > > >
> > > > > > > > > Thanks again for all the reviews.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Mayuresh
> > > > > > > > >
> > > > > > > > > On Wed, Feb 22, 2017 at 1:45 PM, Jun Rao  >
> > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi, Mayuresh,
> > > > > > > > > >
> > > > > > > > > > For logging the user name, we could do either way. We
> just
> > > need
> > > > > to
> 

[GitHub] kafka pull request #2611: MINOR: improve MinTimestampTrackerTest and fix NPE...

2017-02-28 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2611

MINOR: improve MinTimestampTrackerTest and fix NPE when null element removed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka testing

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2611.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2611


commit 7b702b9914c1b7f676f7f3d667e6ee2f0aef461d
Author: Damian Guy 
Date:   2017-02-28T17:53:45Z

improve MinTimestampTrackerTest and fix NPE when element with null removed




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [VOTE] KIP-107: Add purgeDataBefore() API in AdminClient

2017-02-28 Thread Dong Lin
Thanks Jun. I have updated the KIP to reflect this change.

On Tue, Feb 28, 2017 at 9:44 AM, Jun Rao  wrote:

> Hi, Dong,
>
> Yes, this change makes sense to me.
>
> Thanks,
>
> Jun
>
> On Mon, Feb 27, 2017 at 8:26 PM, Dong Lin  wrote:
>
> > Hi Jun and everyone,
> >
> > I would like to change the KIP in the following way. Currently, if any
> > replica is offline, the purge result for a partition will
> > be NotEnoughReplicasException and its low_watermark will be 0. The
> > motivation for this approach is that we want to guarantee that the data
> > before purgedOffset has been deleted on all replicas of this partition if
> > the purge result indicates success.
> >
> > But this approach seems too conservative. It should be sufficient in most
> > cases to just tell the user success and set low_watermark to the minimum
> > logStartOffset of all live replicas in the PurgeResponse, if the
> > logStartOffset of all live replicas has reached purgedOffset. This is
> > because for an offline replica to become online and be elected leader, it
> > should have received at least one FetchResponse from the current leader,
> > which should tell it to purge beyond purgedOffset. The benefit of this
> > change is that we can allow the purge operation to succeed when some
> > replica is offline.
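(A small Java sketch of the relaxed rule just described — the method and type names are illustrative, not the KIP's wording:)

{code}
import java.util.Collection;
import java.util.OptionalLong;

// Report success with low_watermark = min(logStartOffset of live replicas)
// only once every live replica's logStartOffset has reached purgedOffset.
final class PurgeCheck {
    static OptionalLong lowWatermarkIfComplete(Collection<Long> liveReplicaLogStartOffsets,
                                               long purgedOffset) {
        if (liveReplicaLogStartOffsets.isEmpty()) {
            return OptionalLong.empty();
        }
        long min = Long.MAX_VALUE;
        for (long logStartOffset : liveReplicaLogStartOffsets) {
            min = Math.min(min, logStartOffset);
        }
        return min >= purgedOffset ? OptionalLong.of(min) : OptionalLong.empty();
    }
}
{code}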
> >
> > Are you OK with this change? If so, I will go ahead to update the KIP and
> > implement this behavior.
> >
> > Thanks,
> > Dong
> >
> >
> >
> > On Tue, Jan 17, 2017 at 10:18 AM, Dong Lin  wrote:
> >
> > > Hey Jun,
> > >
> > > Do you have time to review the KIP again or vote for it?
> > >
> > > Hey Ewen,
> > >
> > > Can you also review the KIP again or vote for it? I have discussed with
> > > Radai and Becket regarding your concern. We still think putting it in the
> > > Admin Client seems more intuitive because there are use cases where an
> > > application which manages topics or produces data may also want to purge
> > > data. It seems weird if they need to create a consumer to do this.
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Thu, Jan 12, 2017 at 9:34 AM, Mayuresh Gharat <
> > > gharatmayures...@gmail.com> wrote:
> > >
> > >> +1 (non-binding)
> > >>
> > >> Thanks,
> > >>
> > >> Mayuresh
> > >>
> > >> On Wed, Jan 11, 2017 at 1:03 PM, Dong Lin 
> wrote:
> > >>
> > >> > Sorry for the duplicated email. It seems that gmail will put the
> > voting
> > >> > email in this thread if I simply replace DISCUSS with VOTE in the
> > >> subject.
> > >> >
> > >> > On Wed, Jan 11, 2017 at 12:57 PM, Dong Lin 
> > wrote:
> > >> >
> > >> > > Hi all,
> > >> > >
> > >> > > It seems that there is no further concern with the KIP-107. At
> this
> > >> point
> > >> > > we would like to start the voting process. The KIP can be found at
> > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107
> > >> > > %3A+Add+purgeDataBefore%28%29+API+in+AdminClient.
> > >> > >
> > >> > > Thanks,
> > >> > > Dong
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -Regards,
> > >> Mayuresh R. Gharat
> > >> (862) 250-7125
> > >>
> > >
> > >
> >
>


Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-28 Thread Vahid S Hashemian
Thanks Jorge for addressing my suggestions. Looks good to me.

--Vahid



From:   Jorge Esteban Quilcate Otoya 
To: dev@kafka.apache.org
Date:   02/27/2017 01:57 AM
Subject:Re: KIP-122: Add a tool to Reset Consumer Group Offsets



@Vahid: makes sense to add "new lag" info IMO, I will update the KIP.

@Becket:

1. About deleting, I think ConsumerGroupCommand already has an option to
delete group information by topic. From the delete docs: "Pass in groups to
delete topic partition offsets and ownership information over the entire
consumer group.". Let me know if this is enough for your case, or we
can consider adding something to the Reset Offsets tool.

2. Yes, for instance in the case of active consumers, the tool will
validate that there are no active consumers to avoid race conditions. I
have added some code snippets to the wiki, thanks for pointing that out.

On Sat, Feb 25, 2017 at 0:29, Becket Qin ()
wrote:

> Thanks for the KIP Jorge. I think this is a useful KIP. I haven't read 
the
> KIP in detail yet, some comments from a quick review:
>
> 1. A glance at it it seems that there is no delete option. At LinkedIn 
we
> identified some cases that users want to delete the committed offset of 
a
> group. It would be good to include that as well.
>
> 2. It seems the KIP is missing some necessary implementation key points,
> e.g. how would the tool commit offsets for a consumer group, and does the
> broker need to know this is a special tool instead of an active consumer in
> the group (the generation check will be made on offset commit)? They are
> probably in your proof-of-concept code. Could you add them to the wiki as
> well?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Feb 24, 2017 at 1:19 PM, Vahid S Hashemian <
> vahidhashem...@us.ibm.com> wrote:
>
> > Thanks Jorge for addressing my question/suggestion.
> >
> > One last thing. I noticed that in the example you have for the "plan"
> > option
> > (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 122%3A+Add+Reset+Consumer+Group+Offsets+tooling#KIP-122:
> > AddResetConsumerGroupOffsetstooling-ExecutionOptions
> > )
> > under "Description" column, you put 0 for lag. So I assume that is the
> > current lag being reported, and not the new lag. Might be helpful to
> > explicitly specify that (i.e. CURRENT-LAG) in the column header.
> > The other option is to report both current and new lags, but I 
understand
> > if we don't want to do that since it's rather redundant info.
> >
> > Thanks again.
> > --Vahid
> >
> >
> >
> > From:   Jorge Esteban Quilcate Otoya 
> > To: dev@kafka.apache.org
> > Date:   02/24/2017 12:47 PM
> > Subject:Re: KIP-122: Add a tool to Reset Consumer Group 
Offsets
> >
> >
> >
> > Hi Vahid,
> >
> > Thanks for your comments. Check my answers below:
> >
> > On Fri, Feb 24, 2017 at 19:41, Vahid S Hashemian (<
> > vahidhashem...@us.ibm.com>) wrote:
> >
> > > Hi Jorge,
> > >
> > > Thanks for the useful KIP.
> > >
> > > I have a question regarding the proposed "plan" option.
> > > The "current offset" and "lag" values of a topic partition are
> > meaningful
> > > within a consumer group. In other words, different consumer groups
> could
> > > have different values for these properties of each topic partition.
> > > I don't see that reflected in the discussion around the "plan" 
option.
> > > Unless we are assuming a "--group" option is also provided by user
> > (which
> > > is not clear from the KIP if that is the case).
> > >
> >
> > I have added an additional comment to state that this options will
> require
> > a "group" argument.
> > It is considered to affect only one Consumer Group.
> >
> >
> > >
> > > Also, I was wondering if you can provide at least one full command
> > example
> > > for each of the "plan", "execute", and "export" options. They would
> > > definitely help in understanding some of the details.
> > >
> > >
> > Added to the KIP.
> >
> >
> > > Sorry for the delayed question/suggestion. I hope they make sense.
> > >
> > > Thanks.
> > > --Vahid
> > >
> > >
> > >
> > > From:   Jorge Esteban Quilcate Otoya 
> > > To: dev@kafka.apache.org
> > > Date:   02/24/2017 09:51 AM
> > > Subject:Re: KIP-122: Add a tool to Reset Consumer Group 
Offsets
> > >
> > >
> > >
> > > Great! KIP updated.
> > >
> > >
> > >
> > > El vie., 24 feb. 2017 a las 18:22, Matthias J. Sax
> > > ()
> > > escribió:
> > >
> > > > I like this!
> > > >
> > > > --by-duration and --shift-by
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 2/24/17 12:57 AM, Jorge Esteban Quilcate Otoya wrote:
> > > > > Renaming to --by-duration LGTM
> > > > >
> > > > > Not sure about changing it to --shift-by-duration because we 
could
> > end
> > > up
> > > > > with the same redundancy as before with reset: --reset-offsets
> > > > > --reset-to-*.
> > > > >
> > > > > Maybe 

Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-28 Thread Eno Thereska
Thanks Todd for the explanation.

Eno
> On 28 Feb 2017, at 18:15, Todd Palino  wrote:
> 
> We have tested RAID 5/6 in the past (and recently) and found it to be
> lacking. So, as noted, rebuild takes more time than RAID 10 because all the
> disks need to be accessed to recalculate parity. In addition, there’s a
> significant performance loss just in normal operations. It’s been a while
> since I ran those tests, but it was in the 30-50% range - nothing to shrug
> off. We didn’t even get to failure testing because of that.
> 
> Jun - to your question, we ran the tests with numerous combinations of
> block sizes and FS parameters. The performance varied, but it was never
> good enough to warrant more than a superficial look at using RAID 5/6. We
> also tested both software RAID and hardware RAID.
> 
> As far as the operational concerns around broker-per-disk and
> broker-per-server, we’ve been talking about this internally. Running one
> broker per disk adds a good bit of administrative overhead and complexity.
> If you perform a one by one rolling bounce of the cluster, you’re talking
> about a 10x increase in time. That means a cluster that restarts in 30
> minutes now takes 5 hours. If you try and optimize this by shutting down
> all the brokers on one host at a time, you can get close to the original
> number, but you now have added operational complexity by having to
> micro-manage the bounce. The broker count increase will percolate down to
> the rest of the administrative domain as well - maintaining ports for all
> the instances, monitoring more instances, managing configs, etc.
> 
> You also have the overhead of running the extra processes - extra heap,
> task switching, etc. We don’t have a problem with page cache really, since
> the VM subsystem is fairly efficient about how it works. But just because
> cache works doesn’t mean we’re not wasting other resources. And that gets
> pushed downstream to clients as well, because they all have to maintain
> more network connections and the resources that go along with it.
> 
> Running more brokers in a cluster also exposes you to more corner cases and
> race conditions within the Kafka code. Bugs in the brokers, bugs in the
> controllers, more complexity in balancing load in a cluster (though trying
> to balance load across disks in a single broker doing JBOD negates that).
> 
> -Todd
> 
> 
> On Tue, Feb 28, 2017 at 9:40 AM, Jun Rao  wrote:
> 
>> Hi, Dong,
>> 
>> RAID6 is an improvement over RAID5 and can tolerate 2 disks failure. Eno's
>> point is that the rebuild of RAID5/RAID6 requires reading more data
>> compared with RAID10, which increases the probability of error during
>> rebuild. This makes sense. In any case, do you think you could ask the SREs
>> at LinkedIn to share their opinions on RAID5/RAID6?
>> 
>> Yes, when a replica is offline due to a bad disk, it makes sense to handle
>> it immediately as if a StopReplicaRequest is received (i.e., replica is no
>> longer considered a leader and is removed from any replica fetcher thread).
>> Could you add that detail in item 2. in the wiki?
>> 
>> 50. The wiki says "Broker assumes a log directory to be good after it
>> starts" : A log directory actually could be bad during startup.
>> 
>> 51. In item 4, the wiki says "The controller watches the path
>> /log_dir_event_notification for new znode.". This doesn't seem be needed
>> now?
>> 
>> 52. The isNewReplica field in LeaderAndIsrRequest should be for each
>> replica inside the replicas field, right?
>> 
>> Other than those, the current KIP looks good to me. Do you want to start a
>> separate discussion thread on KIP-113? I do have some comments there.
>> 
>> Thanks for working on this!
>> 
>> Jun
>> 
>> 
>> On Mon, Feb 27, 2017 at 5:51 PM, Dong Lin  wrote:
>> 
>>> Hi Jun,
>>> 
>>> In addition to the Eno's reference of why rebuild time with RAID-5 is
>> more
>>> expensive, another concern is that RAID-5 will fail if more than one disk
>>> fails. JBOD is still works with 1+ disk failure and has better
>> performance
>>> with one disk failure. These seems like good argument for using JBOD
>>> instead of RAID-5.
>>> 
>>> If a leader replica goes offline, the broker should first take all
>> actions
>>> (i.e. remove the partition from fetcher thread) as if it has received
>>> StopReplicaRequest for this partition because the replica can no longer
>>> work anyway. It will also respond with error to any ProduceRequest and
>>> FetchRequest for partition. The broker notifies controller by writing
>>> notification znode in ZK. The controller learns the disk failure event
>> from
>>> ZK, sends LeaderAndIsrRequest and receives LeaderAndIsrResponse to learn
>>> that the replica is offline. The controller will then elect new leader
>> for
>>> this partition and sends LeaderAndIsrRequest/MetadataUpdateRequest to
>>> relevant brokers. The broker should stop adjusting the ISR for this
>>> partition as if 

Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-28 Thread Dong Lin
Hey Jun,

Certainly, I have added Todd to reply to the thread. And I have updated the
item in the wiki.

50. The full statement is "Broker assumes a log directory to be good after
it starts, and mark log directory as bad once there is IOException when
broker attempts to access (i.e. read or write) the log directory". This
statement seems reasonable, right? If a log directory is actually bad, then
the broker will first assume it is OK, try to read logs on this log
directory, encounter IOException, and then mark it as bad.
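
To make the rule above concrete, here is a rough sketch (not the actual
broker code; the class and method names are invented) of the "assume good
until an IOException is hit" pattern:

    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical helper: a log directory is considered good at startup and is
    // marked bad on the first IOException seen while reading or writing it.
    public class LogDirHealth {
        private final Map<String, Boolean> online = new ConcurrentHashMap<>();

        public void register(String logDir) {
            online.put(logDir, true);           // assumed good when the broker starts
        }

        public <T> T access(String logDir, IoAction<T> action) throws IOException {
            try {
                return action.run();            // any read or write against the directory
            } catch (IOException e) {
                online.put(logDir, false);      // first IOException marks the directory bad
                throw e;                        // caller then stops serving replicas on it
            }
        }

        public boolean isOnline(String logDir) {
            return online.getOrDefault(logDir, false);
        }

        public interface IoAction<T> {
            T run() throws IOException;
        }
    }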

51. My bad. I thought I removed it but I didn't. It is removed now.

52. I don't think so. The isNewReplica field in the LeaderAndIsrRequest is
only relevant to the replica (i.e. broker) that receives the
LeaderAndIsrRequest. There is no need to specify whether each replica is
new inside the LeaderAndIsrRequest. In other words, if the controller sends
LeaderAndIsrRequest to three different replicas of a given partition, the
isNewReplica field can be different across these three requests.

Yeah, I would definitely want to start the discussion on KIP-113 after we
have reached agreement on KIP-112. I actually opened the KIP-113 discussion
thread on 1/12 together with this thread. I have yet to add the ability to
list offline directories to KIP-113, which we discussed in this thread.

Thanks for all your reviews! Is there further concern with the latest KIP?

Thanks!
Dong

On Tue, Feb 28, 2017 at 9:40 AM, Jun Rao  wrote:

> Hi, Dong,
>
> RAID6 is an improvement over RAID5 and can tolerate 2 disks failure. Eno's
> point is that the rebuild of RAID5/RAID6 requires reading more data
> compared with RAID10, which increases the probability of error during
> rebuild. This makes sense. In any case, do you think you could ask the SREs
> at LinkedIn to share their opinions on RAID5/RAID6?
>
> Yes, when a replica is offline due to a bad disk, it makes sense to handle
> it immediately as if a StopReplicaRequest is received (i.e., replica is no
> longer considered a leader and is removed from any replica fetcher thread).
> Could you add that detail in item 2. in the wiki?
>
> 50. The wiki says "Broker assumes a log directory to be good after it
> starts" : A log directory actually could be bad during startup.
>
> 51. In item 4, the wiki says "The controller watches the path
> /log_dir_event_notification for new znode.". This doesn't seem be needed
> now?
>
> 52. The isNewReplica field in LeaderAndIsrRequest should be for each
> replica inside the replicas field, right?
>
> Other than those, the current KIP looks good to me. Do you want to start a
> separate discussion thread on KIP-113? I do have some comments there.
>
> Thanks for working on this!
>
> Jun
>
>
> On Mon, Feb 27, 2017 at 5:51 PM, Dong Lin  wrote:
>
> > Hi Jun,
> >
> > In addition to the Eno's reference of why rebuild time with RAID-5 is
> more
> > expensive, another concern is that RAID-5 will fail if more than one disk
> > fails. JBOD is still works with 1+ disk failure and has better
> performance
> > with one disk failure. These seems like good argument for using JBOD
> > instead of RAID-5.
> >
> > If a leader replica goes offline, the broker should first take all
> actions
> > (i.e. remove the partition from fetcher thread) as if it has received
> > StopReplicaRequest for this partition because the replica can no longer
> > work anyway. It will also respond with error to any ProduceRequest and
> > FetchRequest for partition. The broker notifies controller by writing
> > notification znode in ZK. The controller learns the disk failure event
> from
> > ZK, sends LeaderAndIsrRequest and receives LeaderAndIsrResponse to learn
> > that the replica is offline. The controller will then elect new leader
> for
> > this partition and sends LeaderAndIsrRequest/MetadataUpdateRequest to
> > relevant brokers. The broker should stop adjusting the ISR for this
> > partition as if the broker is already offline. I am not sure there is any
> > inconsistency in broker's behavior when it is leader or follower. Is
> there
> > any concern with this approach?
> >
> > Thanks for catching this. I have removed that reference from the KIP.
> >
> > Hi Eno,
> >
> > Thank you for providing the reference of the RAID-5. In LinkedIn we have
> 10
> > disks per Kafka machine. It will not be a show-stopper operationally for
> > LinkedIn if we have to deploy one-broker-per-disk. On the other hand we
> > previously discussed the advantage of JBOD vs. one-broker-per-disk or
> > one-broker-per-machine. One-broker-per-disk suffers from the problems
> > described in the KIP and one-broker-per-machine increases the failure
> > caused by disk failure by 10X. Since JBOD is strictly better than either
> of
> > the two, it is also better then one-broker-per-multiple-disk which is
> > somewhere between one-broker-per-disk and one-broker-per-machine.
> >
> > I personally think the benefits of JBOD design is worth the
> implementation
> > complexity it 

Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-28 Thread Todd Palino
We have tested RAID 5/6 in the past (and recently) and found it to be
lacking. So, as noted, rebuild takes more time than RAID 10 because all the
disks need to be accessed to recalculate parity. In addition, there’s a
significant performance loss just in normal operations. It’s been a while
since I ran those tests, but it was in the 30-50% range - nothing to shrug
off. We didn’t even get to failure testing because of that.

Jun - to your question, we ran the tests with numerous combinations of
block sizes and FS parameters. The performance varied, but it was never
good enough to warrant more than a superficial look at using RAID 5/6. We
also tested both software RAID and hardware RAID.

As far as the operational concerns around broker-per-disk and
broker-per-server, we’ve been talking about this internally. Running one
broker per disk adds a good bit of administrative overhead and complexity.
If you perform a one by one rolling bounce of the cluster, you’re talking
about a 10x increase in time. That means a cluster that restarts in 30
minutes now takes 5 hours. If you try and optimize this by shutting down
all the brokers on one host at a time, you can get close to the original
number, but you now have added operational complexity by having to
micro-manage the bounce. The broker count increase will percolate down to
the rest of the administrative domain as well - maintaining ports for all
the instances, monitoring more instances, managing configs, etc.

You also have the overhead of running the extra processes - extra heap,
task switching, etc. We don’t have a problem with page cache really, since
the VM subsystem is fairly efficient about how it works. But just because
cache works doesn’t mean we’re not wasting other resources. And that gets
pushed downstream to clients as well, because they all have to maintain
more network connections and the resources that go along with it.

Running more brokers in a cluster also exposes you to more corner cases and
race conditions within the Kafka code. Bugs in the brokers, bugs in the
controllers, more complexity in balancing load in a cluster (though trying
to balance load across disks in a single broker doing JBOD negates that).

-Todd


On Tue, Feb 28, 2017 at 9:40 AM, Jun Rao  wrote:

> Hi, Dong,
>
> RAID6 is an improvement over RAID5 and can tolerate 2 disks failure. Eno's
> point is that the rebuild of RAID5/RAID6 requires reading more data
> compared with RAID10, which increases the probability of error during
> rebuild. This makes sense. In any case, do you think you could ask the SREs
> at LinkedIn to share their opinions on RAID5/RAID6?
>
> Yes, when a replica is offline due to a bad disk, it makes sense to handle
> it immediately as if a StopReplicaRequest is received (i.e., replica is no
> longer considered a leader and is removed from any replica fetcher thread).
> Could you add that detail in item 2. in the wiki?
>
> 50. The wiki says "Broker assumes a log directory to be good after it
> starts" : A log directory actually could be bad during startup.
>
> 51. In item 4, the wiki says "The controller watches the path
> /log_dir_event_notification for new znode.". This doesn't seem be needed
> now?
>
> 52. The isNewReplica field in LeaderAndIsrRequest should be for each
> replica inside the replicas field, right?
>
> Other than those, the current KIP looks good to me. Do you want to start a
> separate discussion thread on KIP-113? I do have some comments there.
>
> Thanks for working on this!
>
> Jun
>
>
> On Mon, Feb 27, 2017 at 5:51 PM, Dong Lin  wrote:
>
> > Hi Jun,
> >
> > In addition to the Eno's reference of why rebuild time with RAID-5 is
> more
> > expensive, another concern is that RAID-5 will fail if more than one disk
> > fails. JBOD is still works with 1+ disk failure and has better
> performance
> > with one disk failure. These seems like good argument for using JBOD
> > instead of RAID-5.
> >
> > If a leader replica goes offline, the broker should first take all
> actions
> > (i.e. remove the partition from fetcher thread) as if it has received
> > StopReplicaRequest for this partition because the replica can no longer
> > work anyway. It will also respond with error to any ProduceRequest and
> > FetchRequest for partition. The broker notifies controller by writing
> > notification znode in ZK. The controller learns the disk failure event
> from
> > ZK, sends LeaderAndIsrRequest and receives LeaderAndIsrResponse to learn
> > that the replica is offline. The controller will then elect new leader
> for
> > this partition and sends LeaderAndIsrRequest/MetadataUpdateRequest to
> > relevant brokers. The broker should stop adjusting the ISR for this
> > partition as if the broker is already offline. I am not sure there is any
> > inconsistency in broker's behavior when it is leader or follower. Is
> there
> > any concern with this approach?
> >
> > Thanks for catching this. I have removed that reference from 

Re: [DISCUSS] 0.10.3.0/0.11.0.0 release planning

2017-02-28 Thread Bill Bejeck
+1

Sounds good to me. Thanks, Ismael.

-Bill

On Tue, Feb 28, 2017 at 1:01 PM, Colin McCabe  wrote:

> +1 (non-binding).
>
> Thanks, Ismael.
>
> cheers,
> Colin
>
>
> On Mon, Feb 27, 2017, at 19:47, Ismael Juma wrote:
> > Hi all,
> >
> > With 0.10.2.0 out of the way, I would like to volunteer to be the release
> > manager for our next time-based release. See https://cwiki.apache.org/c
> > onfluence/display/KAFKA/Time+Based+Release+Plan if you missed previous
> > communication on time-based releases or need a reminder.
> >
> > I put together a draft release plan with June 2017 as the release month
> > (as
> > previously agreed) and a list of KIPs that have already been voted:
> >
> > *https://cwiki.apache.org/confluence/pages/viewpage.
> action?pageId=68716876
> >  action?pageId=68716876>*
> >
> > I haven't set exact dates for the various stages (feature freeze, code
> > freeze, etc.) for now as Ewen is going to send out an email with some
> > suggested tweaks based on his experience as release manager for 0.10.2.0.
> > We can set the exact dates after that discussion.
> >
> > As we are starting the process early this time, we should expect the
> > number
> > of KIPs in the plan to grow (so don't worry if your KIP is not there
> > yet),
> > but it's good to see that we already have 10 (including 2 merged and 2
> > with
> > PR reviews in progress).
> >
> > Out of the KIPs listed, KIP-98 (Exactly-once and Transactions) and
> > KIP-101
> > (Leader Generation in Replication) require message format changes, which
> > typically imply a major version bump (i.e. 0.11.0.0). If we do that, then
> > it makes sense to also include KIP-106 (Unclean leader election should be
> > false by default) and KIP-118 (Drop support for Java 7). We would also
> > take
> > the chance to remove deprecated code, in that case.
> >
> > Given the above, how do people feel about 0.11.0.0 as the next Kafka
> > version? Please share your thoughts.
> >
> > Thanks,
> > Ismael
>


Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-28 Thread Mayuresh Gharat
Hi Jun/Ismael,

Thanks for the comments.

I agree.
What I was thinking was: we get the KIP passed now and wait until the next
major Kafka version release. We can then make this change, but for now we can
wait. Does that work?

If there are concerns, we can add an extra field of type Principal to Session
and then deprecate the KafkaPrincipal later.

I am fine either way. What do you think?

Thanks,

Mayuresh

On Tue, Feb 28, 2017 at 9:53 AM, Jun Rao  wrote:

> Hi, Ismael,
>
> Good point on compatibility.
>
> Hi, Mayuresh,
>
> Given that, it seems that it's better to just add the raw principal as a
> new field in Session for now and deprecate the KafkaPrincipal field in the
> future if needed?
>
> Thanks,
>
> Jun
>
> On Mon, Feb 27, 2017 at 5:05 PM, Ismael Juma  wrote:
>
> > Breaking clients without a deprecation period is something we only do as
> a
> > last resort. Is there strong justification for doing it here?
> >
> > Ismael
> >
> > On Mon, Feb 27, 2017 at 11:28 PM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> > > Hi Ismael,
> > >
> > > Yeah. I agree that it might break the clients if the user is using the
> > > kafkaPrincipal directly. But since KafkaPrincipal is also a Java
> > Principal
> > > and I think, it would be a right thing to do replace the kafkaPrincipal
> > > with Java Principal at this stage than later.
> > >
> > > We can mention in the KIP, that it would break the clients that are
> using
> > > the KafkaPrincipal directly and they will have to use the PrincipalType
> > > directly, if they are using it as its only one value and use the name
> > from
> > > the Principal directly or create a KafkaPrincipal from Java Principal
> as
> > we
> > > are doing in SimpleAclAuthorizer with this KIP.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > >
> > >
> > > On Mon, Feb 27, 2017 at 10:56 AM, Ismael Juma 
> wrote:
> > >
> > > > Hi Mayuresh,
> > > >
> > > > Sorry for the delay. The updated KIP states that there is no
> > > compatibility
> > > > impact, but that doesn't seem right. The fact that we changed the
> type
> > of
> > > > Session.principal to `Principal` means that any code that expects it
> to
> > > be
> > > > `KafkaPrincipal` will break. Either because of declared types
> (likely)
> > or
> > > > if it accesses `getPrincipalType` (unlikely since the value is always
> > the
> > > > same). It's a bit annoying, but we should add a new field to
> `Session`
> > > with
> > > > the original principal. We can potentially deprecate the existing
> one,
> > if
> > > > we're sure we don't need it (or we can leave it for now).
> > > >
> > > > Ismael
> > > >
> > > > On Mon, Feb 27, 2017 at 6:40 PM, Mayuresh Gharat <
> > > > gharatmayures...@gmail.com
> > > > > wrote:
> > > >
> > > > > Hi Ismael, Joel, Becket
> > > > >
> > > > > Would you mind taking a look at this. We require 2 more binding
> votes
> > > for
> > > > > the KIP to pass.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Mayuresh
> > > > >
> > > > > On Thu, Feb 23, 2017 at 10:57 AM, Dong Lin 
> > > wrote:
> > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > > On Wed, Feb 22, 2017 at 10:52 PM, Manikumar <
> > > manikumar.re...@gmail.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > +1 (non-binding)
> > > > > > >
> > > > > > > On Thu, Feb 23, 2017 at 3:27 AM, Mayuresh Gharat <
> > > > > > > gharatmayures...@gmail.com
> > > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Jun,
> > > > > > > >
> > > > > > > > Thanks a lot for the comments and reviews.
> > > > > > > > I agree we should log the username.
> > > > > > > > What I meant by creating KafkaPrincipal was, after this KIP
> we
> > > > would
> > > > > > not
> > > > > > > be
> > > > > > > > required to create KafkaPrincipal and if we want to maintain
> > the
> > > > old
> > > > > > > > logging, we will have to create it as we do today.
> > > > > > > > I will take care that we specify the Principal name in the
> log.
> > > > > > > >
> > > > > > > > Thanks again for all the reviews.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Mayuresh
> > > > > > > >
> > > > > > > > On Wed, Feb 22, 2017 at 1:45 PM, Jun Rao 
> > > wrote:
> > > > > > > >
> > > > > > > > > Hi, Mayuresh,
> > > > > > > > >
> > > > > > > > > For logging the user name, we could do either way. We just
> > need
> > > > to
> > > > > > make
> > > > > > > > > sure the expected user name is logged. Also, currently, we
> > are
> > > > > > already
> > > > > > > > > creating a KafkaPrincipal on every request. +1 on the
> latest
> > > KIP.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Feb 21, 2017 at 8:05 PM, Mayuresh Gharat <
> > > > > > > > > gharatmayures...@gmail.com
> > > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Jun,
> > > > > > > > > >
> > > > > > > > > > 

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

2017-02-28 Thread Mickael Maison
Yes, I agree: having a generic flag is more future-proof.
I'll update the KIP in the coming days.

Thanks

On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson  wrote:
> Hey Mickael,
>
> The suggestion to add something to Node makes sense. I could imagine for
> example adding a flag to indicate that the connection has a higher
> "priority," meaning that we can allocate outside of the memory pool if
> necessary. That would still be generic even if the only use case is the
> consumer coordinator. We might also face a similar problem when the
> producer is sending requests to the transaction coordinator for KIP-98.
> What do you think?
>
> Thanks,
> Jason
>
> On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison 
> wrote:
>
>> Apologies for the late response.
>>
>> Thanks Jason for the suggestion. Yes you are right, the Coordinator
>> connection is "tagged" with a different id, so we could retrieve it in
>> NetworkReceive to make the distinction.
>> However, currently the coordinator connection are made different by using:
>> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
>> for the Node id.
>>
>> So to identify Coordinator connections, we'd have to check that the
>> NetworkReceive source is a value near Integer.MAX_VALUE which is a bit
>> hacky ...
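
To spell out why this feels hacky (illustration only; the MAX_BROKER_ID bound
below is an invented assumption, real clusters have no such limit):

    public class CoordinatorNodeIdSketch {
        // Today the coordinator connection reuses the broker id, flipped into a
        // disjoint range so it does not collide with the fetch connections.
        static int coordinatorNodeId(int brokerId) {
            return Integer.MAX_VALUE - brokerId;
        }

        // The reverse check has to guess: treat ids "near" Integer.MAX_VALUE as
        // coordinator traffic, which only works under an assumed broker id bound.
        static final int MAX_BROKER_ID = 1_000_000;

        static boolean looksLikeCoordinator(int nodeId) {
            return nodeId > Integer.MAX_VALUE - MAX_BROKER_ID;
        }

        public static void main(String[] args) {
            int coordId = coordinatorNodeId(2);
            System.out.println(coordId + " -> coordinator? " + looksLikeCoordinator(coordId));
            System.out.println(2 + " -> coordinator? " + looksLikeCoordinator(2));
        }
    }
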
>>
>> Maybe we could add a constructor to Node that allows to pass in a
>> sourceId String. That way we could make all the coordinator
>> connections explicit (by setting it to "Coordinator-[ID]" for
>> example).
>> What do you think ?
>>
>> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson 
>> wrote:
>> > Good point. The consumer does use a separate connection to the
>> coordinator,
>> > so perhaps the connection itself could be tagged for normal heap
>> allocation?
>> >
>> > -Jason
>> >
>> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
>> onurkaraman.apa...@gmail.com
>> >> wrote:
>> >
>> >> I only did a quick scan but I wanted to point out what I think is an
>> >> incorrect assumption in the KIP's caveats:
>> >> "
>> >> There is a risk using the MemoryPool that, after we fill up the memory
>> with
>> >> fetch data, we can starve the coordinator's connection
>> >> ...
>> >> To alleviate this issue, only messages larger than 1Kb will be
>> allocated in
>> >> the MemoryPool. Smaller messages will be allocated directly on the Heap
>> >> like before. This allows group/heartbeat messages to avoid being
>> delayed if
>> >> the MemoryPool fills up.
>> >> "
>> >>
>> >> So it sounds like there's an incorrect assumption that responses from
>> the
>> >> coordinator will always be small (< 1Kb as mentioned in the caveat).
>> There
>> >> are now a handful of request types between clients and the coordinator:
>> >> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit, OffsetFetch,
>> >> ListGroups, DescribeGroups}. While true (at least today) for
>> >> HeartbeatResponse and a few others, I don't think we can assume
>> >> JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse, and
>> >> OffsetFetchResponse will be small, as they are effectively bounded by
>> the
>> >> max message size allowed by the broker for the __consumer_offsets topic
>> >> which by default is 1MB.
>> >>
>> >> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison <
>> mickael.mai...@gmail.com>
>> >> wrote:
>> >>
>> >> > I've updated the KIP to address all the comments raised here and from
>> >> > the "DISCUSS" thread.
>> >> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >> >
>> >> > Now, I'd like to restart the vote.
>> >> >
>> >> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
>> >> >  wrote:
>> >> > > Hi Mickael,
>> >> > >
>> >> > > I am +1 on the overall approach of this KIP, but have a couple of
>> >> > comments
>> >> > > (sorry, should have brought them up on the discuss thread earlier):
>> >> > >
>> >> > > 1. Perhaps it would be better to do this after KAFKA-4137
>> >> > >  is implemented?
>> At
>> >> > the
>> >> > > moment, coordinator shares the same NetworkClient (and hence the
>> same
>> >> > > Selector) with consumer connections used for fetching records. Since
>> >> > > freeing of memory relies on consuming applications invoking poll()
>> >> after
>> >> > > processing previous records and potentially after committing
>> offsets,
>> >> it
>> >> > > will be good to ensure that coordinator is not blocked for read by
>> >> fetch
>> >> > > responses. This may be simpler once coordinator has its own
>> Selector.
>> >> > >
>> >> > > 2. The KIP says: *Once messages are returned to the user, messages
>> are
>> >> > > deleted from the MemoryPool so new messages can be stored.*
>> >> > > Can you expand that a bit? I am assuming that partial buffers never
>> get
>> >> > > freed when some messages are returned to the user since the
>> consumer is
>> >> > > still holding a reference 

Re: [DISCUSS] 0.10.3.0/0.11.0.0 release planning

2017-02-28 Thread Colin McCabe
+1 (non-binding).

Thanks, Ismael.

cheers,
Colin


On Mon, Feb 27, 2017, at 19:47, Ismael Juma wrote:
> Hi all,
> 
> With 0.10.2.0 out of the way, I would like to volunteer to be the release
> manager for our next time-based release. See https://cwiki.apache.org/c
> onfluence/display/KAFKA/Time+Based+Release+Plan if you missed previous
> communication on time-based releases or need a reminder.
> 
> I put together a draft release plan with June 2017 as the release month
> (as
> previously agreed) and a list of KIPs that have already been voted:
> 
> *https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68716876
> *
> 
> I haven't set exact dates for the various stages (feature freeze, code
> freeze, etc.) for now as Ewen is going to send out an email with some
> suggested tweaks based on his experience as release manager for 0.10.2.0.
> We can set the exact dates after that discussion.
> 
> As we are starting the process early this time, we should expect the
> number
> of KIPs in the plan to grow (so don't worry if your KIP is not there
> yet),
> but it's good to see that we already have 10 (including 2 merged and 2
> with
> PR reviews in progress).
> 
> Out of the KIPs listed, KIP-98 (Exactly-once and Transactions) and
> KIP-101
> (Leader Generation in Replication) require message format changes, which
> typically imply a major version bump (i.e. 0.11.0.0). If we do that, then
> it makes sense to also include KIP-106 (Unclean leader election should be
> false by default) and KIP-118 (Drop support for Java 7). We would also
> take
> the chance to remove deprecated code, in that case.
> 
> Given the above, how do people feel about 0.11.0.0 as the next Kafka
> version? Please share your thoughts.
> 
> Thanks,
> Ismael


[GitHub] kafka pull request #2610: MINOR: Make asJsonSchema() and asConnectSchema() m...

2017-02-28 Thread C0urante
GitHub user C0urante opened a pull request:

https://github.com/apache/kafka/pull/2610

MINOR: Make asJsonSchema() and asConnectSchema() methods public

Want to use these methods in an external project.
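
For illustration, this is roughly the external use intended, assuming the
methods keep their current signatures (asJsonSchema(Schema) returning a
Jackson ObjectNode, asConnectSchema(JsonNode) returning a Connect Schema):

    import java.util.Collections;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.json.JsonConverter;

    public class SchemaConversionExample {
        public static void main(String[] args) {
            JsonConverter converter = new JsonConverter();
            converter.configure(Collections.singletonMap("schemas.enable", "true"), false);

            Schema schema = SchemaBuilder.struct()
                    .field("id", Schema.INT64_SCHEMA)
                    .field("name", Schema.OPTIONAL_STRING_SCHEMA)
                    .build();

            // Connect Schema -> JSON schema representation (needs the method to be public).
            ObjectNode jsonSchema = converter.asJsonSchema(schema);
            System.out.println(jsonSchema);

            // ...and back again.
            Schema roundTripped = converter.asConnectSchema(jsonSchema);
            System.out.println(roundTripped.fields());
        }
    }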

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/C0urante/kafka public-json-schema-conversion

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2610.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2610


commit 1f28682da64b7f088e741c9af7309406ca7eee2a
Author: Chris Egerton 
Date:   2017-02-28T17:30:54Z

Make asJsonSchema() and asConnectSchema() methods public




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-28 Thread Colin McCabe
I noticed that the throttle_time_ms added to all the message responses
is in milliseconds.  Does it make sense to express this in microseconds
in case we start doing more fine-grained CPU throttling later on?  An
int32 should still be more than enough if using microseconds.
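
A quick back-of-the-envelope check (added for illustration): an int32 worth
of microseconds is about 35 minutes of throttle time, so there is plenty of
headroom either way:

    public class ThrottleFieldHeadroom {
        public static void main(String[] args) {
            long max = Integer.MAX_VALUE;                                        // 2,147,483,647
            System.out.println(max / 1_000_000L + " s at microsecond granularity");    // ~2147 s
            System.out.println(max / 60_000_000L + " min at microsecond granularity"); // ~35 min
            System.out.println(max / 86_400_000L + " days at millisecond granularity"); // ~24 days
        }
    }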

best,
Colin


On Fri, Feb 24, 2017, at 10:31, Jun Rao wrote:
> Hi, Jay,
> 
> 2. Regarding request.unit vs request.percentage. I started with
> request.percentage too. The reasoning for request.unit is the following.
> Suppose that the capacity has been reached on a broker and the admin
> needs
> to add a new user. A simple way to increase the capacity is to increase
> the
> number of io threads, assuming there are still enough cores. If the limit
> is based on percentage, the additional capacity automatically gets
> distributed to existing users and we haven't really carved out any
> additional resource for the new user. Now, is it easy for a user to
> reason
> about 0.1 unit vs 10%. My feeling is that both are hard and have to be
> configured empirically. Not sure if percentage is obviously easier to
> reason about.
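
Worked numbers, purely to make that trade-off concrete (the thread counts and
quota values below are made up):

    public class QuotaUnitExample {
        public static void main(String[] args) {
            double unitsQuota = 0.8;      // user x: fixed 0.8 "thread units"
            double percentQuota = 10.0;   // user x: alternative 10% of request time

            int ioThreads = 8;            // before: capacity is full
            int newIoThreads = 16;        // admin doubles threads to make room for a new user

            // Unit-based: user x keeps 0.8 threads worth of time; the added capacity
            // stays free for the new user, but the effective share silently shrinks.
            System.out.println("units   -> share drops from "
                    + (unitsQuota / ioThreads * 100) + "% to "
                    + (unitsQuota / newIoThreads * 100) + "%");

            // Percentage-based: user x's absolute capacity silently doubles instead.
            System.out.println("percent -> capacity grows from "
                    + (percentQuota / 100 * ioThreads) + " to "
                    + (percentQuota / 100 * newIoThreads) + " thread units");
        }
    }
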
> 
> Thanks,
> 
> Jun
> 
> On Fri, Feb 24, 2017 at 8:10 AM, Jay Kreps  wrote:
> 
> > A couple of quick points:
> >
> > 1. Even though the implementation of this quota is only using io thread
> > time, i think we should call it something like "request-time". This will
> > give us flexibility to improve the implementation to cover network threads
> > in the future and will avoid exposing internal details like our thread
> > pools on the server.
> >
> > 2. Jun/Roger, I get what you are trying to fix but the idea of thread/units
> > is super unintuitive as a user-facing knob. I had to read the KIP like
> > eight times to understand this. I'm not sure that your point that
> > increasing the number of threads is a problem with a percentage-based
> > value, it really depends on whether the user thinks about the "percentage
> > of request processing time" or "thread units". If they think "I have
> > allocated 10% of my request processing time to user x" then it is a bug
> > that increasing the thread count decreases that percent as it does in the
> > current proposal. As a practical matter I think the only way to actually
> > reason about this is as a percent---I just don't believe people are going
> > to think, "ah, 4.3 thread units, that is the right amount!". Instead I
> > think they have to understand this thread unit concept, figure out what
> > they have set in number of threads, compute a percent and then come up with
> > the number of thread units, and these will all be wrong if that thread
> > count changes. I also think this ties us to throttling the I/O thread pool,
> > which may not be where we want to end up.
> >
> > 3. For what it's worth I do think having a single throttle_ms field in all
> > the responses that combines all throttling from all quotas is probably the
> > simplest. There could be a use case for having separate fields for each,
> > but I think that is actually harder to use/monitor in the common case so
> > unless someone has a use case I think just one should be fine.
> >
> > -Jay
> >
> > On Fri, Feb 24, 2017 at 4:21 AM, Rajini Sivaram 
> > wrote:
> >
> > > I have updated the KIP based on the discussions so far.
> > >
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> > > On Thu, Feb 23, 2017 at 11:29 PM, Rajini Sivaram <
> > rajinisiva...@gmail.com>
> > > wrote:
> > >
> > > > Thank you all for the feedback.
> > > >
> > > > Ismael #1. It makes sense not to throttle inter-broker requests like
> > > > LeaderAndIsr etc. The simplest way to ensure that clients cannot use
> > > these
> > > > requests to bypass quotas for DoS attacks is to ensure that ACLs
> > prevent
> > > > clients from using these requests and unauthorized requests are
> > included
> > > > towards quotas.
> > > >
> > > > Ismael #2, Jay #1 : I was thinking that these quotas can return a
> > > separate
> > > > throttle time, and all utilization based quotas could use the same
> > field
> > > > (we won't add another one for network thread utilization for instance).
> > > But
> > > > perhaps it makes sense to keep byte rate quotas separate in
> > produce/fetch
> > > > responses to provide separate metrics? Agree with Ismael that the name
> > of
> > > > the existing field should be changed if we have two. Happy to switch
> > to a
> > > > single combined throttle time if that is sufficient.
> > > >
> > > > Ismael #4, #5, #6: Will update KIP. Will use dot separated name for new
> > > > property. Replication quotas use dot separated, so it will be
> > consistent
> > > > with all properties except byte rate quotas.
> > > >
> > > > Radai: #1 Request processing time rather than request rate were chosen
> > > > because the time per request can vary significantly between requests as
> > > > mentioned in the discussion and KIP.
> > > > #2 Two separate quotas for heartbeats/regular requests feel like more
> > > 

Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-28 Thread Jun Rao
Hi, Ismael,

Good point on compatibility.

Hi, Mayuresh,

Given that, it seems that it's better to just add the raw principal as a
new field in Session for now and deprecate the KafkaPrincipal field in the
future if needed?
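
A minimal sketch of what that could look like; the real Session is a Scala
case class in kafka.network.RequestChannel, and the field names here are just
assumptions, not the KIP's final API:

    import java.net.InetAddress;
    import java.security.Principal;
    import org.apache.kafka.common.security.auth.KafkaPrincipal;

    // Hypothetical Java rendering of Session with both fields kept.
    public class Session {
        public final KafkaPrincipal principal;     // existing field, unchanged for compatibility
        public final Principal channelPrincipal;   // new: raw principal from the PrincipalBuilder
        public final InetAddress clientAddress;

        public Session(KafkaPrincipal principal, Principal channelPrincipal,
                       InetAddress clientAddress) {
            this.principal = principal;
            this.channelPrincipal = channelPrincipal;
            this.clientAddress = clientAddress;
        }
    }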

Thanks,

Jun

On Mon, Feb 27, 2017 at 5:05 PM, Ismael Juma  wrote:

> Breaking clients without a deprecation period is something we only do as a
> last resort. Is there strong justification for doing it here?
>
> Ismael
>
> On Mon, Feb 27, 2017 at 11:28 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
> > Hi Ismael,
> >
> > Yeah. I agree that it might break the clients if the user is using the
> > kafkaPrincipal directly. But since KafkaPrincipal is also a Java
> Principal
> > and I think, it would be a right thing to do replace the kafkaPrincipal
> > with Java Principal at this stage than later.
> >
> > We can mention in the KIP, that it would break the clients that are using
> > the KafkaPrincipal directly and they will have to use the PrincipalType
> > directly, if they are using it as its only one value and use the name
> from
> > the Principal directly or create a KafkaPrincipal from Java Principal as
> we
> > are doing in SimpleAclAuthorizer with this KIP.
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> >
> > On Mon, Feb 27, 2017 at 10:56 AM, Ismael Juma  wrote:
> >
> > > Hi Mayuresh,
> > >
> > > Sorry for the delay. The updated KIP states that there is no
> > compatibility
> > > impact, but that doesn't seem right. The fact that we changed the type
> of
> > > Session.principal to `Principal` means that any code that expects it to
> > be
> > > `KafkaPrincipal` will break. Either because of declared types (likely)
> or
> > > if it accesses `getPrincipalType` (unlikely since the value is always
> the
> > > same). It's a bit annoying, but we should add a new field to `Session`
> > with
> > > the original principal. We can potentially deprecate the existing one,
> if
> > > we're sure we don't need it (or we can leave it for now).
> > >
> > > Ismael
> > >
> > > On Mon, Feb 27, 2017 at 6:40 PM, Mayuresh Gharat <
> > > gharatmayures...@gmail.com
> > > > wrote:
> > >
> > > > Hi Ismael, Joel, Becket
> > > >
> > > > Would you mind taking a look at this. We require 2 more binding votes
> > for
> > > > the KIP to pass.
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
> > > >
> > > > On Thu, Feb 23, 2017 at 10:57 AM, Dong Lin 
> > wrote:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > On Wed, Feb 22, 2017 at 10:52 PM, Manikumar <
> > manikumar.re...@gmail.com
> > > >
> > > > > wrote:
> > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > > On Thu, Feb 23, 2017 at 3:27 AM, Mayuresh Gharat <
> > > > > > gharatmayures...@gmail.com
> > > > > > > wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > Thanks a lot for the comments and reviews.
> > > > > > > I agree we should log the username.
> > > > > > > What I meant by creating KafkaPrincipal was, after this KIP we
> > > would
> > > > > not
> > > > > > be
> > > > > > > required to create KafkaPrincipal and if we want to maintain
> the
> > > old
> > > > > > > logging, we will have to create it as we do today.
> > > > > > > I will take care that we specify the Principal name in the log.
> > > > > > >
> > > > > > > Thanks again for all the reviews.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Mayuresh
> > > > > > >
> > > > > > > On Wed, Feb 22, 2017 at 1:45 PM, Jun Rao 
> > wrote:
> > > > > > >
> > > > > > > > Hi, Mayuresh,
> > > > > > > >
> > > > > > > > For logging the user name, we could do either way. We just
> need
> > > to
> > > > > make
> > > > > > > > sure the expected user name is logged. Also, currently, we
> are
> > > > > already
> > > > > > > > creating a KafkaPrincipal on every request. +1 on the latest
> > KIP.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Feb 21, 2017 at 8:05 PM, Mayuresh Gharat <
> > > > > > > > gharatmayures...@gmail.com
> > > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Jun,
> > > > > > > > >
> > > > > > > > > Thanks for the comments.
> > > > > > > > >
> > > > > > > > > I will mention in the KIP : how this change doesn't affect
> > the
> > > > > > default
> > > > > > > > > authorizer implementation.
> > > > > > > > >
> > > > > > > > > Regarding, Currently, we log the principal name in the
> > request
> > > > log
> > > > > in
> > > > > > > > > RequestChannel, which has the format of "principalType +
> > > > SEPARATOR
> > > > > +
> > > > > > > > > name;".
> > > > > > > > > It would be good if we can keep the same convention after
> > this
> > > > KIP.
> > > > > > One
> > > > > > > > way
> > > > > > > > > to do that is to convert java.security.Principal to
> > > > KafkaPrincipal
> > > > > > for
> > > > > > > > > logging the requests.
> > > > > > > > > --- > This would mean we have to create a 

Re: [VOTE] KIP-107: Add purgeDataBefore() API in AdminClient

2017-02-28 Thread Jun Rao
Hi, Dong,

Yes, this change makes sense to me.

Thanks,

Jun

On Mon, Feb 27, 2017 at 8:26 PM, Dong Lin  wrote:

> Hi Jun and everyone,
>
> I would like to change the KIP in the following way. Currently, if any
> replica if offline, the purge result for a partition will
> be NotEnoughReplicasException and its low_watermark will be 0. The
> motivation for this approach is that we want to guarantee that the data
> before purgedOffset has been deleted on all replicas of this partition if
> purge result indicates success.
>
> But this approach seems too conservative. It should be sufficient in most
> cases to just tell user success and set low_watermark to minimum
> logStartOffset of all live replicas in the PurgeResponse if logStartOffset
> of all live replicas have reached purgedOffset. This is because for an
> offline replicas to become online and be elected leader, it should have
> received one FetchReponse from the current leader which should tell it to
> purge beyond purgedOffset. The benefit of doing this change is that we can
> allow purge operation to succeed when some replica is offline.
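
Toy example of the proposed rule, with made-up replica ids and offsets:
succeed and report the minimum logStartOffset across live replicas once they
have all reached purgedOffset; offline replicas are simply ignored.

    import java.util.HashMap;
    import java.util.Map;

    public class LowWatermarkExample {
        public static void main(String[] args) {
            long purgedOffset = 100L;
            Map<Integer, Long> liveLogStartOffsets = new HashMap<>();
            liveLogStartOffsets.put(1, 120L);
            liveLogStartOffsets.put(2, 105L);   // replica 3 is offline and not counted

            boolean allReached = liveLogStartOffsets.values().stream()
                    .allMatch(o -> o >= purgedOffset);
            long lowWatermark = liveLogStartOffsets.values().stream()
                    .min(Long::compare).orElse(0L);

            System.out.println(allReached
                    ? "purge succeeded, low_watermark = " + lowWatermark
                    : "purge still pending");
        }
    }
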
>
> Are you OK with this change? If so, I will go ahead to update the KIP and
> implement this behavior.
>
> Thanks,
> Dong
>
>
>
> On Tue, Jan 17, 2017 at 10:18 AM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Do you have time to review the KIP again or vote for it?
> >
> > Hey Ewen,
> >
> > Can you also review the KIP again or vote for it? I have discussed with
> > Radai and Becket regarding your concern. We still think putting it in
> Admin
> > Client seems more intuitive because there is use-case where application
> > which manages topic or produces data may also want to purge data. It
> seems
> > weird if they need to create a consumer to do this.
> >
> > Thanks,
> > Dong
> >
> > On Thu, Jan 12, 2017 at 9:34 AM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> >> +1 (non-binding)
> >>
> >> Thanks,
> >>
> >> Mayuresh
> >>
> >> On Wed, Jan 11, 2017 at 1:03 PM, Dong Lin  wrote:
> >>
> >> > Sorry for the duplicated email. It seems that gmail will put the
> voting
> >> > email in this thread if I simply replace DISCUSS with VOTE in the
> >> subject.
> >> >
> >> > On Wed, Jan 11, 2017 at 12:57 PM, Dong Lin 
> wrote:
> >> >
> >> > > Hi all,
> >> > >
> >> > > It seems that there is no further concern with the KIP-107. At this
> >> point
> >> > > we would like to start the voting process. The KIP can be found at
> >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107
> >> > > %3A+Add+purgeDataBefore%28%29+API+in+AdminClient.
> >> > >
> >> > > Thanks,
> >> > > Dong
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -Regards,
> >> Mayuresh R. Gharat
> >> (862) 250-7125
> >>
> >
> >
>


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-28 Thread Jun Rao
Hi, Dong,

RAID6 is an improvement over RAID5 and can tolerate 2 disks failure. Eno's
point is that the rebuild of RAID5/RAID6 requires reading more data
compared with RAID10, which increases the probability of error during
rebuild. This makes sense. In any case, do you think you could ask the SREs
at LinkedIn to share their opinions on RAID5/RAID6?

Yes, when a replica is offline due to a bad disk, it makes sense to handle
it immediately as if a StopReplicaRequest is received (i.e., replica is no
longer considered a leader and is removed from any replica fetcher thread).
Could you add that detail in item 2. in the wiki?
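
Rough sketch of that sequence (not broker code; the interfaces and names are
invented for illustration):

    // Treat a replica on a failed disk like a stopped replica, then tell the controller.
    public class OfflineReplicaHandling {
        interface ReplicaFetcherManager { void removeFetcherForPartition(String topicPartition); }
        interface ControllerNotifier { void notifyLogDirFailure(int brokerId); }

        private final ReplicaFetcherManager fetcherManager;
        private final ControllerNotifier notifier;
        private final int brokerId;

        OfflineReplicaHandling(ReplicaFetcherManager fm, ControllerNotifier n, int brokerId) {
            this.fetcherManager = fm;
            this.notifier = n;
            this.brokerId = brokerId;
        }

        void onDiskFailure(String topicPartition) {
            // 1. Stop acting as leader or follower for the partition right away.
            fetcherManager.removeFetcherForPartition(topicPartition);
            // 2. Produce/fetch requests for the partition now get an error response (not shown).
            // 3. Notify the controller so it can elect a new leader via LeaderAndIsrRequest.
            notifier.notifyLogDirFailure(brokerId);
        }
    }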

50. The wiki says "Broker assumes a log directory to be good after it
starts" : A log directory actually could be bad during startup.

51. In item 4, the wiki says "The controller watches the path
/log_dir_event_notification for new znode.". This doesn't seem to be needed
now?

52. The isNewReplica field in LeaderAndIsrRequest should be for each
replica inside the replicas field, right?

Other than those, the current KIP looks good to me. Do you want to start a
separate discussion thread on KIP-113? I do have some comments there.

Thanks for working on this!

Jun


On Mon, Feb 27, 2017 at 5:51 PM, Dong Lin  wrote:

> Hi Jun,
>
> In addition to the Eno's reference of why rebuild time with RAID-5 is more
> expensive, another concern is that RAID-5 will fail if more than one disk
> fails. JBOD is still works with 1+ disk failure and has better performance
> with one disk failure. These seems like good argument for using JBOD
> instead of RAID-5.
>
> If a leader replica goes offline, the broker should first take all actions
> (i.e. remove the partition from fetcher thread) as if it has received
> StopReplicaRequest for this partition because the replica can no longer
> work anyway. It will also respond with error to any ProduceRequest and
> FetchRequest for partition. The broker notifies controller by writing
> notification znode in ZK. The controller learns the disk failure event from
> ZK, sends LeaderAndIsrRequest and receives LeaderAndIsrResponse to learn
> that the replica is offline. The controller will then elect new leader for
> this partition and sends LeaderAndIsrRequest/MetadataUpdateRequest to
> relevant brokers. The broker should stop adjusting the ISR for this
> partition as if the broker is already offline. I am not sure there is any
> inconsistency in broker's behavior when it is leader or follower. Is there
> any concern with this approach?
>
> Thanks for catching this. I have removed that reference from the KIP.
>
> Hi Eno,
>
> Thank you for providing the reference of the RAID-5. In LinkedIn we have 10
> disks per Kafka machine. It will not be a show-stopper operationally for
> LinkedIn if we have to deploy one-broker-per-disk. On the other hand we
> previously discussed the advantage of JBOD vs. one-broker-per-disk or
> one-broker-per-machine. One-broker-per-disk suffers from the problems
> described in the KIP and one-broker-per-machine increases the failure
> caused by disk failure by 10X. Since JBOD is strictly better than either of
> the two, it is also better then one-broker-per-multiple-disk which is
> somewhere between one-broker-per-disk and one-broker-per-machine.
>
> I personally think the benefits of JBOD design is worth the implementation
> complexity it introduces. I would also argue that it is reasonable for
> Kafka to manage this low level detail because Kafka is already exposing and
> managing replication factor of its data. But whether the complexity is
> worthwhile can be subjective and I can not prove my opinion. I am
> contributing significant amount of time to do this KIP because Kafka
> develops at LinkedIn believes it is useful and worth the effort. Yeah, it
> will be useful to see what everyone else think about it.
>
>
> Thanks,
> Dong
>
>
> On Mon, Feb 27, 2017 at 1:16 PM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > For RAID5, I am not sure the rebuild cost is a big concern. If a disk
> > fails, typically an admin has to bring down the broker, replace the
> failed
> > disk with a new one, trigger the RAID rebuild, and bring up the broker.
> > This way, there is no performance impact at runtime due to rebuild. The
> > benefit is that a broker doesn't fail in a hard way when there is a disk
> > failure and can be brought down in a controlled way for maintenance.
> While
> > the broker is running with a failed disk, reads may be more expensive
> since
> > they have to be computed from the parity. However, if most reads are from
> > page cache, this may not be a big issue either. So, it would be useful to
> > do some tests on RAID5 before we completely rule it out.
> >
> > Regarding whether to remove an offline replica from the fetcher thread
> > immediately. What do we do when a failed replica is a leader? Do we do
> > nothing or mark the replica as not the leader immediately? Intuitively,
> it
> > seems it's better if 
