Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2018-01-02 Thread Becket Qin
Thanks for the reply, Colin.

My concern about the reinitialization is potential churn rather than
efficiency. The current KIP proposal uses time- and priority-based
protection to avoid thrashing, but it is not clear to me whether that is
sufficient. For example, consider topic creation/deletion. In those cases,
many of the replica fetchers will potentially need to re-establish their
sessions, and many client sessions may get evicted and thus also need to
re-establish sessions. This would involve two round trips (due to
InvalidFetchSessionException), a potential metadata refresh, and backoff.

Admittedly it is probably not going to be worse than what we have now, but
such an uncertain impact still worries me. Are we going to have the follow-up
optimization discussion before the implementation of this KIP, or are we
going to do it after? In the past we have had separate KIPs for a
complicated feature but implemented them together. Perhaps we can do the same
here if you want to limit the scope of this KIP.

Thanks,

Jiangjie (Becket) Qin


On Tue, Jan 2, 2018 at 6:34 PM, Colin McCabe  wrote:

> On Tue, Jan 2, 2018, at 04:46, Becket Qin wrote:
> > Hi Colin,
> >
> > Good point about KIP-226. Maybe a separate broker epoch is needed
> although
> > it is a little awkward to let the consumer set this. So was there a
> > solution to the frequent pause and resume scenario? Did I miss something?
> >
> > Thanks,
> > Jiangjie (Becket) Qin
>
> Hi Becket,
>
> Allowing sessions to be re-initialized (as the current KIP does) makes
> frequent pauses and resumes less painful, because the memory associated
> with the old session can be reclaimed.  The only cost is sending a full
> fetch request once when the pause or resume is activated.
>
> There are other follow-on optimizations that we might want to do later,
> like allowing new partitions to be added to existing fetch sessions without
> a re-initialization, that could make this even more efficient.  But that's
> not in the current KIP, in order to avoid expanding the scope too much.
>
> best,
> Colin
>
> >
> > On Wed, Dec 27, 2017 at 1:40 PM, Colin McCabe 
> wrote:
> >
> > > On Sat, Dec 23, 2017, at 09:15, Becket Qin wrote:
> > > > Hi Colin,
> > > >
> > > > Thanks for the explanation. I want to clarify a bit more on my
> thoughts.
> > > >
> > > > I am fine with having a separate discussion as long as the follow-up
> > > discussion will be incremental on top of this KIP instead of overriding
> > > the protocol in this KIP.
> > >
> > > Hi Becket,
> > >
> > > Thanks for the clarification.  I do think that the changes we've been
> > > discussing would be incremental rather than completely replacing what
> we've
> > > talked about here.  See my responses inline below.
> > >
> > > >
> > > > I completely agree this KIP is useful by itself. That being said, we
> want
> > > > to avoid falling into a "local optimal" solution by just saying
> because
> > > it
> > > > solves the problem in this scope. I think we should also think if the
> > > > solution aligns with a "global optimal" (systematic optimal)
> solution as
> > > > well. That is why I brought up other considerations. If they turned
> out
> > > to
> > > > be orthogonal and should be addressed separately, that's good. But at
> > > least
> > > > it is worth thinking about the potential connections between those
> > > things.
> > > >
> > > > One example of such related consideration is the following two
> seemingly
> > > > unrelated things:
> > > >
> > > > 1. I might have missed the discussion, but it seems the concern of
> the
> > > > clients doing frequent pause and resume is still not addressed. Since
> > > this
> > > > is a pretty common use case for applications that want to have flow
> > > > control, or have prioritized consumption, or get consumption
> fairness, we
> > > > probably want to see how to handle this case. One of the solution
> might
> > > be
> > > > a long-lived session id spanning the clients' life time.
> > > >
> > > > 2. KAFKA-6029. The key problem is that the leader wants to know if a
> > > fetch
> > > > request is from a shutting down broker or from a restarted broker.
> > > >
> > > > The connection between those two issues is that both of them could be
> > > > addressed by having a life-long session id for each client (or
> fetcher,
> > > to
> > > > be more accurate). This may indicate that having a life long session
> id
> > > > might be a "global optimal" solution so it should be considered in
> this
> > > > KIP. Otherwise, a follow up KIP discussion for KAFKA-6029 may either
> > > > introduce a broker epoch unnecessarily (which will not be used by the
> > > > consumers at all) or override what we do in this KIP.
> > >
> > > Remember that a given follower will have more than one fetch session
> ID.
> > > Each fetcher thread will have its own session ID.  And we will
> eventually
> > > be able to dynamically add or remove fetcher threads using KIP-226.
> 

Re: [VOTE] KIP-243: Make ProducerConfig and ConsumerConfig constructors public

2018-01-02 Thread Matthias J. Sax
Thanks for the feedback.

I updated the KIP and PR accordingly.


-Matthias


On 1/2/18 9:18 PM, Ewen Cheslack-Postava wrote:
> Builders have historically seen some resistance in the project (by some
> notable original authors...) but they've been increasingly making their way
> in -- SchemaBuilder in Connect, I believe some small streams APIs,
> AdminClient stuff. The general trend seems to be towards more fluent APIs.
> 
> Regarding the KIP, I'm not sure why the APIs are currently Map, but
> this seems wrong. We have a mess of old Properties compatibility (which we
> seem to have dragged into the new producer/consumer APIs), but we could at
> least make these Map. (I prefer just doing Map
> since we go through a parse phase regardless of the types, but didn't
> manage to make that happen within Connect.)
> 
> Otherwise, the general idea seems fine to me.
> 
> -Ewen
> 
> On Thu, Dec 21, 2017 at 2:28 PM, Jason Gustafson  wrote:
> 
>> I didn't sense much resistance in that thread, just an effort to keep the
>> streams and core client config APIs consistent ;).
>>
>> I'd prefer seeing a KIP for a more general improvement, but this change
>> seems harmless and improves consistency between the clients, so +1 from me.
>>
>> -Jason
>>
>> On Thu, Dec 21, 2017 at 11:19 AM, Matthias J. Sax 
>> wrote:
>>
>>> I personally love the builder pattern idea. There was some push back in
>>> the past though from some people.
>>>
>>> cf https://issues.apache.org/jira/browse/KAFKA-4436
>>>
>>> Happy to propose the builder pattern, but then we should have a proper
>>> DISCUSS thread. Maybe we do this as a follow-up and just do this KIP
>> as-is?
>>>
>>>
>>> -Matthias
>>>
>>> On 12/21/17 10:28 AM, Jason Gustafson wrote:
 Hey Matthias,

 Let me suggest an alternative. As you have mentioned, these config
>>> classes
 do not give users much benefit currently. Maybe we change that? I think
 many users would appreciate having a builder for configuration since it
 provides type safety and is generally a much friendlier pattern to work
 with programmatically. Users could then do something like this:

 ConsumerConfig config = ConsumerConfig.newBuilder()
     .setBootstrapServers("localhost:9092")
     .setGroupId("group")
     .setRequestTimeout(15, TimeUnit.SECONDS)
     .build();

 Consumer consumer = new KafkaConsumer(config);

 An additional benefit of this is that it gives us a better way to
>> expose
 config deprecations. In any case, it would make it less odd to expose
>> the
 public constructor without giving users anything useful to do with the
 class.

 What do you think?

 -Jason

 On Wed, Dec 20, 2017 at 5:59 PM, Matthias J. Sax <
>> matth...@confluent.io>
 wrote:

> It's tailored for internal usage. I think client constructors don't
> benefit from accepting those config objects. We just want to be able
>> to
> access the default values for certain parameters.
>
> From a user point of view, it's actually boilerplate code if you pass
> in a config object instead of a plain Properties object because the
> config object itself is immutable.
>
> I actually created a JIRA to remove the constructors from KafkaStreams
> that accept StreamsConfig, for exactly this reason:
> https://issues.apache.org/jira/browse/KAFKA-6386
>
>
> -Matthias
>
>
> On 12/20/17 3:33 PM, Jason Gustafson wrote:
>> Hi Matthias,
>>
>> Isn't it a little weird to make these constructors public but not
>> also
>> expose the corresponding client constructors that use them?
>>
>> -Jason
>>
>> On Tue, Dec 19, 2017 at 9:30 AM, Bill Bejeck 
>>> wrote:
>>
>>> +1
>>>
>>> On Tue, Dec 19, 2017 at 12:09 PM, Guozhang Wang >>
>>> wrote:
>>>
 +1

 On Tue, Dec 19, 2017 at 1:49 AM, Tom Bentley <
>> t.j.bent...@gmail.com>
 wrote:

> +1
>
> On 18 December 2017 at 23:28, Vahid S Hashemian <
 vahidhashem...@us.ibm.com
>>
> wrote:
>
>> +1
>>
>> Thanks for the KIP.
>>
>> --Vahid
>>
>>
>>
>> From:   Ted Yu 
>> To: dev@kafka.apache.org
>> Date:   12/18/2017 02:45 PM
>> Subject:Re: [VOTE] KIP-243: Make ProducerConfig and
> ConsumerConfig
>> constructors public
>>
>>
>>
>> +1
>>
>> nit: via "copy and past" an 'e' is missing at the end.
>>
>> On Mon, Dec 18, 2017 at 2:38 PM, Matthias J. Sax <
 matth...@confluent.io>
>> wrote:
>>
>>> Hi,
>>>
>>> I want to propose the following KIP:
>>>
>> 

Re: [VOTE] KIP-239 Add queryableStoreName() to GlobalKTable

2018-01-02 Thread Matthias J. Sax
@Richard: you can close this vote thread with a summary as usual and
update the KIP wiki page accordingly.


-Matthias

On 1/2/18 9:57 PM, Richard Yu wrote:
> A subsequent PR has already been created:
> https://github.com/apache/kafka/pull/4340/
> It should be seen on the JIRA.
> 
> 
> 
> On Tue, Jan 2, 2018 at 9:51 PM, Ewen Cheslack-Postava 
> wrote:
> 
>> Oh, the KIP passes w/ the required votes. My comment was just on
>> implementation details. I will leave comments about that up to the
>> subsequent PR and to the Kafka Streams folks that are much better suited
>> than me to comment on them :)
>>
>> -Ewen
>>
>> On Tue, Jan 2, 2018 at 9:28 PM, Richard Yu 
>> wrote:
>>
>>> After investigation, I have found that the
>>> InternalStreamsBuilder#globalTable method is the only instance where the
>>> constructor for GlobalKTableImpl is called.
>>> The KTableValueGetterSupplier parameter used in this particular
>> constructor
>>> is an instance of KTableSourceValueGetterSupplier. Hence, your
>> requirement
>>> is satisfied.
>>>
>>> Since this is the vote thread, if you have further comments, please
>> comment
>>> on the pull request.
>>>
>>> On Tue, Jan 2, 2018 at 6:38 PM, Ewen Cheslack-Postava >>
>>> wrote:
>>>
 +1 binding

 The idea seems reasonable. Looking at it implementation-wise, seems
>> there
 is a bit of awkwardness because GlobalKTableImpl uses a
 KTableValueGetterSupplier which seems to possibly have multiple stores,
>>> but
 maybe using the more specific KTableSourceValueGetterSupplier
 implementation instead can resolve that.

 -Ewen

 On Mon, Jan 1, 2018 at 6:22 PM, Ted Yu  wrote:

> Gentle reminder: one more binding vote is needed for the KIP to pass.
>
> Cheers
>
> On Thu, Dec 21, 2017 at 4:13 AM, Damian Guy 
 wrote:
>
>> +1
>>
>> On Wed, 20 Dec 2017 at 21:09 Ted Yu  wrote:
>>
>>> Ping for more (binding) votes.
>>>
>>> The pull request is ready.
>>>
>>> On Fri, Dec 15, 2017 at 12:57 PM, Guozhang Wang <
>>> wangg...@gmail.com>
>>> wrote:
>>>
 +1 (binding), thanks!

 On Fri, Dec 15, 2017 at 11:56 AM, Ted Yu 
> wrote:

> Hi,
> Here is the discussion thread:
>
> http://search-hadoop.com/m/Kafka/uyzND12QnH514pPO9?subj=
> Re+DISCUSS+KIP+239+Add+queryableStoreName+to+GlobalKTable
>
> Please vote on this KIP.
>
> Thanks
>



 --
 -- Guozhang

>>>
>>
>

>>>
>>
> 





[jira] [Created] (KAFKA-6419) Menu order in Streams docs incorrect

2018-01-02 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-6419:
--

 Summary: Menu order in Streams docs incorrect
 Key: KAFKA-6419
 URL: https://issues.apache.org/jira/browse/KAFKA-6419
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Reporter: Matthias J. Sax
Assignee: Joel Hamill
Priority: Minor


The menu order on the Streams page is
{noformat}
INTRODUCTION   DEVELOPER GUIDE   CONCEPTS   RUN DEMO APP   TUTORIAL: WRITE APP
{noformat}

However, on the "Introduction" page, clicking "next" takes you to "Run Demo App", 
and from there "next" takes you to "Tutorial: Write App".

The menu and the order of "next" should align with each other.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [VOTE] KIP-239 Add queryableStoreName() to GlobalKTable

2018-01-02 Thread Richard Yu
A subsequent PR has already been created:
https://github.com/apache/kafka/pull/4340/
It should be seen on the JIRA.



On Tue, Jan 2, 2018 at 9:51 PM, Ewen Cheslack-Postava 
wrote:

> Oh, the KIP passes w/ the required votes. My comment was just on
> implementation details. I will leave comments about that up to the
> subsequent PR and to the Kafka Streams folks that are much better suited
> than me to comment on them :)
>
> -Ewen
>
> On Tue, Jan 2, 2018 at 9:28 PM, Richard Yu 
> wrote:
>
> > After investigation, I have found that the
> > InternalStreamsBuilder#globalTable method is the only instance where the
> > constructor for GlobalKTableImpl is called.
> > The KTableValueGetterSupplier parameter used in this particular
> constructor
> > is an instance of KTableSourceValueGetterSupplier. Hence, your
> requirement
> > is satisfied.
> >
> > Since this is the vote thread, if you have further comments, please
> comment
> > on the pull request.
> >
> > On Tue, Jan 2, 2018 at 6:38 PM, Ewen Cheslack-Postava  >
> > wrote:
> >
> > > +1 binding
> > >
> > > The idea seems reasonable. Looking at it implementation-wise, seems
> there
> > > is a bit of awkwardness because GlobalKTableImpl uses a
> > > KTableValueGetterSupplier which seems to possibly have multiple stores,
> > but
> > > maybe using the more specific KTableSourceValueGetterSupplier
> > > implementation instead can resolve that.
> > >
> > > -Ewen
> > >
> > > On Mon, Jan 1, 2018 at 6:22 PM, Ted Yu  wrote:
> > >
> > > > Gentle reminder: one more binding vote is needed for the KIP to pass.
> > > >
> > > > Cheers
> > > >
> > > > On Thu, Dec 21, 2017 at 4:13 AM, Damian Guy 
> > > wrote:
> > > >
> > > > > +1
> > > > >
> > > > > On Wed, 20 Dec 2017 at 21:09 Ted Yu  wrote:
> > > > >
> > > > > > Ping for more (binding) votes.
> > > > > >
> > > > > > The pull request is ready.
> > > > > >
> > > > > > On Fri, Dec 15, 2017 at 12:57 PM, Guozhang Wang <
> > wangg...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > +1 (binding), thanks!
> > > > > > >
> > > > > > > On Fri, Dec 15, 2017 at 11:56 AM, Ted Yu 
> > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > > Here is the discussion thread:
> > > > > > > >
> > > > > > > > http://search-hadoop.com/m/Kafka/uyzND12QnH514pPO9?subj=
> > > > > > > > Re+DISCUSS+KIP+239+Add+queryableStoreName+to+GlobalKTable
> > > > > > > >
> > > > > > > > Please vote on this KIP.
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [VOTE] KIP-239 Add queryableStoreName() to GlobalKTable

2018-01-02 Thread Ewen Cheslack-Postava
Oh, the KIP passes w/ the required votes. My comment was just on
implementation details. I will leave comments about that up to the
subsequent PR and to the Kafka Streams folks that are much better suited
than me to comment on them :)

-Ewen

On Tue, Jan 2, 2018 at 9:28 PM, Richard Yu 
wrote:

> After investigation, I have found that the
> InternalStreamsBuilder#globalTable method is the only instance where the
> constructor for GlobalKTableImpl is called.
> The KTableValueGetterSupplier parameter used in this particular constructor
> is an instance of KTableSourceValueGetterSupplier. Hence, your requirement
> is satisfied.
>
> Since this is the vote thread, if you have further comments, please comment
> on the pull request.
>
> On Tue, Jan 2, 2018 at 6:38 PM, Ewen Cheslack-Postava 
> wrote:
>
> > +1 binding
> >
> > The idea seems reasonable. Looking at it implementation-wise, seems there
> > is a bit of awkwardness because GlobalKTableImpl uses a
> > KTableValueGetterSupplier which seems to possibly have multiple stores,
> but
> > maybe using the more specific KTableSourceValueGetterSupplier
> > implementation instead can resolve that.
> >
> > -Ewen
> >
> > On Mon, Jan 1, 2018 at 6:22 PM, Ted Yu  wrote:
> >
> > > Gentle reminder: one more binding vote is needed for the KIP to pass.
> > >
> > > Cheers
> > >
> > > On Thu, Dec 21, 2017 at 4:13 AM, Damian Guy 
> > wrote:
> > >
> > > > +1
> > > >
> > > > On Wed, 20 Dec 2017 at 21:09 Ted Yu  wrote:
> > > >
> > > > > Ping for more (binding) votes.
> > > > >
> > > > > The pull request is ready.
> > > > >
> > > > > On Fri, Dec 15, 2017 at 12:57 PM, Guozhang Wang <
> wangg...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > +1 (binding), thanks!
> > > > > >
> > > > > > On Fri, Dec 15, 2017 at 11:56 AM, Ted Yu 
> > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > Here is the discussion thread:
> > > > > > >
> > > > > > > http://search-hadoop.com/m/Kafka/uyzND12QnH514pPO9?subj=
> > > > > > > Re+DISCUSS+KIP+239+Add+queryableStoreName+to+GlobalKTable
> > > > > > >
> > > > > > > Please vote on this KIP.
> > > > > > >
> > > > > > > Thanks
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [VOTE] KIP-231: Improve the Required ACL of ListGroups API

2018-01-02 Thread Ewen Cheslack-Postava
+1 (binding)

Thanks for the KIP Vahid, nice improvement!

-Ewen

On Tue, Dec 19, 2017 at 11:30 AM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> I believe the concerns on this KIP have been addressed so far.
> Therefore, I'd like to start a vote.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 231%3A+Improve+the+Required+ACL+of+ListGroups+API
>
> Thanks.
> --Vahid
>
>


Re: [DISCUSS] KIP-231: Improve the Required ACL of ListGroups API

2018-01-02 Thread Ewen Cheslack-Postava
Late to the game here, but I'm +1 on this. Some of the ConsumerGroup
permissions are weird, but this KIP brings the describe ACLs into better
alignment with everything else and makes things more functional for clients
with more locked down permissions.

-Ewen

On Fri, Dec 15, 2017 at 12:57 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> If there are no other feedback or suggestion on this KIP, I'll start a
> vote early next week.
>
> Thanks.
> --Vahid
>
>
>
> From:   "Vahid S Hashemian" 
> To: dev@kafka.apache.org
> Date:   11/29/2017 03:18 PM
> Subject:Re: [DISCUSS] KIP-231: Improve the Required ACL of
> ListGroups API
>
>
>
> Completing the subject line :)
>
>
>
> From:   "Vahid S Hashemian" 
> To: dev 
> Date:   11/29/2017 03:17 PM
> Subject:[DISCUSS] KIP-231:
>
>
>
> Hi everyone,
>
> I started KIP-231 to propose a small change to the required ACL of
> ListGroups API (in response to KAFKA-5638):
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-231%3A+Improve+the+Required+ACL+of+ListGroups+API
>
>
> Your feedback and suggestions are welcome!
>
> Thanks.
> --Vahid
>
>
>
>
>
>
>
>
>
>
>


Re: [VOTE] KIP-239 Add queryableStoreName() to GlobalKTable

2018-01-02 Thread Richard Yu
After investigation, I have found that the
InternalStreamsBuilder#globalTable method is the only instance where the
constructor for GlobalKTableImpl is called.
The KTableValueGetterSupplier parameter used in this particular constructor
is an instance of KTableSourceValueGetterSupplier. Hence, your requirement
is satisfied.

Since this is the vote thread, if you have further comments, please comment
on the pull request.

On Tue, Jan 2, 2018 at 6:38 PM, Ewen Cheslack-Postava 
wrote:

> +1 binding
>
> The idea seems reasonable. Looking at it implementation-wise, seems there
> is a bit of awkwardness because GlobalKTableImpl uses a
> KTableValueGetterSupplier which seems to possibly have multiple stores, but
> maybe using the more specific KTableSourceValueGetterSupplier
> implementation instead can resolve that.
>
> -Ewen
>
> On Mon, Jan 1, 2018 at 6:22 PM, Ted Yu  wrote:
>
> > Gentle reminder: one more binding vote is needed for the KIP to pass.
> >
> > Cheers
> >
> > On Thu, Dec 21, 2017 at 4:13 AM, Damian Guy 
> wrote:
> >
> > > +1
> > >
> > > On Wed, 20 Dec 2017 at 21:09 Ted Yu  wrote:
> > >
> > > > Ping for more (binding) votes.
> > > >
> > > > The pull request is ready.
> > > >
> > > > On Fri, Dec 15, 2017 at 12:57 PM, Guozhang Wang 
> > > > wrote:
> > > >
> > > > > +1 (binding), thanks!
> > > > >
> > > > > On Fri, Dec 15, 2017 at 11:56 AM, Ted Yu 
> > wrote:
> > > > >
> > > > > > Hi,
> > > > > > Here is the discussion thread:
> > > > > >
> > > > > > http://search-hadoop.com/m/Kafka/uyzND12QnH514pPO9?subj=
> > > > > > Re+DISCUSS+KIP+239+Add+queryableStoreName+to+GlobalKTable
> > > > > >
> > > > > > Please vote on this KIP.
> > > > > >
> > > > > > Thanks
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>


Re: [VOTE] KIP-243: Make ProducerConfig and ConsumerConfig constructors public

2018-01-02 Thread Ewen Cheslack-Postava
Builders have historically seen some resistance in the project (by some
notable original authors...) but they've been increasingly making their way
in -- SchemaBuilder in Connect, I believe some small streams APIs,
AdminClient stuff. The general trend seems to be towards more fluent APIs.

Regarding the KIP, I'm not sure why the APIs are currently Map, but
this seems wrong. We have a mess of old Properties compatibility (which we
seem to have dragged into the new producer/consumer APIs), but we could at
least make these Map. (I prefer just doing Map
since we go through a parse phase regardless of the types, but didn't
manage to make that happen within Connect.)

Otherwise, the general idea seems fine to me.

-Ewen

On Thu, Dec 21, 2017 at 2:28 PM, Jason Gustafson  wrote:

> I didn't sense much resistance in that thread, just an effort to keep the
> streams and core client config APIs consistent ;).
>
> I'd prefer seeing a KIP for a more general improvement, but this change
> seems harmless and improves consistency between the clients, so +1 from me.
>
> -Jason
>
> On Thu, Dec 21, 2017 at 11:19 AM, Matthias J. Sax 
> wrote:
>
> > I personally love the builder pattern idea. There was some push back in
> > the past though from some people.
> >
> > cf https://issues.apache.org/jira/browse/KAFKA-4436
> >
> > Happy to propose the builder pattern, but then we should have a proper
> > DISCUSS thread. Maybe we do this as a follow-up and just do this KIP
> as-is?
> >
> >
> > -Matthias
> >
> > On 12/21/17 10:28 AM, Jason Gustafson wrote:
> > > Hey Matthias,
> > >
> > > Let me suggest an alternative. As you have mentioned, these config
> > classes
> > > do not give users much benefit currently. Maybe we change that? I think
> > > many users would appreciate having a builder for configuration since it
> > > provides type safety and is generally a much friendlier pattern to work
> > > with programmatically. Users could then do something like this:
> > >
> > > ConsumerConfig config = ConsumerConfig.newBuilder()
> > >      .setBootstrapServers("localhost:9092")
> > >      .setGroupId("group")
> > >      .setRequestTimeout(15, TimeUnit.SECONDS)
> > >      .build();
> > >
> > > Consumer consumer = new KafkaConsumer(config);
> > >
> > > An additional benefit of this is that it gives us a better way to
> expose
> > > config deprecations. In any case, it would make it less odd to expose
> the
> > > public constructor without giving users anything useful to do with the
> > > class.
> > >
> > > What do you think?
> > >
> > > -Jason
> > >
> > > On Wed, Dec 20, 2017 at 5:59 PM, Matthias J. Sax <
> matth...@confluent.io>
> > > wrote:
> > >
> > >> It's tailored for internal usage. I think client constructors don't
> > >> benefit from accepting those config objects. We just want to be able
> to
> > >> access the default values for certain parameters.
> > >>
> > >> From a user point of view, it's actually boilerplate code if you pass
> > >> in a config object instead of a plain Properties object because the
> > >> config object itself is immutable.
> > >>
> > >> I actually created a JIRA to remove the constructors from KafkaStreams
> > >> that accept StreamsConfig, for exactly this reason:
> > >> https://issues.apache.org/jira/browse/KAFKA-6386
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 12/20/17 3:33 PM, Jason Gustafson wrote:
> > >>> Hi Matthias,
> > >>>
> > >>> Isn't it a little weird to make these constructors public but not
> also
> > >>> expose the corresponding client constructors that use them?
> > >>>
> > >>> -Jason
> > >>>
> > >>> On Tue, Dec 19, 2017 at 9:30 AM, Bill Bejeck 
> > wrote:
> > >>>
> >  +1
> > 
> >  On Tue, Dec 19, 2017 at 12:09 PM, Guozhang Wang  >
> >  wrote:
> > 
> > > +1
> > >
> > > On Tue, Dec 19, 2017 at 1:49 AM, Tom Bentley <
> t.j.bent...@gmail.com>
> > > wrote:
> > >
> > >> +1
> > >>
> > >> On 18 December 2017 at 23:28, Vahid S Hashemian <
> > > vahidhashem...@us.ibm.com
> > >>>
> > >> wrote:
> > >>
> > >>> +1
> > >>>
> > >>> Thanks for the KIP.
> > >>>
> > >>> --Vahid
> > >>>
> > >>>
> > >>>
> > >>> From:   Ted Yu 
> > >>> To: dev@kafka.apache.org
> > >>> Date:   12/18/2017 02:45 PM
> > >>> Subject:Re: [VOTE] KIP-243: Make ProducerConfig and
> > >> ConsumerConfig
> > >>> constructors public
> > >>>
> > >>>
> > >>>
> > >>> +1
> > >>>
> > >>> nit: via "copy and past" an 'e' is missing at the end.
> > >>>
> > >>> On Mon, Dec 18, 2017 at 2:38 PM, Matthias J. Sax <
> > > matth...@confluent.io>
> > >>> wrote:
> > >>>
> >  Hi,
> > 
> >  I want to propose the following KIP:
> > 
> > >>> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.
> > >>> 

Re: [VOTE] KIP-237: More Controller Health Metrics

2018-01-02 Thread Ewen Cheslack-Postava
Dong,

Seems like a reasonable addition. +1 (binding) from me.

There were some final naming issues in the DISCUSS thread that didn't get
a follow-up. I'm fine with either version of the naming, as these are details
I think mostly advanced users will be monitoring, and both naming arguments
are reasonable here.

-Ewen

On Thu, Dec 21, 2017 at 2:57 PM, Dong Lin  wrote:

> Bump up the thread so that we can have these sensors to monitor our Kafka
> service sooner.
>
> On Mon, Dec 18, 2017 at 2:03 PM, Dong Lin  wrote:
>
> > Hi all,
> >
> > Since there are no more outstanding comments, I would like to start
> voting
> > thread for KIP-237: https://cwiki.apache.org/confluence/display/KAFKA/
> > KIP-237%3A+More+Controller+Health+Metrics
> >
> > The KIP proposes to add a few more metrics to help monitor Kafka
> > Controller health.
> >
> > Thanks,
> > Dong
> >
>


Re: [DISCUSS] KIP-242: Mask password fields in Kafka Connect REST response

2018-01-02 Thread Ewen Cheslack-Postava
Vincent,

Thanks for the KIP. This is definitely an issue we know is a problem for
some users.

I think the major problem with the KIP as-is is that it makes it impossible
to get the original value back out of the API. This KIP probably ties in
significantly with ideas for securing the REST API (SSL) and adding ACLs to
it. Both are things we know people want, but haven't happened yet. However,
it also interacts with other approaches to adding those features, e.g.
layering proxies on top of the existing API (e.g. nginx, apache, etc). Just
doing a blanket replacement of password values with a constant would likely
break things for people who secure things via a proxy (and may just not
allow reads of configs unless the user is authorized for the particular
connector). These are the types of concerns we like to think through in the
compatibility section. One option to get the masking functionality in
without depending on a bunch of other security improvements might be to
make this configurable so users that need this (and can forgo seeing a
valid config via the API) can opt-in.

Regarding your individual points:

* I don't think the particular value for the masked content matters much.
Any constant indicating a password field is good. Your value seems fine to
me.
* I don't think ConnectorInfo has enough info on its own to do proper
masking. In fact, I think you need to parse the config enough to get the
Connector-specific ConfigDef out in order to determine which fields are
Password fields. I would probably try to push this to be as central as
possible, maybe adding a method to AbstractHerder that can get configs with
a boolean indicating whether they need to have sensitive fields removed.
That method could deal with parsing the config to get the right connector,
getting the connector config, and then sanitizing any configs that are
sensitive. We could have this in one location, then have the relevant REST
APIs just use the right flag to determine if they get sanitized or
unsanitized data.
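
To make that concrete, here is a rough sketch of the kind of centralized helper I
have in mind (purely illustrative -- the class and method names are not existing
Connect code; only ConfigDef and its PASSWORD type are real):

import org.apache.kafka.common.config.ConfigDef;

import java.util.HashMap;
import java.util.Map;

public class ConfigMasker {
    private static final String MASK = "*********";

    // Walk the connector's ConfigDef and replace the value of every PASSWORD-typed
    // key with a fixed placeholder, leaving all other configs untouched.
    public static Map<String, String> maskSensitiveConfigs(ConfigDef connectorConfigDef,
                                                           Map<String, String> rawConfig) {
        Map<String, String> sanitized = new HashMap<>(rawConfig);
        connectorConfigDef.configKeys().forEach((name, key) -> {
            if (key.type == ConfigDef.Type.PASSWORD && sanitized.containsKey(name)) {
                sanitized.put(name, MASK);
            }
        });
        return sanitized;
    }
}

The REST resources would then only decide whether to hand out the sanitized or the
raw map, which also keeps the "unknown connector" question below in a single place.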

That second point raises another interesting point -- what happens if the
connector configuration references a connector which the worker serving the
REST request *does not know about*? In that case, there will be no
corresponding ConfigDef that defines which fields are Passwords and need to
be sensitized. Does it return an error? Or just return the config as is?

-Ewen

On Thu, Dec 28, 2017 at 3:34 AM, Ted Yu  wrote:

> For the last point you raised, can you come up with a unit test that shows
> what you observed ?
>
> Cheers
>
> On Mon, Dec 18, 2017 at 11:14 AM, Vincent Meng  wrote:
>
> > Hi all,
> >
> > I've created KIP-242, a proposal to secure credentials in kafka connect
> > rest endpoint.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 242%3A+Mask+password+in+Kafka+Connect+Rest+API+response
> >
> > Here are something I'd like to discuss:
> >
> >- The "masked" value is set to "*********" (9 stars) currently. It's an
> >arbitrary value I picked. Are there any better options?
> >- The proposal change is in the
> >*org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource*
> >class, where before the response is returned we go through config and
> > mask
> >the password. This has been proven to work. However I think it's
> > cleaner if
> >we do the masking in
> >*org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo* where
> >config() method can return the masked config, so that we don't have to
> > mask
> >the value in each endpoint (and new endpoints if added in the
> future). I
> >ran into some issue with this. So after a while, I start seeing
> > incorrect
> >password being used for the connector. My conjecture is that the value
> >stored in kafka has been changed to the mask value. Can someone
> confirm
> >this might happen with kafka connect? Feel like
> *ConnectorInfo.Config()*
> >is used somewhere to update connect config storage topic.
> >
> > If there's any comments on the KIP let me know. Thank you very much.
> >
> > -Vincent
> >
>


Re: [DISCUSS] KIP-228 Negative record timestamp support

2018-01-02 Thread Ewen Cheslack-Postava
On Tue, Jan 2, 2018 at 8:04 PM, Matthias J. Sax 
wrote:

> I was thinking about a broker/topic config.
>
> However, I am not sure if we only need to worry about data written in
> the future (this would only be true if there were no records with a
> -1 timestamp already). Assume that you have an existing topic that
> contains records with -1 = UNKNOWN timestamps -- in this case, we would give
> those timestamps new semantics if we suddenly allow negative
> timestamps. (That assumes we don't leave -1 as a gap in the timeline,
> which I would rather not do.)
>

Using the Java producer you cannot have a negative timestamp today. So
(modulo the comment about this being dependent on the client implementation), no
existing data should have a -1 timestamp unless it is NO_TIMESTAMP.
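
As a small illustration of that current client-side behavior (a sketch, not from
the KIP -- the topic and values are made up):

import org.apache.kafka.clients.producer.ProducerRecord;

public class NegativeTimestampCheck {
    public static void main(String[] args) {
        // The ProducerRecord constructor validates explicit timestamps, so a
        // pre-1970 value never even reaches the broker with today's Java client.
        new ProducerRecord<>("events", null, -100L, "key", "value");
        // -> throws IllegalArgumentException because the timestamp is negative
    }
}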

When you say you'd rather not have -1 as a gap in the timeline, can
you explain the potential scale of impact? I view it as a relatively
unlikely value and something that people who are really concerned with
negative timestamps can easily work around. Probably many users won't care
as they will not be using pre-1970s data where they actually set the Kafka
timestamp (rather than having timestamps embedded in the data) anyway. I
agree it isn't ideal, but to me it looks like a reasonable tradeoff. What
are the effects/use cases that make you concerned that we'd see significant
user pain as a result?


>
> Also note that it's not really client-specific IMHO, as one could
> implement their own clients. There are many third-party clients, and we
> don't know if they check for negative timestamps (applications could
> even assign their own special meaning to negative timestamps, as those
> are unused atm) -- furthermore, all older clients that don't embed a
> timestamp default to -1 on the broker side...
>

I said "client-specific" because some of the checks are done on the
client-side, which means they are dependent on the specific client
implementation being used. Based on the rest of your comment, I think we're
in agreement except for how we are naming things :) I'd have to double
check if the same level of enforcement is done broker-side. I only mention
that because we tend to discuss these proposals in the context of only the
Java clients, but it is worth thinking through the impact to other clients
as well.


>
> > The implementation could easily be made to map
> > those values into a range that is less likely to be utilized (e.g. use
> the
> > values near Long.MIN_VALUE and have the consumer convert back as needed).
> > The sentinel for NO_TIMESTAMP could be changed between versions as long
> as
> > it is handled consistently between client versions.
>
> This opens Pandora's box IMHO.
>

Why? There should be a small number of values that need to be mapped and
someone could think through the different compatibility issues that are
possible to determine if there are any significant issues/drawbacks.


>
> > Introducing the new
> > config seems like it has significant compatibility concerns that need to
> be
> > sorted out.
>
> I cannot follow here -- from my point of view, it relaxes compatibility
> concerns. If we only allow new topics to enable negative timestamps, old
> behavior and new behavior are not mixed. IMHO, mixing both would be a
> real issue. Thus, for new topics we can change "unknown" from -1 to
> Long.MIN_VALUE and don't mix two different approaches within a single
> topic.
>

What's the mechanism for this? Is the new config only allowed in
CreateTopics requests? If you use existing tooling to set topic configs,
you would just be able to set any valid config. Are the semantics just
undefined if you do? Unless it is impossible to do certain things, we have
to deal with the compatibility concerns regardless of intended use. Might
be fine to just say the behavior is undefined, but there's still work to be
done there. Regardless, I didn't (and probably still don't) have a concrete
understanding of the proposed setting, so hard for me to reason about it.

-Ewen


>
> I see your point that we do have too many configs -- we could also make
> it a new value for existing `message.timestamp.type`.
>
>
> -Matthias
>
>
> On 1/2/18 7:48 PM, Ewen Cheslack-Postava wrote:
> > For `allow.negative.timestamps`, do you mean this as a broker config? I'm
> > not entirely clear on what the proposal would entail.
> >
> > I think taking into account whether we're talking about compatibility
> with
> > existing data in Kafka vs enabling use of negative timestamps is
> important
> > here. If they're effectively not supported today (though admittedly this
> is
> > really client-specific), then we need only concern ourselves with data
> that
> > hasn't been produced into Kafka yet. In that case, we can always handle
> > sentinel values in special ways if we really want to. For example, the
> Java
> > producer does not accept any values < 0 and the API supports passing null
> > rather than the sentinels. The implementation could easily be 

Re: [DISCUSS] KIP-228 Negative record timestamp support

2018-01-02 Thread Matthias J. Sax
I was thinking about a broker/topic config.

However, I am not sure if we only need to worry about data written in
the future (this would only be true if there were no records with a
-1 timestamp already). Assume that you have an existing topic that
contains records with -1 = UNKNOWN timestamps -- in this case, we would give
those timestamps new semantics if we suddenly allow negative
timestamps. (That assumes we don't leave -1 as a gap in the timeline,
which I would rather not do.)

Also note that it's not really client-specific IMHO, as one could
implement their own clients. There are many third-party clients, and we
don't know if they check for negative timestamps (applications could
even assign their own special meaning to negative timestamps, as those
are unused atm) -- furthermore, all older clients that don't embed a
timestamp default to -1 on the broker side...

> The implementation could easily be made to map
> those values into a range that is less likely to be utilized (e.g. use the
> values near Long.MIN_VALUE and have the consumer convert back as needed).
> The sentinel for NO_TIMESTAMP could be changed between versions as long as
> it is handled consistently between client versions.

This opens Pandora's box IMHO.

> Introducing the new
> config seems like it has significant compatibility concerns that need to be
> sorted out. 

I cannot follow here -- from my point of view, it relaxes compatibility
concerns. If we only allow new topics to enable negative timestamps, old
behavior and new behavior are not mixed. IMHO, mixing both would be a
real issue. Thus, for new topics we can change "unknown" from -1 to
Long.MIN_VALUE and avoid mixing two different approaches within a single topic.
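
For concreteness, a purely hypothetical sketch of how such a per-topic setting
could be supplied at creation time -- `allow.negative.timestamps` is only the name
proposed in this thread, not an existing config; the AdminClient calls themselves
are existing API:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicWithProposedConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // The proposed config would be set once, when the topic is created.
            NewTopic topic = new NewTopic("pre-1970-events", 3, (short) 1)
                    .configs(Collections.singletonMap("allow.negative.timestamps", "true"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}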

I see your point that we do have too many configs -- we could also make
it a new value for existing `message.timestamp.type`.


-Matthias


On 1/2/18 7:48 PM, Ewen Cheslack-Postava wrote:
> For `allow.negative.timestamps`, do you mean this as a broker config? I'm
> not entirely clear on what the proposal would entail.
> 
> I think taking into account whether we're talking about compatibility with
> existing data in Kafka vs enabling use of negative timestamps is important
> here. If they're effectively not supported today (though admittedly this is
> really client-specific), then we need only concern ourselves with data that
> hasn't been produced into Kafka yet. In that case, we can always handle
> sentinel values in special ways if we really want to. For example, the Java
> producer does not accept any values < 0 and the API supports passing null
> rather than the sentinels. The implementation could easily be made to map
> those values into a range that is less likely to be utilized (e.g. use the
> values near Long.MIN_VALUE and have the consumer convert back as needed).
> The sentinel for NO_TIMESTAMP could be changed between versions as long as
> it is handled consistently between client versions.
> 
> IMO we already have way too many configs, so we should think about where
> the impact is and whether a compromise that is not ideal, but also not
> significant, can be made to avoid most of the additional complexity. Introducing the new
> config seems like it has significant compatibility concerns that need to be
> sorted out. In contrast, I suspect the use cases we need to support that
> have come up so far can handle 1 or 2 special cases and the necessary
> munging could be handled safely by interceptors such that it is trivial to
> make sure all your apps do the right thing. I appreciate the pain of a ton
> of mailing list questions about an issue like this, but given the
> likelihood of encountering that particular value, I just find it unlikely
> it would be that common and I think it's a reasonable tradeoff to tell a
> user they might need to handle that one special case.
> 
> -Ewen
> 
> On Thu, Dec 28, 2017 at 12:58 PM, Matthias J. Sax 
> wrote:
> 
>> I agree that changing message format or using a flag bit might not be
>> worth it.
>>
>> However, just keeping -1 as "unknown" and leaving a time gap gives me a lot
>> of headache, too. Your arguments about "not an issue in practice" kinda
>> make sense to me, but I can already see the number of questions on the mailing
>> list if we really follow this path... It will confuse users that
>> don't pay attention and "lose" data if Kafka Streams drops records with
>> timestamp -1 but processes other records with negative timestamps.
>>
>> Thus, I was wondering if a new topic config (maybe
>> `allow.negative.timestamps` with default `false`) that allows enabling
>> negative timestamps would be the better solution? With this new config,
>> we would not have any sentinel value for "unknown" and all timestamps
>> would be valid. Old producers, can't write to those topics if they are
>> configured with CREATE_TIME though; APPEND_TIME would still work for
>> older producers but with APPEND_TIME no negative timestamps are possible
>> in the first place, so this config would 

Re: [VOTE] KIP-233: Simplify StreamsBuilder#addGlobalStore

2018-01-02 Thread Matthias J. Sax
It must be two different names, as we add two processors to the topology:
a source processor that only reads the data from a topic, and the actual
processor that maintains the global table.
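
For illustration, a rough sketch of the current call with both names spelled out
(parameter order as I recall the 1.0 API; the topic, store, and processor are made
up for this example):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class AddGlobalStoreExample {

    // Hypothetical state-update processor: writes every record from the topic into the store.
    static class ConfigStoreUpdater extends AbstractProcessor<String, String> {
        private KeyValueStore<String, String> store;

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            super.init(context);
            store = (KeyValueStore<String, String>) context.getStateStore("config-store");
        }

        @Override
        public void process(final String key, final String value) {
            store.put(key, value);
        }
    }

    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("config-store"),
                        Serdes.String(), Serdes.String())
                        .withLoggingDisabled(); // global stores must not use a changelog

        builder.addGlobalStore(
                storeBuilder,
                "config-topic",
                "config-topic-source",           // name of the source processor (reads the topic)
                Consumed.with(Serdes.String(), Serdes.String()),
                "config-store-updater",          // name of the processor that maintains the store
                ConfigStoreUpdater::new);

        // With KIP-233, both names would be generated internally, roughly:
        // builder.addGlobalStore(storeBuilder, "config-topic",
        //         Consumed.with(Serdes.String(), Serdes.String()), ConfigStoreUpdater::new);
    }
}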


-Matthias

On 1/2/18 7:14 PM, Ewen Cheslack-Postava wrote:
> +1 binding, seems like a nice simplification.
> 
> Regarding the source and processor name, do they actually need to be unique
> or could they use the same value? Since these use incrementing integers, it
> could be nice for debuggability/understanding to have them use the same
> name if possible instead of generating 2 separate names.
> 
> -Ewen
> 
> On Tue, Jan 2, 2018 at 9:12 AM, Guozhang Wang  wrote:
> 
>> On a side note, could you update the "compatibility and upgrade" section,
>> that when users start to make code changes to leverage the new API, what
>> kind of upgrade executions they'd need to do? I feel they need to rename
>> topics / etc.
>>
>> On Tue, Jan 2, 2018 at 9:10 AM, Guozhang Wang  wrote:
>>
>>> +1, thanks!
>>>
>>> On Wed, Dec 27, 2017 at 6:01 PM, Ted Yu  wrote:
>>>
 +1

 On Wed, Dec 27, 2017 at 12:15 PM, Bill Bejeck 
>> wrote:

> +1
>
> On Wed, Dec 27, 2017 at 3:07 PM, Matthias J. Sax <
>> matth...@confluent.io
>
> wrote:
>
>> +1
>>
>> On 12/26/17 9:00 PM, Panuwat Anawatmongkhon wrote:
>>> Hi all,
>>> I would like to start the vote thread.
>>> This is link for the kip.
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-233%
 3A+Simplify+
>> StreamsBuilder%23addGlobalStore
>>>
>>> Cheers
>>>
>>
>>
>

>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
> 





Re: [DISCUSS] KIP-228 Negative record timestamp support

2018-01-02 Thread Ewen Cheslack-Postava
For `allow.negative.timestamps`, do you mean this as a broker config? I'm
not entirely clear on what the proposal would entail.

I think taking into account whether we're talking about compatibility with
existing data in Kafka vs enabling use of negative timestamps is important
here. If they're effectively not supported today (though admittedly this is
really client-specific), then we need only concern ourselves with data that
hasn't been produced into Kafka yet. In that case, we can always handle
sentinel values in special ways if we really want to. For example, the Java
producer does not accept any values < 0 and the API supports passing null
rather than the sentinels. The implementation could easily be made to map
those values into a range that is less likely to be utilized (e.g. use the
values near Long.MIN_VALUE and have the consumer convert back as needed).
The sentinel for NO_TIMESTAMP could be changed between versions as long as
it is handled consistently between client versions.

IMO we already have way too many configs, so we should think about where
the impact is and whether a compromise that is not ideal, but also not
significant, can be made to avoid most of the additional complexity. Introducing the new
config seems like it has significant compatibility concerns that need to be
sorted out. In contrast, I suspect the use cases we need to support that
have come up so far can handle 1 or 2 special cases and the necessary
munging could be handled safely by interceptors such that it is trivial to
make sure all your apps do the right thing. I appreciate the pain of a ton
of mailing list questions about an issue like this, but given the
likelihood of encountering that particular value, I just find it unlikely
it would be that common and I think it's a reasonable tradeoff to tell a
user they might need to handle that one special case.
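
To sketch the interceptor idea (the class is hypothetical, and it presumes the KIP
relaxes the client-side "timestamp must be non-negative" check so such records can
be constructed at all):

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

// Shifts the single colliding value (-1 ms, one millisecond before the epoch) off the
// NO_TIMESTAMP sentinel; a matching consumer-side interceptor could apply the inverse.
public class NegativeOneTimestampInterceptor<K, V> implements ProducerInterceptor<K, V> {

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        Long ts = record.timestamp();
        if (ts != null && ts == -1L) {
            return new ProducerRecord<>(record.topic(), record.partition(), -2L,
                    record.key(), record.value(), record.headers());
        }
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

It would be registered via the standard `interceptor.classes` producer config, so an
organization that cares about this case can apply it to all of its apps without
touching application code.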

-Ewen

On Thu, Dec 28, 2017 at 12:58 PM, Matthias J. Sax 
wrote:

> I agree that changing message format or using a flag bit might not be
> worth it.
>
> However, just keeping -1 as "unknown" and leaving a time gap gives me a lot
> of headache, too. Your arguments about "not an issue in practice" kinda
> make sense to me, but I can already see the number of questions on the mailing
> list if we really follow this path... It will confuse users that
> don't pay attention and "lose" data if Kafka Streams drops records with
> timestamp -1 but processes other records with negative timestamps.
>
> Thus, I was wondering if a new topic config (maybe
> `allow.negative.timestamps` with default `false`) that allows enabling
> negative timestamps would be the better solution? With this new config,
> we would not have any sentinel value for "unknown" and all timestamps
> would be valid. Old producers, can't write to those topics if they are
> configured with CREATE_TIME though; APPEND_TIME would still work for
> older producers but with APPEND_TIME no negative timestamps are possible
> in the first place, so this config would not have any impact anyway.
>
> Kafka Streams could check the topic config and only drop negative
> timestamps is they are not enabled. Or course, existing topic should not
> enable negative timestamps if there are records with -1 in them already
> -- otherwise, semantics break down -- but this would be a config error
> we cannot prevent. However, I would expect that mostly newly created
> topics would enable this config anyway.
>
>
> -Matthias
>
> On 12/18/17 10:47 PM, Ewen Cheslack-Postava wrote:
> > I think the trivial change of just recognizing that using -1 as a sentinel
> > value was a mistake, and special-casing it while allowing other negative values
> > through, is the most practical, reasonable change.
> >
> > Realistically, the scope of impact for that -1 is pretty tiny, as has
> been
> > pointed out. A single millisecond gap in available timestamps in 1969.
> For
> > producers that really want to be careful (as the NYT data might want to
> > be), having the producer layer adjust accordingly is unlikely to be an
> > issue (you can't assume these timestamps are unique anyway, so they
> cannot
> > reasonably used for ordering; adjusting by 1ms is a practical tradeoff).
> >
> > Other approaches where we modify the semantics of the timestamp from the
> > two existing modes require eating up valuable flags in the message
> format,
> > or ramping the message format version, all of which make things
> > significantly messier. Hell, timezones, leap seconds, and ms granularity
> > probably make that 1ms window pretty much moot for any practical
> > applications, and for the extremely rare case that an application might
> > care, they are probably willing to pay the cost of a secondary index if
> > they needed to store timestamp values in the payload rather than in the
> > metadata.
> >
> > Given that we have the current system in place, I suspect that any
> > translation to using Long.MIN_VALUE as the sentinel is probably just more
> > confusing to users, 

Re: [VOTE] KIP-233: Simplify StreamsBuilder#addGlobalStore

2018-01-02 Thread Ewen Cheslack-Postava
+1 binding, seems like a nice simplification.

Regarding the source and processor name, do they actually need to be unique
or could they use the same value? Since these use incrementing integers, it
could be nice for debuggability/understanding to have them use the same
name if possible instead of generating 2 separate names.

-Ewen

On Tue, Jan 2, 2018 at 9:12 AM, Guozhang Wang  wrote:

> On a side note, could you update the "compatibility and upgrade" section,
> that when users start to make code changes to leverage the new API, what
> kind of upgrade executions they'd need to do? I feel they need to rename
> topics / etc.
>
> On Tue, Jan 2, 2018 at 9:10 AM, Guozhang Wang  wrote:
>
> > +1, thanks!
> >
> > On Wed, Dec 27, 2017 at 6:01 PM, Ted Yu  wrote:
> >
> >> +1
> >>
> >> On Wed, Dec 27, 2017 at 12:15 PM, Bill Bejeck 
> wrote:
> >>
> >> > +1
> >> >
> >> > On Wed, Dec 27, 2017 at 3:07 PM, Matthias J. Sax <
> matth...@confluent.io
> >> >
> >> > wrote:
> >> >
> >> > > +1
> >> > >
> >> > > On 12/26/17 9:00 PM, Panuwat Anawatmongkhon wrote:
> >> > > > Hi all,
> >> > > > I would like to start the vote thread.
> >> > > > This is link for the kip.
> >> > > >
> >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-233%
> >> 3A+Simplify+
> >> > > StreamsBuilder%23addGlobalStore
> >> > > >
> >> > > > Cheers
> >> > > >
> >> > >
> >> > >
> >> >
> >>
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>


Build failed in Jenkins: kafka-trunk-jdk7 #3072

2018-01-02 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] Revert "KAFKA-6383: complete shutdown for CREATED StreamThreads 
(#4343)"

--
[...truncated 220.28 KB...]
kafka.api.SslEndToEndAuthorizationTest > 
testNoConsumeWithDescribeAclViaSubscribe PASSED

kafka.api.SslEndToEndAuthorizationTest > 
testNoConsumeWithoutDescribeAclViaAssign STARTED

kafka.api.SslEndToEndAuthorizationTest > 
testNoConsumeWithoutDescribeAclViaAssign PASSED

kafka.api.SslEndToEndAuthorizationTest > testNoGroupAcl STARTED

kafka.api.SslEndToEndAuthorizationTest > testNoGroupAcl PASSED

kafka.api.SslEndToEndAuthorizationTest > testNoProduceWithDescribeAcl STARTED

kafka.api.SslEndToEndAuthorizationTest > testNoProduceWithDescribeAcl PASSED

kafka.api.SslEndToEndAuthorizationTest > testProduceConsumeViaSubscribe STARTED

kafka.api.SslEndToEndAuthorizationTest > testProduceConsumeViaSubscribe PASSED

kafka.api.SslEndToEndAuthorizationTest > testNoProduceWithoutDescribeAcl STARTED

kafka.api.SslEndToEndAuthorizationTest > testNoProduceWithoutDescribeAcl PASSED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testTransactionalProducerWithAuthenticationFailure STARTED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testTransactionalProducerWithAuthenticationFailure PASSED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testManualAssignmentConsumerWithAuthenticationFailure STARTED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testManualAssignmentConsumerWithAuthenticationFailure PASSED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testProducerWithAuthenticationFailure STARTED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testProducerWithAuthenticationFailure PASSED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testConsumerGroupServiceWithAuthenticationFailure STARTED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testConsumerGroupServiceWithAuthenticationFailure PASSED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure 
STARTED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure 
PASSED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testKafkaAdminClientWithAuthenticationFailure STARTED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testKafkaAdminClientWithAuthenticationFailure PASSED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testConsumerWithAuthenticationFailure STARTED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testConsumerWithAuthenticationFailure PASSED

kafka.api.PlaintextProducerSendTest > 
testSendCompressedMessageWithLogAppendTime STARTED

kafka.api.PlaintextProducerSendTest > 
testSendCompressedMessageWithLogAppendTime PASSED

kafka.api.PlaintextProducerSendTest > testAutoCreateTopic STARTED

kafka.api.PlaintextProducerSendTest > testAutoCreateTopic PASSED

kafka.api.PlaintextProducerSendTest > testSendWithInvalidCreateTime STARTED

kafka.api.PlaintextProducerSendTest > testSendWithInvalidCreateTime PASSED

kafka.api.PlaintextProducerSendTest > testBatchSizeZero STARTED

kafka.api.PlaintextProducerSendTest > testBatchSizeZero PASSED

kafka.api.PlaintextProducerSendTest > testWrongSerializer STARTED

kafka.api.PlaintextProducerSendTest > testWrongSerializer PASSED

kafka.api.PlaintextProducerSendTest > 
testSendNonCompressedMessageWithLogAppendTime STARTED

kafka.api.PlaintextProducerSendTest > 
testSendNonCompressedMessageWithLogAppendTime PASSED

kafka.api.PlaintextProducerSendTest > 
testSendNonCompressedMessageWithCreateTime STARTED

kafka.api.PlaintextProducerSendTest > 
testSendNonCompressedMessageWithCreateTime PASSED

kafka.api.PlaintextProducerSendTest > testClose STARTED

kafka.api.PlaintextProducerSendTest > testClose PASSED

kafka.api.PlaintextProducerSendTest > testFlush STARTED

kafka.api.PlaintextProducerSendTest > testFlush PASSED

kafka.api.PlaintextProducerSendTest > testSendToPartition STARTED

kafka.api.PlaintextProducerSendTest > testSendToPartition PASSED

kafka.api.PlaintextProducerSendTest > testSendOffset STARTED

kafka.api.PlaintextProducerSendTest > testSendOffset PASSED

kafka.api.PlaintextProducerSendTest > testSendCompressedMessageWithCreateTime 
STARTED

kafka.api.PlaintextProducerSendTest > testSendCompressedMessageWithCreateTime 
PASSED

kafka.api.PlaintextProducerSendTest > testCloseWithZeroTimeoutFromCallerThread 
STARTED

kafka.api.PlaintextProducerSendTest > testCloseWithZeroTimeoutFromCallerThread 
PASSED

kafka.api.PlaintextProducerSendTest > testCloseWithZeroTimeoutFromSenderThread 
STARTED

kafka.api.PlaintextProducerSendTest > testCloseWithZeroTimeoutFromSenderThread 
PASSED

kafka.api.PlaintextProducerSendTest > testSendBeforeAndAfterPartitionExpansion 
STARTED

kafka.api.PlaintextProducerSendTest > testSendBeforeAndAfterPartitionExpansion 
PASSED

kafka.api.MetricsTest > 

Re: [VOTE] KIP-239 Add queryableStoreName() to GlobalKTable

2018-01-02 Thread Ewen Cheslack-Postava
+1 binding

The idea seems reasonable. Looking at it implementation-wise, seems there
is a bit of awkwardness because GlobalKTableImpl uses a
KTableValueGetterSupplier which seems to possibly have multiple stores, but
maybe using the more specific KTableSourceValueGetterSupplier
implementation instead can resolve that.
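
For context, a rough sketch of how the new method would be used for interactive
queries -- GlobalKTable#queryableStoreName() is the method this KIP proposes,
everything else is existing Streams API, and the topic/store names are made up:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.Properties;

public class QueryableGlobalTableExample {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final GlobalKTable<String, String> globalTable = builder.globalTable(
                "config-topic",
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global-config"));

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "global-query-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // In a real application you would wait until the instance reaches RUNNING first.
        // The store name no longer has to be tracked separately by the caller:
        final ReadOnlyKeyValueStore<String, String> store =
                streams.store(globalTable.queryableStoreName(), QueryableStoreTypes.keyValueStore());
        System.out.println(store.get("some-key"));

        streams.close();
    }
}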

-Ewen

On Mon, Jan 1, 2018 at 6:22 PM, Ted Yu  wrote:

> Gentle reminder: one more binding vote is needed for the KIP to pass.
>
> Cheers
>
> On Thu, Dec 21, 2017 at 4:13 AM, Damian Guy  wrote:
>
> > +1
> >
> > On Wed, 20 Dec 2017 at 21:09 Ted Yu  wrote:
> >
> > > Ping for more (binding) votes.
> > >
> > > The pull request is ready.
> > >
> > > On Fri, Dec 15, 2017 at 12:57 PM, Guozhang Wang 
> > > wrote:
> > >
> > > > +1 (binding), thanks!
> > > >
> > > > On Fri, Dec 15, 2017 at 11:56 AM, Ted Yu 
> wrote:
> > > >
> > > > > Hi,
> > > > > Here is the discussion thread:
> > > > >
> > > > > http://search-hadoop.com/m/Kafka/uyzND12QnH514pPO9?subj=
> > > > > Re+DISCUSS+KIP+239+Add+queryableStoreName+to+GlobalKTable
> > > > >
> > > > > Please vote on this KIP.
> > > > >
> > > > > Thanks
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2018-01-02 Thread Colin McCabe
On Tue, Jan 2, 2018, at 04:46, Becket Qin wrote:
> Hi Colin,
> 
> Good point about KIP-226. Maybe a separate broker epoch is needed although
> it is a little awkward to let the consumer set this. So was there a
> solution to the frequent pause and resume scenario? Did I miss something?
> 
> Thanks,
> Jiangjie (Becket) Qin

Hi Becket,

Allowing sessions to be re-initialized (as the current KIP does) makes frequent 
pauses and resumes less painful, because the memory associated with the old 
session can be reclaimed.  The only cost is sending a full fetch request once 
when the pause or resume is activated.
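
As a concrete illustration of the client-side pattern under discussion, a consumer that pauses and resumes partitions for flow control looks roughly like this (topic name, group id, bootstrap address, and the backlog threshold are all examples):

```
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FlowControlledConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // example address
        props.put("group.id", "flow-control-demo");         // example group
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events"));  // example topic
            long buffered = 0;
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(500);
                buffered += records.count();
                // ... hand records to a slower downstream stage, decrementing
                // 'buffered' as it drains ...
                if (buffered > 10_000) {
                    // Stop fetching until the backlog drains. Under the KIP, the
                    // changed partition set means the next request after resume()
                    // is one full fetch before incremental fetches take over again.
                    consumer.pause(consumer.assignment());
                } else {
                    consumer.resume(consumer.assignment());
                }
            }
        }
    }
}
```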

There are other follow-on optimizations that we might want to do later, like 
allowing new partitions to be added to existing fetch sessions without a 
re-initialization, that could make this even more efficient.  But that's not in 
the current KIP, in order to avoid expanding the scope too much.

best,
Colin

> 
> On Wed, Dec 27, 2017 at 1:40 PM, Colin McCabe  wrote:
> 
> > On Sat, Dec 23, 2017, at 09:15, Becket Qin wrote:
> > > Hi Colin,
> > >
> > > Thanks for the explanation. I want to clarify a bit more on my thoughts.
> > >
> > > I am fine with having a separate discussion as long as the follow-up
> > > discussion will be incremental on top of this KIP instead of overriding the
> > > protocol in this KIP.
> >
> > Hi Becket,
> >
> > Thanks for the clarification.  I do think that the changes we've been
> > discussing would be incremental rather than completely replacing what we've
> > talked about here.  See my responses inline below.
> >
> > >
> > > I completely agree this KIP is useful by itself. That being said, we want
> > > to avoid falling into a "local optimal" solution by just saying because
> > it
> > > solves the problem in this scope. I think we should also think if the
> > > solution aligns with a "global optimal" (systematic optimal) solution as
> > > well. That is why I brought up other considerations. If they turned out
> > to
> > > be orthogonal and should be addressed separately, that's good. But at
> > least
> > > it is worth thinking about the potential connections between those
> > things.
> > >
> > > One example of such related consideration is the following two seemingly
> > > unrelated things:
> > >
> > > 1. I might have missed the discussion, but it seems the concern of the
> > > clients doing frequent pause and resume is still not addressed. Since
> > this
> > > is a pretty common use case for applications that want to have flow
> > > control, or have prioritized consumption, or get consumption fairness, we
> > > probably want to see how to handle this case. One of the solutions might
> > be
> > > a long-lived session id spanning the clients' life time.
> > >
> > > 2. KAFKA-6029. The key problem is that the leader wants to know if a
> > fetch
> > > request is from a shutting down broker or from a restarted broker.
> > >
> > > The connection between those two issues is that both of them could be
> > > addressed by having a life-long session id for each client (or fetcher,
> > to
> > > be more accurate). This may indicate that having a life long session id
> > > might be a "global optimal" solution so it should be considered in this
> > > KIP. Otherwise, a follow up KIP discussion for KAFKA-6029 may either
> > > introduce a broker epoch unnecessarily (which will not be used by the
> > > consumers at all) or override what we do in this KIP.
> >
> > Remember that a given follower will have more than one fetch session ID.
> > Each fetcher thread will have its own session ID.  And we will eventually
> > be able to dynamically add or remove fetcher threads using KIP-226.
> > Therefore, we can't use fetch session IDs to uniquely identify a given
> > broker incarnation.  Any time we increase the number of fetcher threads, a
> > new fetch session ID will show up.
> >
> > If we want to know if a fetch request is from a shutting down broker or
> > from a restarted broker, the most straightforward and robust way would
> > probably be to add an incarnation number for each broker.  ZK can track
> > this number.  This also helps with debugging and logging (you can tell
> > "aha-- this request came from the second incarnation, not the first."
> >
> > > BTW, to clarify, the main purpose of returning the data at the index
> > > boundary was to get the same benefit of efficient incremental fetch for
> > > both low vol and high vol partitions, which is directly related to the
> > > primary goal in this KIP. The other things (such as avoiding binary
> > search)
> > > are just potential additional gain, and they are also brought up to see
> > if
> > > that could be a "global optimal" solution.
> >
> > I still think these are separate.  The primary goal of the KIP was to make
> > fetch requests where not all partitions are returning data more efficient.
> > This isn't really related to the goal of trying to make accessing
> > historical data more efficient.  In most cases, the data 

Re: [DISCUSS] KIP-232: Detect outdated metadata by adding ControllerMetadataEpoch field

2018-01-02 Thread Jun Rao
Hi, Dong,

Thanks for the reply.

To solve the topic recreation issue, we could use either a global metadata
version or a partition level epoch. But either one will be a new concept,
right? To me, the latter seems more natural. It also makes it easier to
detect if a consumer's offset is still valid after a topic is recreated. As
you pointed out, we don't need to store the partition epoch in the message.
The following is what I am thinking. When a partition is created, we can
assign a partition epoch from an ever-increasing global counter and store
it in /brokers/topics/[topic]/partitions/[partitionId] in ZK. The partition
epoch is propagated to every broker. The consumer will be tracking a tuple
of <offset, leader epoch, partition epoch> for offsets. If a topic is
recreated, it's possible that a consumer's offset and leader epoch still
match that in the broker, but partition epoch won't be. In this case, we
can potentially still treat the consumer's offset as out of range and reset
the offset based on the offset reset policy in the consumer. This seems
harder to do with a global metadata version.
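
To make the check concrete, here is a small self-contained toy model (not Kafka code; all names and numbers are made up) of a committed offset tuple being validated against the broker's current partition epoch:

```
public class CommittedPosition {
    final long offset;
    final int leaderEpoch;
    final long partitionEpoch;  // assigned from a global counter when the partition is created

    CommittedPosition(long offset, int leaderEpoch, long partitionEpoch) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
        this.partitionEpoch = partitionEpoch;
    }

    static boolean offsetStillValid(CommittedPosition committed,
                                    long brokerPartitionEpoch,
                                    int brokerLeaderEpoch,
                                    long logEndOffset) {
        if (committed.partitionEpoch != brokerPartitionEpoch) {
            // Topic was deleted and re-created: treat the offset as out of range
            // and fall back to the consumer's offset reset policy.
            return false;
        }
        return committed.leaderEpoch <= brokerLeaderEpoch && committed.offset <= logEndOffset;
    }

    public static void main(String[] args) {
        CommittedPosition c = new CommittedPosition(4200L, 7, 13L);
        System.out.println(offsetStillValid(c, 13L, 7, 5000L));  // true: same partition epoch
        System.out.println(offsetStillValid(c, 14L, 0, 100L));   // false: topic was recreated
    }
}
```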

Jun



On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin  wrote:

> Hey Jun,
>
> This is a very good example. After thinking through this in detail, I agree
> that we need to commit offset with leader epoch in order to address this
> example.
>
> I think the remaining question is how to address the scenario that the
> topic is deleted and re-created. One possible solution is to commit offset
> with both the leader epoch and the metadata version. The logic and the
> implementation of this solution does not require a new concept (e.g.
> partition epoch) and it does not require any change to the message format
> or leader epoch. It also allows us to order the metadata in a
> straightforward manner which may be useful in the future. So it may be a
> better solution than generating a random partition epoch every time we
> create a partition. Does this sound reasonable?
>
> Previously one concern with using the metadata version is that consumer
> will be forced to refresh metadata even if metadata version is increased
> due to topics that the consumer is not interested in. Now I realized that
> this is probably not a problem. Currently client will refresh metadata
> either due to InvalidMetadataException in the response from broker or due
> to metadata expiry. The addition of the metadata version should not increase
> the overhead of metadata refresh caused by InvalidMetadataException. If
> client refresh metadata due to expiry and it receives a metadata whose
> version is lower than the current metadata version, we can reject the
> metadata but still reset the metadata age, which essentially keeps the
> existing behavior in the client.
>
> Thanks much,
> Dong
>
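
As a toy illustration of that rule (not actual client code; the names are made up), the version check plus age reset could look like:

```
class MetadataTracker {
    private long currentVersion = 0;
    private long lastRefreshMs = 0;

    void onMetadataResponse(long responseVersion, long nowMs) {
        lastRefreshMs = nowMs;            // reset the metadata age either way
        if (responseVersion < currentVersion) {
            return;                       // stale response: keep the current view
        }
        currentVersion = responseVersion;
        // ... apply the response as the new cluster view ...
    }

    boolean needsRefresh(long nowMs, long maxAgeMs) {
        return nowMs - lastRefreshMs >= maxAgeMs;
    }
}
```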


Re: Kafka Consumer manual partition assignment

2018-01-02 Thread Sagar
Hi,

Any help here would be highly appreciated :)

Sagar.

On Wed, 27 Dec 2017 at 07:53, Sagar  wrote:

> We have a use case where in we want to assign partitions manually for a
> set of topics to allow fine grained control of the records we are fetching.
>
> Basically what we are trying to achieve is that a group of messages which
> logically belong to a particular entity should be sent to the same
> partition . So, Topic A and topic B both have a certain field which are
> unique and common (say some id = 101), so we want to send both of those
> records to partition 1 (101 % 10 is our simple partition assignment
> strategy, implemented in both the producer and a Kafka Connect interceptor).
>
> From what I understood, if I want to have a consumer which listens to
> partition 1 for both Topic A and Topic B, then we need to use the assign
> method.
>
> I have been reading up a bit, and what has been mentioned everywhere is
> that no rebalancing is triggered. Also, I tried a simple use case wherein
> I didn't poll for longer than the value configured in
> group.max.session.timeout.ms, but the consumer didn't die. Is that because
> it's not part of a consumer group per se?
>
> So I just wanted to understand at what point we should declare a consumer
> dead, so that we can spin up a new consumer for the same topic partition.
> We are using AWS ECS to run our consumers, so the target group would spin
> up a new consumer based on health checks.
>
> Any examples + guidelines around this would be highly appreciated.
>
> Thanks!
> Sagar.
>
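
For reference, manually assigning the same partition of both topics with the Java consumer looks roughly like this (topic names, group id, and bootstrap address are examples). Note that assign() bypasses group management, so there are no rebalances and the broker does not track liveness for you -- an external health check has to decide when to replace a dead consumer:

```
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class Partition1Consumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // example address
        props.put("group.id", "entity-101-consumers");      // only used for offset commits here
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Same partition (1) of both topics, matching id % 10 == 1.
            consumer.assign(Arrays.asList(
                    new TopicPartition("topic-A", 1),
                    new TopicPartition("topic-B", 1)));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(500);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s-%d %s%n",
                            record.topic(), record.partition(), record.value());
                }
                consumer.commitSync();
            }
        }
    }
}
```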


Build failed in Jenkins: kafka-trunk-jdk8 #2306

2018-01-02 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-6391 ensure topics are created with correct partitions BEFORE

[wangguoz] KAFKA-6318: StreamsResetter should return non-zero return code on 
error

[wangguoz] MINOR: refactored code duplicates in several files (Streams project)

[wangguoz] Revert "KAFKA-6383: complete shutdown for CREATED StreamThreads 
(#4343)"

--
[...truncated 1.82 MB...]

org.apache.kafka.streams.kstream.internals.KTableImplTest > 
shouldNotAllowNullStoreSupplierInLeftJoin STARTED

org.apache.kafka.streams.kstream.internals.KTableImplTest > 
shouldNotAllowNullStoreSupplierInLeftJoin PASSED

org.apache.kafka.streams.kstream.internals.KTableImplTest > 
shouldNotAllowNullActionOnForEach STARTED

org.apache.kafka.streams.kstream.internals.KTableImplTest > 
shouldNotAllowNullActionOnForEach PASSED

org.apache.kafka.streams.kstream.internals.KTableImplTest > 
shouldThrowNullPointerOnFilterWhenMaterializedIsNull STARTED

org.apache.kafka.streams.kstream.internals.KTableImplTest > 
shouldThrowNullPointerOnFilterWhenMaterializedIsNull PASSED

org.apache.kafka.streams.kstream.internals.KTableImplTest > testKTable STARTED

org.apache.kafka.streams.kstream.internals.KTableImplTest > testKTable PASSED

org.apache.kafka.streams.kstream.internals.KTableImplTest > 
shouldNotAllowNullPredicateOnFilterNot STARTED

org.apache.kafka.streams.kstream.internals.KTableImplTest > 
shouldNotAllowNullPredicateOnFilterNot PASSED

org.apache.kafka.streams.kstream.internals.KTableImplTest > 
shouldAllowNullStoreInThrough STARTED

org.apache.kafka.streams.kstream.internals.KTableImplTest > 
shouldAllowNullStoreInThrough PASSED

org.apache.kafka.streams.kstream.internals.KTableImplTest > testValueGetter 
STARTED

org.apache.kafka.streams.kstream.internals.KTableImplTest > testValueGetter 
PASSED

org.apache.kafka.streams.kstream.internals.KTableImplTest > 
shouldNotAllowNullFilePathOnWriteAsText STARTED

org.apache.kafka.streams.kstream.internals.KTableImplTest > 
shouldNotAllowNullFilePathOnWriteAsText PASSED

org.apache.kafka.streams.kstream.internals.KTableImplTest > 
shouldAllowNullTopicInThrough STARTED

org.apache.kafka.streams.kstream.internals.KTableImplTest > 
shouldAllowNullTopicInThrough PASSED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldRemoveMergedSessionsFromStateStore STARTED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldRemoveMergedSessionsFromStateStore PASSED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldMergeSessions STARTED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldMergeSessions PASSED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldHandleMultipleSessionsAndMerging STARTED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldHandleMultipleSessionsAndMerging PASSED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldImmediatelyForwardNewSessionWhenNonCachedStore STARTED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldImmediatelyForwardNewSessionWhenNonCachedStore PASSED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldGetAggregatedValuesFromValueGetter STARTED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldGetAggregatedValuesFromValueGetter PASSED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldImmediatelyForwardRemovedSessionsWhenMerging STARTED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldImmediatelyForwardRemovedSessionsWhenMerging PASSED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldUpdateSessionIfTheSameTime STARTED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldUpdateSessionIfTheSameTime PASSED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap STARTED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap PASSED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldCreateSingleSessionWhenWithinGap STARTED

org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest
 > shouldCreateSingleSessionWhenWithinGap PASSED

org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest > 
shouldAddRegexTopicToLatestAutoOffsetResetList STARTED


Build failed in Jenkins: kafka-trunk-jdk8 #2305

2018-01-02 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-6383: complete shutdown for CREATED StreamThreads (#4343)

[wangguoz] KAFKA-5368: Add test for skipped-records metric (#4365)

[wangguoz] KAFKA-6269: KTable restore fails after rebalance (#4300)

[wangguoz] Fix wrong property mentioned in doc; Author: Praveen K Palaniswamy

--
[...truncated 126.85 KB...]
kafka.zk.KafkaZkClientTest > testGetConsumerOffsetNoData PASSED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods STARTED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods PASSED

kafka.zk.KafkaZkClientTest > testAclManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testAclManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testPreferredReplicaElectionMethods STARTED

kafka.zk.KafkaZkClientTest > testPreferredReplicaElectionMethods PASSED

kafka.zk.KafkaZkClientTest > testPropagateLogDir STARTED

kafka.zk.KafkaZkClientTest > testPropagateLogDir PASSED

kafka.zk.KafkaZkClientTest > testGetDataAndStat STARTED

kafka.zk.KafkaZkClientTest > testGetDataAndStat PASSED

kafka.zk.KafkaZkClientTest > testCreateTopLevelPaths STARTED

kafka.zk.KafkaZkClientTest > testCreateTopLevelPaths PASSED

kafka.zk.KafkaZkClientTest > testBrokerSequenceIdMethods STARTED

kafka.zk.KafkaZkClientTest > testBrokerSequenceIdMethods PASSED

kafka.zk.KafkaZkClientTest > testCreateSequentialPersistentPath STARTED

kafka.zk.KafkaZkClientTest > testCreateSequentialPersistentPath PASSED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath STARTED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath PASSED

kafka.zk.KafkaZkClientTest > testTopicAssignmentMethods STARTED

kafka.zk.KafkaZkClientTest > testTopicAssignmentMethods PASSED

kafka.zk.KafkaZkClientTest > testPropagateIsrChanges STARTED

kafka.zk.KafkaZkClientTest > testPropagateIsrChanges PASSED

kafka.zk.KafkaZkClientTest > testDeleteRecursive STARTED

kafka.zk.KafkaZkClientTest > testDeleteRecursive PASSED

kafka.zk.ZKPathTest > testCreatePersistentSequentialThrowsException STARTED

kafka.zk.ZKPathTest > testCreatePersistentSequentialThrowsException PASSED

kafka.zk.ZKPathTest > testCreatePersistentSequentialExists STARTED

kafka.zk.ZKPathTest > testCreatePersistentSequentialExists PASSED

kafka.zk.ZKPathTest > testCreateEphemeralPathExists STARTED

kafka.zk.ZKPathTest > testCreateEphemeralPathExists PASSED

kafka.zk.ZKPathTest > testCreatePersistentPath STARTED

kafka.zk.ZKPathTest > testCreatePersistentPath PASSED

kafka.zk.ZKPathTest > testMakeSurePersistsPathExistsThrowsException STARTED

kafka.zk.ZKPathTest > testMakeSurePersistsPathExistsThrowsException PASSED

kafka.zk.ZKPathTest > testCreateEphemeralPathThrowsException STARTED

kafka.zk.ZKPathTest > testCreateEphemeralPathThrowsException PASSED

kafka.zk.ZKPathTest > testCreatePersistentPathThrowsException STARTED

kafka.zk.ZKPathTest > testCreatePersistentPathThrowsException PASSED

kafka.zk.ZKPathTest > testMakeSurePersistsPathExists STARTED

kafka.zk.ZKPathTest > testMakeSurePersistsPathExists PASSED

kafka.server.LogOffsetTest > testFetchOffsetsBeforeWithChangingSegmentSize 
STARTED

kafka.server.LogOffsetTest > testFetchOffsetsBeforeWithChangingSegmentSize 
PASSED

kafka.server.LogOffsetTest > testGetOffsetsBeforeEarliestTime STARTED

kafka.server.LogOffsetTest > testGetOffsetsBeforeEarliestTime PASSED

kafka.server.LogOffsetTest > testGetOffsetsForUnknownTopic STARTED

kafka.server.LogOffsetTest > testGetOffsetsForUnknownTopic PASSED

kafka.server.LogOffsetTest > testEmptyLogsGetOffsets STARTED

kafka.server.LogOffsetTest > testEmptyLogsGetOffsets PASSED

kafka.server.LogOffsetTest > testFetchOffsetsBeforeWithChangingSegments STARTED

kafka.server.LogOffsetTest > testFetchOffsetsBeforeWithChangingSegments PASSED

kafka.server.LogOffsetTest > testGetOffsetsBeforeLatestTime STARTED

kafka.server.LogOffsetTest > testGetOffsetsBeforeLatestTime PASSED

kafka.server.LogOffsetTest > testGetOffsetsBeforeNow STARTED
ERROR: No tool found matching GRADLE_3_4_RC_2_HOME
ERROR: Could not install GRADLE_3_5_HOME
java.lang.NullPointerException
at 
hudson.plugins.toolenv.ToolEnvBuildWrapper$1.buildEnvVars(ToolEnvBuildWrapper.java:46)
at hudson.model.AbstractBuild.getEnvironment(AbstractBuild.java:895)
at hudson.plugins.git.GitSCM.getParamExpandedRepos(GitSCM.java:421)
at 
hudson.plugins.git.GitSCM.compareRemoteRevisionWithImpl(GitSCM.java:629)
at hudson.plugins.git.GitSCM.compareRemoteRevisionWith(GitSCM.java:594)
at hudson.scm.SCM.compareRemoteRevisionWith(SCM.java:391)
at hudson.scm.SCM.poll(SCM.java:408)
at hudson.model.AbstractProject._poll(AbstractProject.java:1384)
at hudson.model.AbstractProject.poll(AbstractProject.java:1287)
at hudson.triggers.SCMTrigger$Runner.runPolling(SCMTrigger.java:594)
at hudson.triggers.SCMTrigger$Runner.run(SCMTrigger.java:640)
   

Re: [VOTE] KIP-226 - Dynamic Broker Configuration

2018-01-02 Thread Jun Rao
Hi, Rajini,

Thank for the KIP. +1. Just a couple of minor comments below.


50. config.secret.*: Could you document how the encryption/decryption of
passwords works? In particular, how do we support changing config.secret?

51. At the topic level, we also have leader.replication.throttled.replicas
and follower.replication.throttled.replicas. Should they be dynamically
configurable?

Jun






On Tue, Dec 12, 2017 at 9:24 AM, Gwen Shapira  wrote:

> +1 (binding). Thank you for leading this, Rajini.
>
> On Tue, Dec 12, 2017 at 8:35 AM Tom Bentley  wrote:
>
> > +1 (nonbinding)
> >
> > On 12 December 2017 at 15:34, Ted Yu  wrote:
> >
> > > +1
> > >
> > > On Tue, Dec 12, 2017 at 5:44 AM, Rajini Sivaram <
> rajinisiva...@gmail.com
> > >
> > > wrote:
> > >
> > > > Since there are no more outstanding comments, I would like to start
> > vote
> > > > for KIP-226:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 226+-+Dynamic+Broker+Configuration
> > > >
> > > >
> > > > The KIP enables dynamic update of commonly updated broker
> configuration
> > > > options to avoid expensive restarts.
> > > >
> > > > Thank you,
> > > >
> > > > Rajini
> > > >
> > >
> >
>


[jira] [Reopened] (KAFKA-6383) StreamThread.shutdown doesn't clean up completely when called before StreamThread.start

2018-01-02 Thread Rohan Desai (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rohan Desai reopened KAFKA-6383:


I missed a race condition in my fix. If we start and then shut down a streams
thread without the stream thread's run loop executing in between, then shutdown()
throws an IllegalThreadStateException. This happens because shutdown() uses
StreamThread.state to decide whether to call start(), and the state transition
happens inside run(), which may not have executed yet.

> StreamThread.shutdown doesn't clean up completely when called before 
> StreamThread.start
> ---
>
> Key: KAFKA-6383
> URL: https://issues.apache.org/jira/browse/KAFKA-6383
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Rohan Desai
>Assignee: Rohan Desai
> Fix For: 1.1.0
>
>
> The following code leaks a producer network thread:
> {code}
> ks = new KafkaStreams(...);
> ks.close();
> {code}
> The underlying issue is that KafkaStreams creates a bunch of StreamsThreads 
> via StreamThread.create, which in turn creates a bunch of stuff (including a 
> producer). These resources are cleaned up only when the thread exits. So if 
> the thread was never started, then they are never cleaned up. 
> StreamThread.shutdown should clean up if it sees that the thread has never 
> been started.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP 145 - Expose Record Headers in Kafka Connect

2018-01-02 Thread Ewen Cheslack-Postava
A few thoughts, mostly just details:

* Is the SchemaAndValue return type from Headers necessary? We needed to
use that in Converters, but otherwise I believe no public API has to use
that type. If possible I think it is better to avoid making Connector
developers aware of that type.
* For SimpleHeaderConverter, if we encounter a bare byte array (i.e. not
within some other structure), should it just get written directly similar
to how strings are handled? I guess the problem with this is that you then
either don't know how to decode or might get a different type (e.g. if the
bytes were utf-8 compatible, they'd parse as a string). But I'm not sure
many people will expect the current format.
* Also, did you mean utf-8 there or something like base64? utf-8 might not
handle all byte arrays.
* Header.with and Header.rename don't seem like they're going to be
particularly useful or common, what's the expected use case for these?
We're getting a lot of new API surface area here, so I think it'd be good
to try to keep it to the necessities and most valuable extras.
* Header.valueAsType seems like it doesn't need to be exposed publicly
* How much of the conversion stuff should be in the Header class vs as
generic utilities available in another class. Having them in the Header API
makes it obvious they are available and headers seem like they may be the
most common use case. But none of that functionality is really specific to
headers and seems like it could be useful in writing connectors that
robustly handle different formats (e.g. keys might be a good example of
something you want to preserve, but a connector could easily encounter int,
long, and string keys under very common circumstances).
* In the Headers class, why deviate from the naming used in the core
Headers class? Specifically, at least allWithName and lastWithName are
different.
* Headers.apply - this seems like a departure from other APIs that don't
have map-like functionality. Is the reason for this to avoid allocating a
new Headers object or do you get a new one out? I think we need to take
care when doing this since if we mutate the existing Headers object, then
connectors that may allocate a single Headers object and use it repeatedly
would see it changing out from under them (and something like
prefixing/suffixing a header as part of a transform could result in it
being done repeatedly). If it allocates a new Headers object anyway, I'm
not sure I see that much value in the method.
* HeaderTo - I think you need to update the remove.headers config to be
operation
* AddHeader has some inconsistent InsertHeader terminology. InsertHeader is
more consistent with the existing InsertField, but either one works.
Alternatively, the naming isn't great but you could also structure adding a
header as an InsertField$Header transformation.
* We have ReplaceField for changing or dropping fields. Would the same
approach/naming make sense instead of DropHeader?

In general I think this is the right direction for making headers work both
flexibly but also easily in the default case.

-Ewen


On Tue, Jan 2, 2018 at 8:42 AM, Gwen Shapira  wrote:

> I got the impression that use of Strings in headers is really common, so
> the SimpleHeaderConverter makes a lot of sense to me. Agree that this
> introduces overhead, but perhaps simply documenting an easy "optimization"
> will be enough to help those who are concerned about it? Since the
> connector-devs decide whether they'll use the header data or not, they can
> override the converter as needed.
>
> Gwen
>
>
> On Tue, Jan 2, 2018 at 3:52 PM Randall Hauch  wrote:
>
> > There's been a bit of discussion on the PR about the choice of the
> default
> > header converter. The proposal currently uses the new
> > `SimpleHeaderConverter` so that by default connector devs and users get
> > meaningful header values by default without much work. An alternative is
> to
> > default to `ByteArrayConverter` so that by default the framework doesn't
> > have to do much effort if headers aren't used/needed.
> >
> > Thoughts?
> >
> > On Tue, Dec 26, 2017 at 11:47 AM, Randall Hauch 
> wrote:
> >
> > > Does anyone have any thoughts about this proposal for Connect header
> > > support?
> > >
> > > On Thu, Dec 21, 2017 at 4:14 PM, Randall Hauch 
> wrote:
> > >
> > >> All,
> > >>
> > >> I've updated KIP-145 to reflect my proposal. The proposal addresses
> SMTs
> > >> and a different HeaderConverter default, but I'll be updating my PR (
> > >> https://github.com/apache/kafka/pull/4319) soon. Feedback is very
> > >> welcome!
> > >>
> > >> Best regards,
> > >>
> > >> Randall
> > >>
> > >> On Thu, Dec 14, 2017 at 10:20 AM, Randall Hauch 
> > wrote:
> > >>
> > >>> Hi, Michael. Yeah, I liked your PR a lot, and there definitely are a
> > lot
> > >>> of similarities. But here are the more significant differences from
> my
> > >>> perspective (none of which are 

Re: Pull Request emails are not being sent to the mailing list

2018-01-02 Thread Ted Yu
pull request emails, at least the initial one, are recorded on the
underlying JIRA.

For the time being, you can get to pull request(s) of interest thru JIRA.

FYI

On Tue, Jan 2, 2018 at 1:30 PM, James Cheng  wrote:

> Since Dec 21st, it seems that email notifications of Github pull requests
> are not being sent to the dev mailing list. Is this a bug, or have PR
> emails moved to somewhere else? If it’s a bug, can we look into this?
>
> The last email PR that was sent was this one:
> http://mail-archives.apache.org/mod_mbox/kafka-dev/201712.mbox/%3cgit-pr-4351-ka...@git.apache.org%3e
>
> And there have been a bunch of PRs created since then, and their emails
> don’t appear to be in the mailing list. https://github.com/apache/kafka/pulls
>
> Is this related to the Gitbox transition? It looks like the Gitbox
> transition happened just 3 hours after that last PR email.
> http://mail-archives.apache.org/mod_mbox/kafka-dev/201712.mbox/%3cCAD5tkZZqHcM009kEPx7ccSy=c_bnaol4mok1pgsesei-vnh...@mail.gmail.com%3e
>
> Thanks,
> -James
>
>


Pull Request emails are not being sent to the mailing list

2018-01-02 Thread James Cheng
Since Dec 21st, it seems that email notifications of Github pull requests are 
not being sent to the dev mailing list. Is this a bug, or have PR emails moved 
to somewhere else? If it’s a bug, can we look into this?

The last email PR that was sent was this one: 
http://mail-archives.apache.org/mod_mbox/kafka-dev/201712.mbox/%3cgit-pr-4351-ka...@git.apache.org%3e
 


And there have been a bunch of PRs created since then, and their emails don’t 
appear to be in the mailing list. https://github.com/apache/kafka/pulls 


Is this related to the Gitbox transition? It looks like the Gitbox 
transition happened just 3 hours after that last PR email. 
http://mail-archives.apache.org/mod_mbox/kafka-dev/201712.mbox/%3cCAD5tkZZqHcM009kEPx7ccSy=c_bnaol4mok1pgsesei-vnh...@mail.gmail.com%3e
 


Thanks,
-James



[jira] [Created] (KAFKA-6418) adminclient throws timeoutexception when there is a SchemaException

2018-01-02 Thread dan norwood (JIRA)
dan norwood created KAFKA-6418:
--

 Summary: adminclient throws timeoutexception when there is a 
SchemaException
 Key: KAFKA-6418
 URL: https://issues.apache.org/jira/browse/KAFKA-6418
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.0.0
Reporter: dan norwood


if you try to `createTopics(Collections.singleton(new NewTopic()));` you will 
get something like the following:
{noformat}
[2018-01-02 11:20:46,481] ERROR [kafka-admin-client-thread | adminclient-3] 
Uncaught exception in thread 'kafka-admin-client-thread | adminclient-3': 
(org.apache.kafka.common.utils.KafkaThread)
org.apache.kafka.common.protocol.types.SchemaException: Error computing size 
for field 'create_topic_requests': Error computing size for field 'topic': 
Missing value for field 'topic' which has no default value.
at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:94)
at org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:341)
at 
org.apache.kafka.common.requests.AbstractRequestResponse.serialize(AbstractRequestResponse.java:28)
at 
org.apache.kafka.common.requests.AbstractRequest.serialize(AbstractRequest.java:98)
at 
org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:91)
at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:423)
at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:397)
at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:358)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:810)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1002)
at java.lang.Thread.run(Thread.java:745)
[2018-01-02 11:20:46,481] ERROR [kafka-admin-client-thread | adminclient-3] 
Uncaught exception in thread 'kafka-admin-client-thread | adminclient-3': 
(org.apache.kafka.common.utils.KafkaThread)
org.apache.kafka.common.protocol.types.SchemaException: Error computing size 
for field 'create_topic_requests': Error computing size for field 'topic': 
Missing value for field 'topic' which has no default value.
at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:94)
at org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:341)
at 
org.apache.kafka.common.requests.AbstractRequestResponse.serialize(AbstractRequestResponse.java:28)
at 
org.apache.kafka.common.requests.AbstractRequest.serialize(AbstractRequest.java:98)
at 
org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:91)
at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:423)
at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:397)
at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:358)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:810)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1002)
at java.lang.Thread.run(Thread.java:745)
[2018-01-02 11:21:01,383] ERROR [qtp1875757262-59] Unhandled exception 
resulting in internal server error response 
(io.confluent.rest.exceptions.GenericExceptionMapper)
java.util.concurrent.TimeoutException
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:225)
at 
io.confluent.controlcenter.data.KafkaDao.createTopics(KafkaDao.java:85)
at 
io.confluent.controlcenter.rest.KafkaResource.createTopic(KafkaResource.java:87)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
{noformat}

the actual error prints immediately, but the adminclient still waits for a 
timeout and then exposes a TimeoutException to the user
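
The bug here is about how the error is propagated, but for reference, a well-formed request that avoids the SchemaException would look roughly like this (topic name, counts, and bootstrap address are examples):

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // example address

        try (AdminClient admin = AdminClient.create(props)) {
            // Supply the required topic name, partition count and replication
            // factor up front so the CreateTopics request serializes cleanly.
            NewTopic topic = new NewTopic("my-topic", 3, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
{code}

Either way, the SchemaException raised during serialization should arguably fail the returned future immediately instead of letting the caller wait for a timeout.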




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: Vote for KIP-245: Use Properties instead of StreamsConfig in KafkaStreams constructor

2018-01-02 Thread Guozhang Wang
Boyang,

Thanks for the proposed change; the wiki page LGTM. One minor comment,
otherwise I'm +1:

For the new API, we now also have a constructor that accepts both a
clientSupplier and a Time, so we should consider having four overloads in
total:


// New API (using Properties)
public KafkaStreams(final Topology topology, final Properties props)
public KafkaStreams(final Topology topology, final Properties props, final Time time)
public KafkaStreams(final Topology topology, final Properties props,
                    final KafkaClientSupplier clientSupplier)
public KafkaStreams(final Topology topology, final Properties props,
                    final KafkaClientSupplier clientSupplier, final Time time)
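
For reference, a minimal sketch of the simplest overload in use (application id, bootstrap address, and topic names are examples; default serdes are assumed to be set in the Properties):

```
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class PropertiesConstructorExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pass-through");      // example id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // example address
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");  // trivial example topology

        // No StreamsConfig needed: the Properties go straight to the constructor.
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```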

Guozhang

On Tue, Dec 26, 2017 at 7:26 PM, Satish Duggana 
wrote:

> Thanks for the KIP, +1 from me.
>
> On Wed, Dec 27, 2017 at 7:42 AM, Bill Bejeck  wrote:
>
> > Thanks for the KIP.  +1 for me.
> >
> > On Tue, Dec 26, 2017 at 6:22 PM Ted Yu  wrote:
> >
> > > +1 from me as well.
> > >
> > > On Tue, Dec 26, 2017 at 10:41 AM, Matthias J. Sax <
> matth...@confluent.io
> > >
> > > wrote:
> > >
> > > > Thanks for the KIP Boyang!
> > > >
> > > > I don't have any further comments.
> > > >
> > > > +1 from me.
> > > >
> > > > @Ted: This is a rather simple KIP, thus, skipping the DISCUSS thread
> > > > seems ok to me.
> > > >
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > @Boyang: it's recommended to use this format for the subject
> > > >
> > > > "[VOTE] KIP-245: ..."
> > > >
> > > > Same for DISCUSS threads. People are used to those headlines and they
> > > > pay more attention than. For this KIP, just leave it as it though.
> For
> > > > future reference only
> > > > .
> > > >
> > > >
> > > > On 12/26/17 4:55 AM, Ted Yu wrote:
> > > > > Normally a DISCUSS thread precedes VOTE thread so that people have
> > > ample
> > > > time examining the proposal.
> > > > >  Original message From: Boyang Chen <
> > > bche...@outlook.com>
> > > > Date: 12/26/17  1:22 AM  (GMT-07:00) To: dev@kafka.apache.org
> Subject:
> > > > Vote for KIP-245: Use Properties instead of StreamsConfig in
> > KafkaStreams
> > > > constructor
> > > > > Hi there,
> > > > >
> > > > > I'm Boyang who is a newbie contributor to Kafka. I would like to
> > start
> > > a
> > > > vote for the KIP-245:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > >
> > > 245%3A+Use+Properties+instead+of+StreamsConfig+in+
> > KafkaStreams+constructor
> > > > >
> > > > >
> > > > > This is linked with JIRA: https://issues.apache.org/
> > > > jira/browse/KAFKA-6386
> > > > >
> > > > > [KAFKA-6386] Deprecate KafkaStreams constructor taking ...<
> > > > https://issues.apache.org/jira/browse/KAFKA-6386>
> > > > > issues.apache.org
> > > > > Currently, KafkaStreams constructor has overloads that take either
> > > > Properties or StreamsConfig as parameters. Because StreamsConfig is
> > > > immutable and is created from a ...
> > > > >
> > > > > And my pull request is here:
> > > > >
> > > > >
> > > > > https://github.com/apache/kafka/pull/4354
> > > > >
> > > > >
> > > > > Since this is my first time doing this, feel free to let me know if
> > > this
> > > > is the correct format!
> > > > >
> > > > >
> > > > > Best,
> > > > >
> > > > > Boyang
> > > > >
> > > >
> > > >
> > >
> >
>



-- 
-- Guozhang


Jenkins build is back to normal : kafka-trunk-jdk7 #3069

2018-01-02 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-6269) KTable state restore fails after rebalance

2018-01-02 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6269.
--
Resolution: Fixed

> KTable state restore fails after rebalance
> --
>
> Key: KAFKA-6269
> URL: https://issues.apache.org/jira/browse/KAFKA-6269
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andreas Schroeder
>Assignee: Bill Bejeck
>Priority: Blocker
> Fix For: 1.1.0, 1.0.1
>
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A   -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, 
> valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>   sourceTopic: String,
>   existsTopic: String,
>   valueSerde: Serde[V],
>   valueMapper: ValueMapper[String, V]): 
> KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, 
> String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = 
> Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(valueSerde)
>   .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if 
> (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>  "entity-B",
>  "entity-B-exists",
>  EntityBInfoSerde,
>  ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), 
> mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => 
> EntityDiff.fromJoin(a, b)
> val materialized = 
> Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, 
> joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 
> partitions so that there is a total of 4 x 30 = 120 partitions to consume. 
> The initial launch of the processors works fine, but killing one
> processor and letting it re-join leads to some faulty behaviour in the
> stream threads.
> First, the total number of assigned partitions over all processor machines is 
> larger than 120 (sometimes 157, sometimes just 132), so the partition / task 
> assignment seems to assign the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly 
> with the error message of 'Detected a task that got migrated to another 
> thread.' We gave the processor half an hour to recover; usually, rebuilding 
> the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got 
> migrated to another thread. This implies that this thread missed a rebalance 
> and dropped out of the consumer group. Trying to rejoin the consumer group 
> now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of 
> entity-B-exists-0 should not change while restoring: old end offset 4750539, 
> current offset 4751388
> > StreamsTask taskId: 1_0
> > >   ProcessorTopology:
> > KSTREAM-SOURCE-08:
> > topics: [entity-A-exists]
> > children:   [KTABLE-SOURCE-09]
> > KTABLE-SOURCE-09:
> > states: [entity-A-exists-persisted]
> > children:   [KTABLE-JOINTHIS-11]
> > KTABLE-JOINTHIS-11:
> > states: [entity-B-exists-persisted]
> > children:   [KTABLE-MERGE-10]
> > KTABLE-MERGE-10:
> > states: [entity-A-joined-with-entity-B]
> > KSTREAM-SOURCE-03:
> > topics: [entity-B-exists]
> > children:   [KTABLE-SOURCE-04]
> >   

[jira] [Resolved] (KAFKA-6383) StreamThread.shutdown doesn't clean up completely when called before StreamThread.start

2018-01-02 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6383.
--
   Resolution: Fixed
Fix Version/s: 1.1.0

> StreamThread.shutdown doesn't clean up completely when called before 
> StreamThread.start
> ---
>
> Key: KAFKA-6383
> URL: https://issues.apache.org/jira/browse/KAFKA-6383
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Rohan Desai
>Assignee: Rohan Desai
> Fix For: 1.1.0
>
>
> The following code leaks a producer network thread:
> {code}
> ks = new KafkaStreams(...);
> ks.close();
> {code}
> The underlying issue is that KafkaStreams creates a bunch of StreamsThreads 
> via StreamThread.create, which in turn creates a bunch of stuff (including a 
> producer). These resources are cleaned up only when the thread exits. So if 
> the thread was never started, then they are never cleaned up. 
> StreamThread.shutdown should clean up if it sees that the thread has never 
> been started.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException

2018-01-02 Thread Dustin Cote (JIRA)
Dustin Cote created KAFKA-6417:
--

 Summary: plugin.path pointing at a plugin directory causes 
ClassNotFoundException
 Key: KAFKA-6417
 URL: https://issues.apache.org/jira/browse/KAFKA-6417
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 1.0.0
Reporter: Dustin Cote


When using the {{plugin.path}} configuration for the Connect workers, the user 
is expected to specify a list containing the following per the docs:

{quote}
The list should consist of top level directories that include any combination 
of: a) directories immediately containing jars with plugins and their 
dependencies b) uber-jars with plugins and their dependencies c) directories 
immediately containing the package directory structure of classes of plugins 
and their dependencies 
{quote}

This means we would expect {{plugin.path=/usr/share/plugins}} for a structure 
like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. 
However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the 
resulting behavior is that dependencies for {{myplugin1}} are not properly 
loaded. This causes a {{ClassNotFoundException}} that is not intuitive to 
debug. 
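
To make the distinction concrete, a layout and worker configuration that behave as documented might look like the following (paths and file names are examples only):

{noformat}
/usr/share/plugins/myplugin1/myplugin1.jar
/usr/share/plugins/myplugin1/some-dependency.jar
/usr/share/plugins/myplugin2/myplugin2.jar

# worker config: point at the parent directory, not at an individual plugin
plugin.path=/usr/share/plugins
{noformat}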



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [VOTE] KIP-233: Simplify StreamsBuilder#addGlobalStore

2018-01-02 Thread Guozhang Wang
On a side note, could you update the "compatibility and upgrade" section to
describe what upgrade steps users would need to take once they start making
code changes to leverage the new API? I feel they may need to rename
topics, etc.

On Tue, Jan 2, 2018 at 9:10 AM, Guozhang Wang  wrote:

> +1, thanks!
>
> On Wed, Dec 27, 2017 at 6:01 PM, Ted Yu  wrote:
>
>> +1
>>
>> On Wed, Dec 27, 2017 at 12:15 PM, Bill Bejeck  wrote:
>>
>> > +1
>> >
>> > On Wed, Dec 27, 2017 at 3:07 PM, Matthias J. Sax > >
>> > wrote:
>> >
>> > > +1
>> > >
>> > > On 12/26/17 9:00 PM, Panuwat Anawatmongkhon wrote:
>> > > > Hi all,
>> > > > I would like to start the vote thread.
>> > > > This is link for the kip.
>> > > >
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-233%
>> 3A+Simplify+
>> > > StreamsBuilder%23addGlobalStore
>> > > >
>> > > > Cheers
>> > > >
>> > >
>> > >
>> >
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang


Re: [VOTE] KIP-233: Simplify StreamsBuilder#addGlobalStore

2018-01-02 Thread Guozhang Wang
+1, thanks!

On Wed, Dec 27, 2017 at 6:01 PM, Ted Yu  wrote:

> +1
>
> On Wed, Dec 27, 2017 at 12:15 PM, Bill Bejeck  wrote:
>
> > +1
> >
> > On Wed, Dec 27, 2017 at 3:07 PM, Matthias J. Sax 
> > wrote:
> >
> > > +1
> > >
> > > On 12/26/17 9:00 PM, Panuwat Anawatmongkhon wrote:
> > > > Hi all,
> > > > I would like to start the vote thread.
> > > > This is link for the kip.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 233%3A+Simplify+
> > > StreamsBuilder%23addGlobalStore
> > > >
> > > > Cheers
> > > >
> > >
> > >
> >
>



-- 
-- Guozhang


[jira] [Created] (KAFKA-6416) Create an official Kafka Helm chart for running a Kafka cluster

2018-01-02 Thread Matthew T. Adams (JIRA)
Matthew T. Adams created KAFKA-6416:
---

 Summary: Create an official Kafka Helm chart for running a Kafka 
cluster
 Key: KAFKA-6416
 URL: https://issues.apache.org/jira/browse/KAFKA-6416
 Project: Kafka
  Issue Type: Wish
Affects Versions: 1.0.0
Reporter: Matthew T. Adams


This issue requests that the Apache Kafka team release a [Helm|https://helm.sh] 
chart for running Kafka as a cluster.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-222 - Add "describe consumer group" to KafkaAdminClient

2018-01-02 Thread Gwen Shapira
listGroups and listGroupOffsets will make it a snap to transition the
existing ConsumerGroups CLI to depend on client libraries only.

Thanks for adding them :)

On Sun, Dec 31, 2017 at 1:39 PM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Thanks all for your feedback, and sorry for late response.
>
> I'm considering the following:
>
> ```AdminClient.java
> public abstract ListGroupsResult listGroups(ListGroupsOptions options);
>
> public ListGroupsResult listGroups() {
> return listGroups(new ListGroupsOptions());
> }
>
> public ListGroupsResult listConsumerGroups(ListGroupsOptions options) {
> //filtering groups by ConsumerProtocol.PROTOCOL_TYPE
> }
>
> public ListGroupsResult listConsumerGroups() {
> return listConsumerGroups(new ListGroupsOptions());
> }
> ```
>
> About `describeConsumerGroups`, I'm considering renaming it to
> `describeGroups`, and renaming `ConsumerGroupDescription` to
> `GroupDescription` and `ConsumerDescription` to `MemberDescription`.
> Not sure we need a deserializer; we can access `DescribeGroupsResponse`
> members directly.
>
> As @dan says, I also think `listGroupOffsets` could be added to this KIP to
> make it complete.
>
> I'm thinking about renaming this KIP to "Add Consumer Group operations to
> Admin API".
>
> I'm updating the KIP accordingly.
>
> Cheers and happy 2018!
>
> Jorge.
>
> El mié., 13 dic. 2017 a las 19:06, Colin McCabe ()
> escribió:
>
> > On Tue, Dec 12, 2017, at 09:39, Jason Gustafson wrote:
> > > Hi Colin,
> > >
> > > They do share the same namespace. We have a "protocol type" field in
> the
> > > JoinGroup request to make sure that all members are of the same kind.
> >
> > Hi Jason,
> >
> > Thanks.  That makes sense.
> >
> > > Very roughly what I was thinking is something like this. First we
> > introduce an
> > > interface for deserialization:
> > >
> > > interface GroupMetadataDeserializer {
> > >   String protocolType();
> > >   Metadata desrializeMetadata(ByteBuffer);
> > >   Assignment deserializeAssignment(ByteBuffer);
> > > }
> > >
> > > Then we add some kind of generic container:
> > >
> > > class MemberMetadata {
> > >   Metadata metadata;
> > >   Assignment assignment;
> > > }
> > >
> > > Then we have two APIs: one generic and one specific to consumer groups:
> > >
> > >  Map> describeGroup(String groupId,
> > > GroupMetadataDeserializer deserializer);
> > >
> > > Map describeConsumerGroup(String
> groupId);
> > >
> > > (This is just a sketch, so obviously we can change them to use futures
> or
> > > to batch or whatever.)
> > >
> > > I think it would be fine to not provide a connect-specific API since
> this
> > > usage will probably be limited to Connect itself.
> >
> > Yeah, it probably makes sense to have a separation between describeGroup
> > and describeConsumerGroup.
> >
> > We will have to be pretty careful with cross-version compatibility in
> > describeConsumerGroup.  It should be possible for an old client to talk
> > to a new broker, and a new client to talk to an old broker.  So we
> > should be prepared to read data in multiple formats.
> >
> > I'm not sure if we need to have a 'deserializer' argument to
> > describeGroup.  We can just let them access a byte array, right?
> > Theoretically they might also just want to check for the presence or
> > absence of a group, but not deserialize anything.
> >
> > best,
> > Colin
> >
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > > On Mon, Dec 11, 2017 at 9:15 PM, Colin McCabe 
> > wrote:
> > >
> > > > Sorry... this is probably a silly question, but do Kafka Connect
> groups
> > > > share a namespace with consumer groups?  If we had a separate API for
> > > > Kafka Connect groups vs. Consumer groups, would that make sense?  Or
> > > > should we unify them?
> > > >
> > > > best,
> > > > Colin
> > > >
> > > >
> > > > On Mon, Dec 11, 2017, at 16:11, Jason Gustafson wrote:
> > > > > Hi Jorge,
> > > > >
> > > > > Kafka group management is actually more general than consumer
> groups
> > > > > (e.g.
> > > > > there are kafka connect groups). If we are adding these APIs, I
> would
> > > > > suggest we consider the more general protocol and how to expose
> > > > > group-protocol-specific metadata. For example, it might be
> > reasonable to
> > > > > have both an API to access to the low-level bytes as well as some
> > > > > higher-level convenience APIs for accessing consumer groups.
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > > On Mon, Dec 4, 2017 at 4:07 PM, Matthias J. Sax <
> > matth...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Jorge,
> > > > > >
> > > > > > is there any update regarding this KIP?
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 11/17/17 9:14 AM, Guozhang Wang wrote:
> > > > > > > Hello Jorge,
> > > > > > 

Re: [DISCUSS] KIP 145 - Expose Record Headers in Kafka Connect

2018-01-02 Thread Gwen Shapira
I got the impression that use of Strings in headers is really common, so
the SimpleHeaderConverter makes a lot of sense to me. Agree that this
introduces overhead, but perhaps simply documenting an easy "optimization"
will be enough to help those who are concerned about it? Since the
connector-devs decide whether they'll use the header data or not, they can
override the converter as needed.

Gwen


On Tue, Jan 2, 2018 at 3:52 PM Randall Hauch  wrote:

> There's been a bit of discussion on the PR about the choice of the default
> header converter. The proposal currently uses the new
> `SimpleHeaderConverter` so that by default connector devs and users get
> meaningful header values by default without much work. An alternative is to
> default to `ByteArrayConverter` so that by default the framework doesn't
> have to do much effort if headers aren't used/needed.
>
> Thoughts?
>
> On Tue, Dec 26, 2017 at 11:47 AM, Randall Hauch  wrote:
>
> > Does anyone have any thoughts about this proposal for Connect header
> > support?
> >
> > On Thu, Dec 21, 2017 at 4:14 PM, Randall Hauch  wrote:
> >
> >> All,
> >>
> >> I've updated KIP-145 to reflect my proposal. The proposal addresses SMTs
> >> and a different HeaderConverter default, but I'll be updating my PR (
> >> https://github.com/apache/kafka/pull/4319) soon. Feedback is very
> >> welcome!
> >>
> >> Best regards,
> >>
> >> Randall
> >>
> >> On Thu, Dec 14, 2017 at 10:20 AM, Randall Hauch 
> wrote:
> >>
> >>> Hi, Michael. Yeah, I liked your PR a lot, and there definitely are a
> lot
> >>> of similarities. But here are the more significant differences from my
> >>> perspective (none of which are really that big):
> >>>
> >>> First, your `SubjectConverter` and my `HeaderConverter` are pretty
> >>> similar -- mine is just more closely tied to headers. Also, we used
> >>> slightly different approaches to dealing with the fact that the
> `Converter`
> >>> interface does not extend `Configurable`, which Connect now uses for
> >>> transforms, connectors, etc. And our implementations take very
> different
> >>> approaches (see below).
> >>>
> >>> Second, I tried to follow Kafka client's `Header` and `Headers`
> >>> interfaces (at least in concept) so that ConnectRecord has a `Headers`
> >>> rather than a list of headers. It's a minor distinction, but I do think
> >>> it's important for future-proofing to have an interface for the
> collection
> >>> to abstract and encapsulate logic/behavior as well as leaving room for
> >>> alternative implementations. It is also a convenient place to add methods
> for
> >>> source connectors and SMTs to easily add/modify/remove/transform
> headers.
> >>>
> >>> Third, our "header converter" implementations are where most of the
> >>> differences lie. Again, this goes back to my assertion that we should
> make
> >>> the serdes and cast/conversion orthogonal. If we allow sink connectors
> and
> >>> SMTs to get header values in the type they want (e.g.,
> >>> `Header.valueAsFloat()`), then we can tolerate a bit more variation in
> how
> >>> the header values are serialized and deserialized, since the serdes
> >>> mechanism doesn't have to get the type exactly right for the sink
> connector
> >>> and SMT. My `SimpleHeaderConverter` serializes all of the types to
> strings,
> >>> but during deserialization it attempts to infer the schemas (easy for
> >>> primitive values, a bit harder for structured types). IIUC, neither
> your
> >>> approach nor mine is really able to maintain Struct schemas, but IMO we
> can
> >>> add that over time with improved/different header converters if people
> >>> really need it.
> >>>
> >>> Fourth, we use different defaults for the serdes implementation. I
> >>> dislike the StringConverter because it converts everything to strings
> that
> >>> are then difficult to convert back to the original form, especially
> for the
> >>> structured types. This is why I created the `SimpleHeaderConverter`
> >>> implementation, which doesn't need explicit configuration or explicit
> >>> mapping of header names to types, and thus can be used as the default.
> >>>
> >>> Finally, while I hope that `SimpleHeaderConverter` and its schema
> >>> inference will work most of the time with no special configuration,
> >>> especially since the `Header` interface makes it easy to cast/convert
> in
> >>> sink connectors and SMTs, I do like how your
> `PrimativeSubjectConverter`
> >>> allows the user to manually control how the values are serialized. I
> >>> thought of doing something similar, but I think that can be done at a
> later
> >>> time if/when needed.
> >>>
> >>> I hope that makes sense.
> >>>
> >>> Randall
> >>>
> >>> On Tue, Dec 12, 2017 at 11:35 PM, Michael André Pearce <
> >>> michael.andre.pea...@me.com> wrote:
> >>>
>  Hi Randall
> 
>  What’s the main difference between this and my earlier alternative
>  option PR
>  

Re: [DISCUSS] KIP 145 - Expose Record Headers in Kafka Connect

2018-01-02 Thread Randall Hauch
There's been a bit of discussion on the PR about the choice of the default
header converter. The proposal currently uses the new
`SimpleHeaderConverter` so that by default connector devs and users get
meaningful header values by default without much work. An alternative is to
default to `ByteArrayConverter` so that by default the framework doesn't
have to do much effort if headers aren't used/needed.

Thoughts?
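
For concreteness, a worker that wants to opt out of the proposed default could set something like the following (the property name and converter class are taken from the KIP draft and the PR, not from a released version, so treat them as proposal-stage):

```
# hypothetical worker configuration under the KIP-145 proposal
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```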

On Tue, Dec 26, 2017 at 11:47 AM, Randall Hauch  wrote:

> Does anyone have any thoughts about this proposal for Connect header
> support?
>
> On Thu, Dec 21, 2017 at 4:14 PM, Randall Hauch  wrote:
>
>> All,
>>
>> I've updated KIP-145 to reflect my proposal. The proposal addresses SMTs
>> and a different HeaderConverter default, but I'll be updating my PR (
>> https://github.com/apache/kafka/pull/4319) soon. Feedback is very
>> welcome!
>>
>> Best regards,
>>
>> Randall
>>
>> On Thu, Dec 14, 2017 at 10:20 AM, Randall Hauch  wrote:
>>
>>> Hi, Michael. Yeah, I liked your PR a lot, and there definitely are a lot
>>> of similarities. But here are the more significant differences from my
>>> perspective (none of which are really that big):
>>>
>>> First, your `SubjectConverter` and my `HeaderConverter` are pretty
>>> similar -- mine is just more closely tied to headers. Also, we used
>>> slightly different approaches to dealing with the fact that the `Converter`
>>> interface does not extend `Configurable`, which Connect now uses for
>>> transforms, connectors, etc. And our implementations take very different
>>> approaches (see below).
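As a rough sketch of the shape being discussed (names are illustrative, not the
final API), a header-specific converter that also participates in the
`Configurable` mechanism might look like:

    import java.io.Closeable;
    import org.apache.kafka.common.Configurable;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;

    // Illustrative only: unlike the existing Converter interface, this one is
    // Configurable and carries the header key through (de)serialization.
    public interface HeaderConverterSketch extends Configurable, Closeable {
        SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value);
        byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value);
    }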
>>>
>>> Second, I tried to follow Kafka client's `Header` and `Headers`
>>> interfaces (at least in concept) so that ConnectRecord has a `Headers`
>>> rather than a list of headers. It's a minor distinction, but I do think
>>> it's important for future-proofing to have an interface for the collection
>>> to abstract and encapsulate logic/behavior as well as leaving room for
>>> alternative implementations. It is also a convenient place to add methods for
>>> source connectors and SMTs to easily add/modify/remove/transform headers.
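As an example of the kind of convenience meant here (a sketch only; the mutator
names follow the proposed `Headers` interface and the header keys are made up),
an SMT or source task could work against the collection directly:

    import org.apache.kafka.connect.connector.ConnectRecord;

    // Sketch only: addString/addInt/remove follow the Headers interface proposed
    // here; the header keys are invented for illustration.
    final class HeaderTagging {
        static void tag(ConnectRecord<?> record) {
            record.headers()
                  .addString("origin-cluster", "dc-1")
                  .addInt("schema-version", 3)
                  .remove("internal-trace");
        }
    }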
>>>
>>> Third, our "header converter" implementations are where most of the
>>> differences lie. Again, this goes back to my assertion that we should make
>>> the serdes and cast/conversion orthogonal. If we allow sink connectors and
>>> SMTs to get header values in the type they want (e.g.,
>>> `Header.valueAsFloat()`), then we can tolerate a bit more variation in how
>>> the header values are serialized and deserialized, since the serdes
>>> mechanism doesn't have to get the type exactly right for the sink connector
>>> and SMT. My `SimpleHeaderConverter` serializes all of the types to strings,
>>> but during deserialization it attempts to infer the schemas (easy for
>>> primitive values, a bit harder for structured types). IIUC, neither your
>>> approach nor mine is really able to maintain Struct schemas, but IMO we can
>>> add that over time with improved/different header converters if people
>>> really need it.
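To make the inference idea concrete, here is a rough sketch (not the actual
`SimpleHeaderConverter` logic, which covers many more types, including structured
values) of trying progressively more specific parses before falling back to a string:

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;

    // Rough sketch of schema inference for a string-serialized header value.
    final class HeaderValueInference {
        static SchemaAndValue infer(String serialized) {
            if (serialized == null) {
                return null; // caller treats this as a null header value
            }
            if (serialized.equalsIgnoreCase("true") || serialized.equalsIgnoreCase("false")) {
                return new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA,
                                          Boolean.parseBoolean(serialized));
            }
            try {
                return new SchemaAndValue(Schema.OPTIONAL_INT64_SCHEMA, Long.parseLong(serialized));
            } catch (NumberFormatException ignored) {
                // not an integral number
            }
            try {
                return new SchemaAndValue(Schema.OPTIONAL_FLOAT64_SCHEMA, Double.parseDouble(serialized));
            } catch (NumberFormatException ignored) {
                // not a floating-point number either
            }
            return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, serialized);
        }
    }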
>>>
>>> Fourth, we use different defaults for the serdes implementation. I
>>> dislike the StringConverter because it converts everything to strings that
>>> are then difficult to convert back to the original form, especially for the
>>> structured types. This is why I created the `SimpleHeaderConverter`
>>> implementation, which doesn't need explicit configuration or explicit
>>> mapping of header names to types, and thus can be used as the default.
>>>
>>> Finally, while I hope that `SimpleHeaderConverter` and its schema
>>> inference will work most of the time with no special configuration,
>>> especially since the `Header` interface makes it easy to cast/convert in
>>> sink connectors and SMTs, I do like how your `PrimativeSubjectConverter`
>>> allows the user to manually control how the values are serialized. I
>>> thought of doing something similar, but I think that can be done at a later
>>> time if/when needed.
>>>
>>> I hope that makes sense.
>>>
>>> Randall
>>>
>>> On Tue, Dec 12, 2017 at 11:35 PM, Michael André Pearce <
>>> michael.andre.pea...@me.com> wrote:
>>>
 Hi Randall

 What’s the main difference between this and my earlier alternative
 option PR
 https://github.com/apache/kafka/pull/2942/files

 If none then +1.
 From what I can tell, the only difference is that the headers you support
 are able to cross-convert primitive types, e.g., if the value after
 conversion is an integer you can still ask for a float and it will
 type-convert if possible.

 Cheers
 Mike


 Sent from my iPhone

 > On 13 Dec 2017, at 01:36, Randall Hauch  wrote:
 >
 > Trying to revive this after several months of inactivity.
 >
 > I've spent quite a bit of time evaluating the current KIP-145
 proposal and
 > several of the suggested PRs. The original KIP-145 proposal is
 

Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2018-01-02 Thread Becket Qin
Hi Colin,

Good point about KIP-226. Maybe a separate broker epoch is needed although
it is a little awkward to let the consumer set this. So was there a
solution to the frequent pause and resume scenario? Did I miss something?

Thanks,

Jiangjie (Becket) Qin

On Wed, Dec 27, 2017 at 1:40 PM, Colin McCabe  wrote:

> On Sat, Dec 23, 2017, at 09:15, Becket Qin wrote:
> > Hi Colin,
> >
> > Thanks for the explanation. I want to clarify a bit more on my thoughts.
> >
> > I am fine with having a separate discussion as long as the follow-up
> > discussion will be incremental on top of this KIP instead of overriding the
> > protocol in this KIP.
>
> Hi Becket,
>
> Thanks for the clarification.  I do think that the changes we've been
> discussing would be incremental rather than completely replacing what we've
> talked about here.  See my responses inline below.
>
> >
> > I completely agree this KIP is useful by itself. That being said, we want
> > to avoid falling into a "local optimal" solution by just saying that
> it
> > solves the problem in this scope. I think we should also consider whether the
> > solution aligns with a "global optimal" (systematic optimal) solution as
> > well. That is why I brought up other considerations. If they turned out
> to
> > be orthogonal and should be addressed separately, that's good. But at
> least
> > it is worth thinking about the potential connections between those
> things.
> >
> > One example of such related consideration is the following two seemingly
> > unrelated things:
> >
> > 1. I might have missed the discussion, but it seems the concern of the
> > clients doing frequent pause and resume is still not addressed. Since
> this
> > is a pretty common use case for applications that want to have flow
> > control, or have prioritized consumption, or get consumption fairness, we
> > probably want to see how to handle this case (see the sketch below). One of
> > the solutions might be a long-lived session id spanning the client's
> > lifetime.
> >
> > 2. KAFKA-6029. The key problem is that the leader wants to know if a
> fetch
> > request is from a shutting down broker or from a restarted broker.
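For reference, the pause/resume pattern in question looks roughly like this on the
consumer side (a sketch; "downstreamIsSlow" is a stand-in for whatever backpressure
signal the application uses):

    import java.util.Collection;
    import java.util.Collections;
    import java.util.function.BooleanSupplier;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // Sketch of a consumer that repeatedly pauses and resumes one partition for
    // flow control; each flip changes the set of partitions in the next fetch.
    final class PauseResumeLoop {
        static void run(KafkaConsumer<byte[], byte[]> consumer,
                        TopicPartition hotPartition,
                        BooleanSupplier downstreamIsSlow) {
            Collection<TopicPartition> hotOnly = Collections.singleton(hotPartition);
            while (true) {
                consumer.poll(100).forEach(record -> {
                    // hand the record off to the application here
                });
                if (downstreamIsSlow.getAsBoolean()) {
                    consumer.pause(hotOnly);   // later fetches exclude this partition
                } else {
                    consumer.resume(hotOnly);  // the partition is fetched again
                }
            }
        }
    }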
> >
> > The connection between those two issues is that both of them could be
> > addressed by having a life-long session id for each client (or fetcher,
> to
> > be more accurate). This may indicate that having a life-long session id
> > might be a "global optimal" solution so it should be considered in this
> > KIP. Otherwise, a follow up KIP discussion for KAFKA-6029 may either
> > introduce a broker epoch unnecessarily (which will not be used by the
> > consumers at all) or override what we do in this KIP.
>
> Remember that a given follower will have more than one fetch session ID.
> Each fetcher thread will have its own session ID.  And we will eventually
> be able to dynamically add or remove fetcher threads using KIP-226.
> Therefore, we can't use fetch session IDs to uniquely identify a given
> broker incarnation.  Any time we increase the number of fetcher threads, a
> new fetch session ID will show up.
>
> If we want to know if a fetch request is from a shutting down broker or
> from a restarted broker, the most straightforward and robust way would
> probably be to add an incarnation number for each broker.  ZK can track
> this number.  This also helps with debugging and logging (you can tell
> "aha-- this request came from the second incarnation, not the first."
>
> > BTW, to clarify, the main purpose of returning the data at the index
> > boundary was to get the same benefit of efficient incremental fetch for
> > both low vol and high vol partitions, which is directly related to the
> > primary goal in this KIP. The other things (such as avoiding binary
> search)
> > are just potential additional gain, and they are also brought up to see
> if
> > that could be a "global optimal" solution.
>
> I still think these are separate.  The primary goal of the KIP was to make
> fetch requests more efficient when not all partitions are returning data.
> This isn't really related to the goal of trying to make accessing
> historical data more efficient.  In most cases, the data we're accessing is
> very recent data, and index lookups are not an issue.
>
> >
> > Some other replies below.
> > >In order for improvements to succeed, I think that it's important to
> > clearly define the scope and goals.  One good example of this was the
> > AdminClient KIP.  We deliberately avoided discussing new
> administrative
> > RPCs in that KIP, in order to limit the scope.  This kept the discussion
> > focused on the user interfaces and configuration, rather than on the
> > details of possible new RPCs.  Once the KIP was completed, it was easy
> for
> > us to add new RPCs later in separate KIPs.
> > Hmm, why is AdminClient related? All the discussion is about how to
> > make fetch more efficient, right?
> >
> > >Finally, it's not clear that the approach you are proposing is the right
> > way to go.  

[jira] [Created] (KAFKA-6415) KafkaLog4jAppender deadlocks when logging from producer network thread

2018-01-02 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-6415:
-

 Summary: KafkaLog4jAppender deadlocks when logging from producer 
network thread
 Key: KAFKA-6415
 URL: https://issues.apache.org/jira/browse/KAFKA-6415
 Project: Kafka
  Issue Type: Bug
  Components: log
Reporter: Rajini Sivaram
 Fix For: 1.1.0


If a log entry in the producer network thread, in the metadata update path, is
appended to a Kafka topic using KafkaLog4jAppender, a new send is initiated
from the network thread. That send cannot complete: the metadata wait it
triggers is itself waiting for metadata from the same network thread, resulting
in a deadlock.
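For context, the kind of configuration that can hit this looks roughly like the
following (illustrative values; assumes the appender's brokerList/topic properties):

    # Route client logging to a Kafka topic via KafkaLog4jAppender (illustrative).
    log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender
    log4j.appender.KAFKA.brokerList=localhost:9092
    log4j.appender.KAFKA.topic=app-logs
    log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
    log4j.appender.KAFKA.layout.ConversionPattern=%m%n
    # Sending org.apache.kafka.clients log entries (which include the appender's own
    # producer network thread) back through the KAFKA appender creates the cycle above.
    log4j.logger.org.apache.kafka.clients=INFO, KAFKA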

This has probably been the case ever since KafkaLog4jAppender was introduced,
but it did not cause any issues until now because that path contained only
debug-level log entries, which none of the tests logged to a Kafka topic. A
recent info-level log entry introduced by the commit
https://github.com/apache/kafka/commit/a3aea3cf4dbedb293f2d7859e0298bebc8e2185f
is causing system test failures in log4j_appender_test.py due to the deadlock.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)