[jira] [Resolved] (KAFKA-9102) Increase default zk session timeout and max lag

2019-10-25 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9102.

Resolution: Fixed

> Increase default zk session timeout and max lag
> ---
>
> Key: KAFKA-9102
> URL: https://issues.apache.org/jira/browse/KAFKA-9102
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.5.0
>
>
> This tracks the implementation of KIP-537: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-537%3A+Increase+default+zookeeper+session+timeout



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Jenkins build is back to normal : kafka-trunk-jdk11 #914

2019-10-25 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : kafka-trunk-jdk8 #3999

2019-10-25 Thread Apache Jenkins Server
See 




[DISCUSS] KIP-544: Make metrics exposed via JMX configurable

2019-10-25 Thread Xavier Léauté
Hi All,

I wrote a short KIP to make the set of metrics exposed via JMX configurable.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable

Let me know what you think.

Thanks,
Xavier
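
As a rough illustration of the general idea only (the actual configuration names and
semantics are whatever the KIP page defines, not this), an allowlist-style filter over
mbean names could be as simple as:

import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical sketch: decide whether a given mbean name should be exposed in JMX,
// based on a user-supplied list of regular expressions. Not the KIP-544 API.
public class JmxMetricFilterSketch {
    private final List<Pattern> allowed;

    public JmxMetricFilterSketch(List<String> regexes) {
        this.allowed = regexes.stream().map(Pattern::compile).collect(Collectors.toList());
    }

    public boolean isExposed(String mbeanName) {
        return allowed.stream().anyMatch(p -> p.matcher(mbeanName).matches());
    }
}

A reporter could consult such a filter before registering each mbean, so that monitoring
agents listing metrics over RMI only ever see the allowed subset.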


[jira] [Created] (KAFKA-9106) metrics exposed via JMX should be configurable

2019-10-25 Thread Jira
Xavier Léauté created KAFKA-9106:


 Summary: metrics exposed via JMX should be configurable
 Key: KAFKA-9106
 URL: https://issues.apache.org/jira/browse/KAFKA-9106
 Project: Kafka
  Issue Type: Improvement
  Components: metrics
Reporter: Xavier Léauté
Assignee: Xavier Léauté


Kafka exposes a very large number of metrics, all of which are always visible 
in JMX by default. On large clusters with many partitions, this may result in 
tens of thousands of mbeans being registered, which can lead to timeouts with 
some popular monitoring agents that rely on listing JMX metrics via RMI.

Making the set of JMX-visible metrics configurable would allow operators to 
decide on the set of critical metrics to collect and work around this JMX 
limitation in those cases.
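
For anyone who wants to see the scale of the problem on a running broker or client, a
quick, self-contained way to count the Kafka mbeans in the local JVM is the standard
javax.management API (the domain list below is just a sample, not exhaustive):

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class KafkaMBeanCount {
    public static void main(String[] args) throws Exception {
        // Count mbeans registered under a few common Kafka domains in this JVM.
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        int total = 0;
        for (String domain : new String[] {"kafka.server", "kafka.network", "kafka.log",
                                           "kafka.controller", "kafka.cluster"}) {
            total += server.queryNames(new ObjectName(domain + ":*"), null).size();
        }
        System.out.println("Kafka mbeans registered: " + total);
    }
}

On a broker hosting many partitions this number easily reaches the tens of thousands
mentioned above.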



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Vinoth Chandar
+1 to Sophie's suggestion. Having both lag in terms of time and offsets is
good and makes for a more complete API.

Since there are two soft votes for separate active/standby API methods, I
also change my position on that. Fine with 2 separate methods.
Once we remove the lag information from these APIs, returning a List is
less attractive, since the ordering has no special meaning now.

>> lag in offsets vs time: Having both, as suggested by Sophie would of
course be best. What is a little unclear to me is, how in details are we
going to compute both?
@navinder maybe the next step is to flesh out these details and surface any
larger changes we need to make, if need be.

Any other details we need to cover, before a VOTE can be called on this?
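
To make "push the decision into the query layer" concrete, here is a rough sketch of what
a serving layer could do once it can look up active and standby metadata separately and
get a lag estimate per instance. Every type and method name below (MetadataLookup,
activeMetadataForKey, standbyMetadataForKey, lagFor) is a placeholder for whatever the
KIP finally exposes, not proposed API:

import java.time.Duration;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class QueryRoutingSketch {

    // Placeholder for host/port info of a Streams instance.
    interface InstanceInfo {
        String host();
        int port();
    }

    // Placeholder lookup interface; the real methods and names are up to the KIP.
    interface MetadataLookup {
        Optional<InstanceInfo> activeMetadataForKey(String store, String key);
        List<InstanceInfo> standbyMetadataForKey(String store, String key);
        Duration lagFor(InstanceInfo instance, String store);
    }

    private final MetadataLookup lookup;
    private final Duration maxAcceptableLag;

    QueryRoutingSketch(MetadataLookup lookup, Duration maxAcceptableLag) {
        this.lookup = lookup;
        this.maxAcceptableLag = maxAcceptableLag;
    }

    // Prefer the active instance; otherwise pick the freshest standby within the lag bound.
    Optional<InstanceInfo> pickInstance(String store, String key) {
        Optional<InstanceInfo> active = lookup.activeMetadataForKey(store, key);
        if (active.isPresent()) {
            return active;
        }
        return lookup.standbyMetadataForKey(store, key).stream()
                .filter(s -> lookup.lagFor(s, store).compareTo(maxAcceptableLag) <= 0)
                .min(Comparator.comparing((InstanceInfo s) -> lookup.lagFor(s, store)));
    }
}

The point is exactly what John suggested: Streams only hands out the building blocks (who
hosts what, how far behind it is), and the staleness policy lives entirely in code like this.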


On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck  wrote:

> I am jumping in a little late here.
>
> Overall I agree with the proposal to push decision making on what/how to
> query in the query layer.
>
> For point 5 from above, I'm slightly in favor of having a new method,
> "standbyMetadataForKey()" or something similar.
> Because even if we return all tasks in one list, the user will still have
> to perform some filtering to separate the different tasks, so I don't feel
> making two calls is a burden, and IMHO makes things more transparent for
> the user.
> If the final vote is for using an "isActive" field, I'm good with that as
> well.
>
> Just my 2 cents.
>
> On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
>  wrote:
>
> > I think now we are aligned on almost all the design parts. Summarising
> > below what has been discussed above and we have a general consensus on.
> >
> >
> >- Rather than broadcasting lag across all nodes at rebalancing/with the
> > heartbeat, we will just return a list of all available standbys in the
> > system, and the user can make an IQ query to any of those nodes, which will
> > return the response along with its lag (in offsets and time). Based on that,
> > the user can decide whether to return the response or query another standby.
> >-  The current metadata query frequency will not change. It will be the
> > same as it is now, i.e. before each query.
> >
> >-  For fetching list in StreamsMetadataState.java and
> > List in StreamThreadStateStoreProvider.java
> (which
> > will return all active stores which are running/restoring and replica
> > stores which are running), we will add new functions and not disturb the
> > existing functions
> >
> >- There is no need to add new StreamsConfig for implementing this KIP
> >
> >- We will add standbyPartitionsByHost in AssignmentInfo and
> > StreamsMetadataState which would change the existing rebuildMetadata()
> and
> > setPartitionsByHostState()
> >
> >
> >
> > If anyone has any more concerns please feel free to add. Post this I will
> > be initiating a vote.
> > ~Navinder
> >
> > On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax <
> > matth...@confluent.io> wrote:
> >
> >  Just to close the loop @Vinoth:
> >
> > > 1. IIUC John intends to add (or we can do this in this KIP) lag
> > information
> > > to AssignmentInfo, which gets sent to every participant.
> >
> > As explained by John, currently KIP-441 plans to only report the
> > information to the leader. But I guess, with the new proposal to not
> > broadcast this information anyway, this concern is invalidated anyway
> >
> > > 2. At-least I was under the assumption that it can be called per query,
> > > since the API docs don't seem to suggest otherwise. Do you see any
> > > potential issues if we call this every query? (we should benchmark this
> > > nonetheless)
> >
> > I did not see a real issue if people refresh the metadata frequently,
> > because it would be a local call. My main point was, that this would
> > change the current usage pattern of the API, and we would clearly need
> > to communicate this change. Similar to (1), this concern is invalidated
> > anyway.
> >
> >
> > @John: I think it's a great idea to get rid of reporting lag, and
> > pushing the decision making process about "what to query" into the query
> > serving layer itself. This simplifies the overall design of this KIP
> > significantly, and actually aligns very well with the idea that Kafka
> > Streams (as it is a library) should only provide the basic building
> > block. Many of my raised questions are invalidated by this.
> >
> >
> >
> > Some questions are still open though:
> >
> > > 10) Do we need to distinguish between active(restoring) and standby
> > > tasks? Or could we treat both as the same?
> >
> >
> > @Vinoth: about (5). I see your point about multiple calls vs a single
> > call. I still slightly prefer multiple calls, but it's highly subjective
> > and I would also be fine to add an #isActive() method. Would be good to
> > get feedback from others.
> >
> >
> > For (14), ie, lag in offsets vs time: Having both, as suggested by
> > Sophie would of course be best. What is a little unclear to me is, how
> > in details are we going to 

[jira] [Resolved] (KAFKA-9105) Truncate producer state when incrementing log start offset

2019-10-25 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9105.

Resolution: Fixed

> Truncate producer state when incrementing log start offset
> --
>
> Key: KAFKA-9105
> URL: https://issues.apache.org/jira/browse/KAFKA-9105
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Bob Barrett
>Assignee: Bob Barrett
>Priority: Blocker
>
> As part of the fix for KAFKA-7190 (the change to retain producer state for 
> longer), we removed the ProducerStateManager.truncateHead method. This removed 
> some needed producer state management (such as removing unreplicated 
> transactions) when incrementing the log start offset. We need to add this 
> functionality back in.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8968) Refactor Task-level Metrics

2019-10-25 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-8968.

Resolution: Fixed

> Refactor Task-level Metrics
> ---
>
> Key: KAFKA-8968
> URL: https://issues.apache.org/jira/browse/KAFKA-8968
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.5.0
>
>
> Refactor task-level metrics as proposed in KIP-444.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Bill Bejeck
I am jumping in a little late here.

Overall I agree with the proposal to push decision making on what/how to
query in the query layer.

For point 5 from above, I'm slightly in favor of having a new method,
"standbyMetadataForKey()" or something similar.
Because even if we return all tasks in one list, the user will still have
to perform some filtering to separate the different tasks, so I don't feel
making two calls is a burden, and IMHO makes things more transparent for
the user.
If the final vote is for using an "isActive" field, I'm good with that as
well.

Just my 2 cents.

On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
 wrote:

> I think now we are aligned on almost all the design parts. Summarising
> below what has been discussed above and we have a general consensus on.
>
>
>- Rather than broadcasting lag across all nodes at rebalancing/with the
> heartbeat, we will just return a list of all available standbys in the
> system, and the user can make an IQ query to any of those nodes, which will
> return the response along with its lag (in offsets and time). Based on that,
> the user can decide whether to return the response or query another standby.
>-  The current metadata query frequency will not change. It will be the
> same as it is now, i.e. before each query.
>
>-  For fetching list in StreamsMetadataState.java and
> List in StreamThreadStateStoreProvider.java (which
> will return all active stores which are running/restoring and replica
> stores which are running), we will add new functions and not disturb the
> existing functions
>
>- There is no need to add new StreamsConfig for implementing this KIP
>
>- We will add standbyPartitionsByHost in AssignmentInfo and
> StreamsMetadataState which would change the existing rebuildMetadata() and
> setPartitionsByHostState()
>
>
>
> If anyone has any more concerns please feel free to add. Post this I will
> be initiating a vote.
> ~Navinder
>
> On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax <
> matth...@confluent.io> wrote:
>
>  Just to close the loop @Vinoth:
>
> > 1. IIUC John intends to add (or we can do this in this KIP) lag
> information
> > to AssignmentInfo, which gets sent to every participant.
>
> As explained by John, currently KIP-441 plans to only report the
> information to the leader. But I guess, with the new proposal to not
> broadcast this information anyway, this concern is invalidated anyway
>
> > 2. At-least I was under the assumption that it can be called per query,
> > since the API docs don't seem to suggest otherwise. Do you see any
> > potential issues if we call this every query? (we should benchmark this
> > nonetheless)
>
> I did not see a real issue if people refresh the metadata frequently,
> because it would be a local call. My main point was, that this would
> change the current usage pattern of the API, and we would clearly need
> to communicate this change. Similar to (1), this concern is invalidated
> anyway.
>
>
> @John: I think it's a great idea to get rid of reporting lag, and
> pushing the decision making process about "what to query" into the query
> serving layer itself. This simplifies the overall design of this KIP
> significantly, and actually aligns very well with the idea that Kafka
> Streams (as it is a library) should only provide the basic building
> block. Many of my raised questions are invalidated by this.
>
>
>
> Some questions are still open though:
>
> > 10) Do we need to distinguish between active(restoring) and standby
> > tasks? Or could we treat both as the same?
>
>
> @Vinoth: about (5). I see your point about multiple calls vs a single
> call. I still slightly prefer multiple calls, but it's highly subjective
> and I would also be fine to add an #isActive() method. Would be good to
> get feedback from others.
>
>
> For (14), ie, lag in offsets vs time: Having both, as suggested by
> Sophie would of course be best. What is a little unclear to me is, how
> in details are we going to compute both?
>
>
>
> -Matthias
>
>
>
> On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote:
> > Just to chime in on the "report lag vs timestamp difference" issue, I
> would
> > actually advocate for both. As mentioned already, time difference is
> > probably a lot easier and/or more useful to reason about in terms of
> > "freshness"
> > of the state. But in the case when all queried stores are far behind, lag
> > could
> > be used to estimate the recovery velocity. You can then get a (pretty
> rough)
> > idea of when a store might be ready, and wait until around then to query
> > again.
> >
> > On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang 
> wrote:
> >
> >> I think I agree with John's recent reasoning as well: instead of letting
> >> the storeMetadataAPI to return the staleness information, letting the
> >> client to query either active or standby and letting standby query
> response
> >> to include both the values + timestamp (or lag, as in diffs of
> timestamps)
> >> would actually be 
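
Sophie's recovery-velocity point quoted above is simple arithmetic once you have two lag
observations; a rough sketch of the estimate a caller could compute (names are
illustrative only, not proposed API):

import java.time.Duration;
import java.util.Optional;

public final class CatchUpEstimateSketch {
    // Estimate how long a restoring store still needs to catch up, given the offset lag
    // observed at two points in time. Empty if the store is not catching up at all.
    static Optional<Duration> estimate(long earlierLag, long laterLag, Duration timeBetween) {
        long recovered = earlierLag - laterLag;
        if (recovered <= 0) {
            return Optional.empty();
        }
        double millisPerOffset = (double) timeBetween.toMillis() / recovered;
        return Optional.of(Duration.ofMillis((long) (laterLag * millisPerOffset)));
    }
}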

Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2019-10-25 Thread Walker Carlson
Hi Guozhang,

1. I am familiar with Spark's cogroup; it is very similar to their join
operator, but instead it makes the values iterable. I think that the use cases
are different enough that it makes sense to specify the aggregator when we do.

I like the idea of "absorb" and I think it could be useful, although I do not
think it is as intuitive.

If we were to go that route, we would either use more processors or do
essentially the same thing but have to store the information required to
cogroup inside that KTable, which I think would violate some design principles.
I would argue that we should consider adding absorb as well and auto-rewrite it
to use cogroup.

2. We have not considered this, though that would be a convenient operation.

3. There is only one processor created. We are actually having the naming
conversation right now in the above thread.

4, 5. fair points

Walker
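
For easier comparison with the absorb() alternative quoted below, here is roughly the
shape of the KIP-150 proposal for the same three-stream example. Treat this as pseudocode
for the proposed syntax; the exact method names, overloads and generics are what this
thread is still settling:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;

public class CogroupSyntaxSketch {
    static KTable<String, String> build(StreamsBuilder builder) {
        KGroupedStream<String, String> grouped1 = builder.<String, String>stream("topic1").groupByKey();
        KGroupedStream<String, String> grouped2 = builder.<String, String>stream("topic2").groupByKey();
        KGroupedStream<String, String> grouped3 = builder.<String, String>stream("topic3").groupByKey();

        Aggregator<String, String, String> append = (key, value, agg) -> agg + value;

        return grouped1
                .cogroup(append)             // first stream fixes the aggregate value type
                .cogroup(grouped2, append)   // each further stream brings its own aggregator
                .cogroup(grouped3, append)
                .aggregate(() -> "");        // one shared initializer (and optionally Materialized)
    }
}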

On Fri, Oct 25, 2019 at 11:58 AM Guozhang Wang  wrote:

> Hi Walker, thanks for the KIP! I made a pass on the writeup and have some
> comments below:
>
> Meta:
>
> 1. Syntax-wise, I'm wondering if we have compared our current proposal with
> Spark's co-group syntax (I know they are targeting for different use cases,
> but wondering if their syntax is closer to the join operator), what are the
> syntax / semantics trade-off here?
>
> Just playing a devil's advocate here, if the main motivation is to provide
> a more convenient multi-way join syntax, and in order to only have one
> materialized store we need to specify the final joined format at the
> beginning, then what about the following alternative (with the given
> example in your wiki page):
>
>
> KGroupedStream grouped1 = builder.stream("topic1").groupByKey();
> KGroupedStream grouped2 = builder.stream("topic2").groupByKey();
> KGroupedStream grouped3 = builder.stream("topic3").groupByKey();
>
> KTable aggregated = grouped1.aggregate(initializer, materialized,
> aggregator1);
>
> aggregated.absorb(grouped2, aggregator2);  // I'm just using a random name
> on top of my head here
>   .absorb(grouped3, aggregator3);
>
> In this way, we just add a new API to the KTable to "absorb" new streams as
> aggregated results without needing to introduce new first-class citizen classes.
>
> 2. From the DSL optimization, have we considered if we can auto re-write
> the user written old fashioned multi-join into this new DSL operator?
>
> 3. Although it is not needed for the wiki page itself, for internal
> implementation how many processor nodes would we create for the new
> operator, and how we can allow users to name them?
>
> Minor:
>
> 4. In "Public Interfaces", better add the templated generics to
> "KGroupedStream" as "KGroupedStream".
>
> 5. Naming wise, I'd suggest we keep the "K" together with Stream/Table,
> e.g. "TimeWindowed*CogroupedKStream*".
>
>
> Guozhang
>
>
>
>
> On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax 
> wrote:
>
> > Walker,
> >
> > I am not sure if I can follow your argument. What do you exactly mean by
> >
> > > I also
> > >> think that in this case it would be better to separate the 2 option
> out
> > >> into separate overloads.
> >
> > Maybe you can give an example what method signature you have in mind?
> >
> > >> We could take a named parameter from upstream or add an extra naming
> > option
> > >> however I don't really see the advantage that would give.
> >
> > Are you familiar with KIP-307? Before KIP-307, KS generated all names
> > for all Processors. This makes it hard to reason about a Topology if
> > it's getting complex. Adding `Named` to the new co-group operator would
> > actually align with KIP-307.
> >
> > > It seems to go in
> > >> the opposite direction from the cogroup configuration idea you
> proposed.
> >
> > Can you elaborate? Not sure if I can follow.
> >
> >
> >
> > -Matthias
> >
> >
> > On 10/24/19 10:20 AM, Walker Carlson wrote:
> > > While I like the idea Sophie I don't think that it is necessary. I also
> > > think that in this case it would be better to separate the 2 option out
> > > into separate overloads.
> > > We could take a named parameter from upstream or add an extra naming
> > option
> > > however I don't really see the advantage that would give. It seems to
> go
> > in
> > > the opposite direction from the cogroup configuration idea you
> proposed.
> > >
> > > John, I think it could be both. It depends on when you aggregate and
> what
> > > kind of data you have. In the example it is aggregating before joining,
> > > there are probably some cases where you could join before aggregating.
> > IMHO
> > > it would be easier to group all the streams together then perform the
> one
> > > operation that results in a single KTable.
> > >
> > >
> > >
> > > On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman <
> sop...@confluent.io
> > >
> > > wrote:
> > >
> > >>> I can personally not see any need to add other configuration
> > >> Famous last words?
> > >>
> > >> Just kidding, 95% confidence is more than enough 

Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2019-10-25 Thread Guozhang Wang
Hi Walker, thanks for the KIP! I made a pass on the writeup and have some
comments below:

Meta:

1. Syntax-wise, I'm wondering if we have compared our current proposal with
Spark's co-group syntax (I know they are targeting for different use cases,
but wondering if their syntax is closer to the join operator), what are the
syntax / semantics trade-off here?

Just playing a devil's advocate here, if the main motivation is to provide
a more convenient multi-way join syntax, and in order to only have one
materialized store we need to specify the final joined format at the
beginning, then what about the following alternative (with the given
example in your wiki page):


KGroupedStream grouped1 = builder.stream("topic1").groupByKey();
KGroupedStream grouped2 = builder.stream("topic2").groupByKey();
KGroupedStream grouped3 = builder.stream("topic3").groupByKey();

KTable aggregated = grouped1.aggregate(initializer, materialized,
aggregator1);

aggregated.absorb(grouped2, aggregator2);  // I'm just using a random name
on top of my head here
  .absorb(grouped3, aggregator3);

In this way, we just add a new API to the KTable to "absorb" new streams as
aggregated results without needing to introduce new first-class citizen classes.

2. From the DSL optimization, have we considered if we can auto re-write
the user written old fashioned multi-join into this new DSL operator?

3. Although it is not needed for the wiki page itself, for internal
implementation how many processor nodes would we create for the new
operator, and how we can allow users to name them?

Minor:

4. In "Public Interfaces", better add the templated generics to
"KGroupedStream" as "KGroupedStream".

5. Naming wise, I'd suggest we keep the "K" together with Stream/Table,
e.g. "TimeWindowed*CogroupedKStream*".


Guozhang




On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax 
wrote:

> Walker,
>
> I am not sure if I can follow your argument. What do you exactly mean by
>
> > I also
> >> think that in this case it would be better to separate the 2 option out
> >> into separate overloads.
>
> Maybe you can give an example what method signature you have in mind?
>
> >> We could take a named parameter from upstream or add an extra naming
> option
> >> however I don't really see the advantage that would give.
>
> Are you familiar with KIP-307? Before KIP-307, KS generated all names
> for all Processors. This makes it hard to reason about a Topology if
> it's getting complex. Adding `Named` to the new co-group operator would
> actually align with KIP-307.
>
> > It seems to go in
> >> the opposite direction from the cogroup configuration idea you proposed.
>
> Can you elaborate? Not sure if I can follow.
>
>
>
> -Matthias
>
>
> On 10/24/19 10:20 AM, Walker Carlson wrote:
> > While I like the idea Sophie I don't think that it is necessary. I also
> > think that in this case it would be better to separate the 2 option out
> > into separate overloads.
> > We could take a named parameter from upstream or add an extra naming
> option
> > however I don't really see the advantage that would give. It seems to go
> in
> > the opposite direction from the cogroup configuration idea you proposed.
> >
> > John, I think it could be both. It depends on when you aggregate and what
> > kind of data you have. In the example it is aggregating before joining,
> > there are probably some cases where you could join before aggregating.
> IMHO
> > it would be easier to group all the streams together then perform the one
> > operation that results in a single KTable.
> >
> >
> >
> > On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman  >
> > wrote:
> >
> >>> I can personally not see any need to add other configuration
> >> Famous last words?
> >>
> >> Just kidding, 95% confidence is more than enough for me (and better to
> >> optimize for current
> >> design than for hypothetical future changes).
> >>
> >> One last question I have then is about the operator/store/repartition
> >> naming -- seems like
> >> we can name the underlying store/changelog through the Materialized
> >> parameter, but do we
> >> also want to include an overload taking a Named parameter for the
> operator
> >> name (as in the
> >> KTable#join variations)?
> >>
> >> On Wed, Oct 23, 2019 at 5:14 PM Matthias J. Sax 
> >> wrote:
> >>
> >>> Interesting idea, Sophie.
> >>>
> >>> So far, we tried to reuse existing config objects and only add new ones
> >>> when needed to avoid creating "redundant" classes. This is of course a
> >>> reactive approach (with the drawback to deprecate stuff if we change
> it,
> >>> as you described).
> >>>
> >>> I can personally not see any need to add other configuration parameters
> >>> atm, so it's a 95% obvious "no" IMHO. The final `aggregate()` has only
> a
> >>> single state store that we need to configure, and reusing
> `Materialized`
> >>> seems to be appropriate.
> >>>
> >>> Also note, that the `Initializer` is a mandatory parameter and not a
> >>> configuration and 

Build failed in Jenkins: kafka-2.4-jdk8 #40

2019-10-25 Thread Apache Jenkins Server
See 


Changes:

[jason] KAFKA-9038; Allow creating partitions while a reassignment is in

[jason] KAFKA-8992; Redefine RemoveMembersFromGroup interface on AdminClient 


--
[...truncated 5.09 MB...]
kafka.server.MetadataRequestTest > testClusterIdIsValid PASSED

kafka.server.MetadataRequestTest > testNoTopicsRequest STARTED

kafka.server.MetadataRequestTest > testNoTopicsRequest PASSED

kafka.server.MetadataRequestTest > 
testAutoCreateTopicWithInvalidReplicationFactor STARTED

kafka.server.MetadataRequestTest > 
testAutoCreateTopicWithInvalidReplicationFactor PASSED

kafka.server.MetadataRequestTest > testPreferredReplica STARTED

kafka.server.MetadataRequestTest > testPreferredReplica PASSED

kafka.server.MetadataRequestTest > testClusterIdWithRequestVersion1 STARTED

kafka.server.MetadataRequestTest > testClusterIdWithRequestVersion1 PASSED

kafka.server.MetadataRequestTest > testAutoTopicCreation STARTED

kafka.server.MetadataRequestTest > testAutoTopicCreation PASSED

kafka.server.EdgeCaseRequestTest > testInvalidApiVersionRequest STARTED

kafka.server.EdgeCaseRequestTest > testInvalidApiVersionRequest PASSED

kafka.server.EdgeCaseRequestTest > testMalformedHeaderRequest STARTED

kafka.server.EdgeCaseRequestTest > testMalformedHeaderRequest PASSED

kafka.server.EdgeCaseRequestTest > testProduceRequestWithNullClientId STARTED

kafka.server.EdgeCaseRequestTest > testProduceRequestWithNullClientId PASSED

kafka.server.EdgeCaseRequestTest > testInvalidApiKeyRequest STARTED

kafka.server.EdgeCaseRequestTest > testInvalidApiKeyRequest PASSED

kafka.server.EdgeCaseRequestTest > testHeaderOnlyRequest STARTED

kafka.server.EdgeCaseRequestTest > testHeaderOnlyRequest PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldGetEpochsFromReplica 
STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > shouldGetEpochsFromReplica PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnUnknownTopicOrPartitionIfThrown STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnUnknownTopicOrPartitionIfThrown PASSED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnNoLeaderForPartitionIfThrown STARTED

kafka.server.epoch.OffsetsForLeaderEpochTest > 
shouldReturnNoLeaderForPartitionIfThrown PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldIncreaseLeaderEpochBetweenLeaderRestarts STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldIncreaseLeaderEpochBetweenLeaderRestarts PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader PASSED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldSendLeaderEpochRequestAndGetAResponse STARTED

kafka.server.epoch.LeaderEpochIntegrationTest > 
shouldSendLeaderEpochRequestAndGetAResponse PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldFetchLatestEpochOfEmptyCache STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldFetchLatestEpochOfEmptyCache PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotUpdateEpochAndStartOffsetIfItDidNotChange STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotUpdateEpochAndStartOffsetIfItDidNotChange PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldDropEntriesBetweenEpochBoundaryWhenRemovingNewest STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldDropEntriesBetweenEpochBoundaryWhenRemovingNewest PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldReturnLogEndOffsetIfLatestEpochRequested STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldReturnLogEndOffsetIfLatestEpochRequested PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotClearAnythingIfOffsetToEarly STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotClearAnythingIfOffsetToEarly PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldIncreaseAndTrackEpochsAsFollowerReceivesManyMessages STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldIncreaseAndTrackEpochsAsFollowerReceivesManyMessages PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldTruncateIfMatchingEpochButEarlierStartingOffset STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldTruncateIfMatchingEpochButEarlierStartingOffset PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotOverwriteOffsetForALeaderEpochOnceItHasBeenAssigned STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotOverwriteOffsetForALeaderEpochOnceItHasBeenAssigned PASSED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotOverwriteLogEndOffsetForALeaderEpochOnceItHasBeenAssigned STARTED

kafka.server.epoch.LeaderEpochFileCacheTest > 
shouldNotOverwriteLogEndOffsetForALeaderEpochOnceItHasBeenAssigned PASSED


Re: [VOTE] KIP-514 Add a bounded flush() API to Kafka Producer

2019-10-25 Thread Harsha Chintalapani
+1 (binding)
-Harsha


On Fri, Oct 25 2019 at 11:01 AM,  wrote:

>
> +1
>
> On Thu, Oct 24, 2019 at 9:33 PM radai  wrote:
>
> Hello,
>
> >
>
> I'd like to initiate a vote on KIP-514.
>
> >
>
> links:
> the kip -
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-514%3A+Add+a+bounded+flush%28%29+API+to+Kafka+Producer
> the PR - https://github.com/apache/kafka/pull/7569
>
> >
>
> Thank you
>
> >
>


Re: [VOTE] KIP-514 Add a bounded flush() API to Kafka Producer

2019-10-25 Thread Joel Koshy
+1

On Thu, Oct 24, 2019 at 9:33 PM radai  wrote:

> Hello,
>
> I'd like to initiate a vote on KIP-514.
>
> links:
> the kip -
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-514%3A+Add+a+bounded+flush%28%29+API+to+Kafka+Producer
> the PR - https://github.com/apache/kafka/pull/7569
>
> Thank you
>


[jira] [Created] (KAFKA-9105) Truncate producer state when incrementing log start offset

2019-10-25 Thread Bob Barrett (Jira)
Bob Barrett created KAFKA-9105:
--

 Summary: Truncate producer state when incrementing log start offset
 Key: KAFKA-9105
 URL: https://issues.apache.org/jira/browse/KAFKA-9105
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.0
Reporter: Bob Barrett
Assignee: Bob Barrett


In github.com/apache/kafka/commit/c49775b, we removed the 
ProducerStateManager.truncateHead method as part of the change to retain 
producer state for longer. This removed some needed producer state management 
(such as removing unreplicated transactions) when incrementing the log start 
offset. We need to add this functionality back in.
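
For context on what the removed functionality did, a purely conceptual sketch is below;
the real logic lives in the broker's ProducerStateManager (Scala) and also has to handle
ongoing/unreplicated transactions, which this toy version ignores:

import java.util.HashMap;
import java.util.Map;

// Toy illustration of "truncate producer state at the log start offset": drop state for
// producers whose last written offset now falls before the new log start offset.
public class ProducerStateSketch {
    private final Map<Long, Long> lastOffsetByProducerId = new HashMap<>();

    void update(long producerId, long lastOffset) {
        lastOffsetByProducerId.put(producerId, lastOffset);
    }

    void truncateHead(long newLogStartOffset) {
        lastOffsetByProducerId.values().removeIf(lastOffset -> lastOffset < newLogStartOffset);
    }
}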



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2019-10-25 Thread Walker Carlson
Matthias,

I was not familiar with KIP-307. After looking into it a bit, I think I better
understand what Named is for, and it answers some of my concerns. I think we
should consider how many more methods it would create; there would be 12
aggregate functions, including the timeWindowed and sessionWindowed variants.
If we were to add Named, then Sophie's idea of using a Cogrouped object to
hold the parameters (materialized, named) makes a lot more sense.

Another option: because there is only one processor created, we probably do not
need to have that option in the aggregate call. It would be natural to add the
option to the cogroup method in KGroupedStream instead.

To Sophie's point earlier ("i.e. the first stream is joined as
stream.cogroup(Aggregator) while the subsequent ones are joined as
.cogroup(Stream, Aggregator)"), we could add an optional overload for the
first stream, stream.cogroup(Aggregator, Named).
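
To make the overload question concrete, here is a quick sketch of the two shapes being
discussed, written as stripped-down interfaces so the signatures are visible (illustrative
only, not the KIP's API):

import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;

public class NamedOverloadSketch {

    // Option A: name the single co-group processor where the co-group starts.
    interface GroupedStreamLike<K, V> {
        <VOut> CogroupedLike<K, VOut> cogroup(Aggregator<? super K, ? super V, VOut> aggregator,
                                              Named named);
    }

    // Option B: put Named on the terminal aggregate(), which multiplies the overload count
    // across the plain, time-windowed and session-windowed variants.
    interface CogroupedLike<K, VOut> {
        KTable<K, VOut> aggregate(Initializer<VOut> initializer, Named named);
        KTable<K, VOut> aggregate(Initializer<VOut> initializer, Named named,
                                  Materialized<K, VOut, ?> materialized);
    }
}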

On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax 
wrote:

> Walker,
>
> I am not sure if I can follow your argument. What do you exactly mean by
>
> > I also
> >> think that in this case it would be better to separate the 2 option out
> >> into separate overloads.
>
> Maybe you can give an example what method signature you have in mind?
>
> >> We could take a named parameter from upstream or add an extra naming
> option
> >> however I don't really see the advantage that would give.
>
> Are you familiar with KIP-307? Before KIP-307, KS generated all names
> for all Processors. This makes it hard to reason about a Topology if
> it's getting complex. Adding `Named` to the new co-group operator would
> actually align with KIP-307.
>
> > It seems to go in
> >> the opposite direction from the cogroup configuration idea you proposed.
>
> Can you elaborate? Not sure if I can follow.
>
>
>
> -Matthias
>
>
> On 10/24/19 10:20 AM, Walker Carlson wrote:
> > While I like the idea Sophie I don't think that it is necessary. I also
> > think that in this case it would be better to separate the 2 option out
> > into separate overloads.
> > We could take a named parameter from upstream or add an extra naming
> option
> > however I don't really see the advantage that would give. It seems to go
> in
> > the opposite direction from the cogroup configuration idea you proposed.
> >
> > John, I think it could be both. It depends on when you aggregate and what
> > kind of data you have. In the example it is aggregating before joining,
> > there are probably some cases where you could join before aggregating.
> IMHO
> > it would be easier to group all the streams together then perform the one
> > operation that results in a single KTable.
> >
> >
> >
> > On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman  >
> > wrote:
> >
> >>> I can personally not see any need to add other configuration
> >> Famous last words?
> >>
> >> Just kidding, 95% confidence is more than enough for me (and better to
> >> optimize for current
> >> design than for hypothetical future changes).
> >>
> >> One last question I have then is about the operator/store/repartition
> >> naming -- seems like
> >> we can name the underlying store/changelog through the Materialized
> >> parameter, but do we
> >> also want to include an overload taking a Named parameter for the
> operator
> >> name (as in the
> >> KTable#join variations)?
> >>
> >> On Wed, Oct 23, 2019 at 5:14 PM Matthias J. Sax 
> >> wrote:
> >>
> >>> Interesting idea, Sophie.
> >>>
> >>> So far, we tried to reuse existing config objects and only add new ones
> >>> when needed to avoid creating "redundant" classes. This is of course a
> >>> reactive approach (with the drawback to deprecate stuff if we change
> it,
> >>> as you described).
> >>>
> >>> I can personally not see any need to add other configuration parameters
> >>> atm, so it's a 95% obvious "no" IMHO. The final `aggregate()` has only
> a
> >>> single state store that we need to configure, and reusing
> `Materialized`
> >>> seems to be appropriate.
> >>>
> >>> Also note, that the `Initializer` is a mandatory parameter and not a
> >>> configuration and should be passed directly, and not via a
> configuration
> >>> object.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 10/23/19 11:37 AM, Sophie Blee-Goldman wrote:
>  Thanks for the explanation, makes sense to me! As for the API, one
> >> other
>  thought I had is might we ever want or need to introduce any other
> >>> configs
>  or parameters in the future? Obviously that's difficult to say now (or
>  maybe the
>  answer seems obviously "no") but we seem to often end up needing to
> add
> >>> new
>  overloads and/or deprecate old ones as new features or requirements
> >> come
>  into
>  play.
> 
>  What do you (and others?) think about wrapping the config parameters
> >> (ie
>  everything
>  except the actual grouped streams) in a new config object? For
> example,
> >>> the
>  CogroupedStream#aggregate field could take a single Cogrouped object,
> 

Re: [DISCUSS] Apache Kafka 2.4.0 release

2019-10-25 Thread Manikumar
Hi all,

Quick update on the 2.4 release. We still have one blocker to close.
I will create the first RC after closing the blocker.

https://issues.apache.org/jira/browse/KAFKA-8972

Thank you!

On Fri, Oct 18, 2019 at 12:51 AM Matthias J. Sax 
wrote:

> Just FYI:
>
> There was also https://issues.apache.org/jira/browse/KAFKA-9058 that I
> just merged.
>
>
> -Matthias
>
> On 10/17/19 7:59 AM, Manikumar wrote:
> > Hi all,
> >
> > The code freeze deadline has now passed and at this point only blockers
> > will be allowed.
> > We have three blockers for 2.4.0. I will move out most of the JIRAs that
> > aren't currently
> > being worked on. If you think any of the other JIRAs are critical to
> > include in 2.4.0,
> > please update the fix version, mark as blocker and ensure a PR is ready
> to
> > merge.
> > I will create the first RC as soon as we close the blockers.
> > Please help to close out the 2.4.0 JIRAs.
> >
> > current blockers:
> > https://issues.apache.org/jira/browse/KAFKA-8943
> > https://issues.apache.org/jira/browse/KAFKA-8992
> > https://issues.apache.org/jira/browse/KAFKA-8972
> >
> > Thank you!
> >
> > On Tue, Oct 8, 2019 at 8:27 PM Manikumar 
> wrote:
> >
> >> Thanks Bruno. We will mark KIP-471 as complete.
> >>
> >> On Tue, Oct 8, 2019 at 2:39 PM Bruno Cadonna 
> wrote:
> >>
> >>> Hi Manikumar,
> >>>
> >>> It is technically true that KIP-471 is not completed, but the only
> >>> things missing are two metrics that I could not add due to the RocksDB
> >>> version currently used in Streams. Adding those two metrics once the
> >>> RocksDB version has been increased will be a minor effort. So, I would
> >>> consider KIP-471 as complete, with those two metrics blocked.
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On Mon, Oct 7, 2019 at 8:44 PM Manikumar 
> >>> wrote:
> 
>  Hi all,
> 
>  I have moved couple of accepted KIPs without a PR to the next release.
> >>> We
>  still have quite a few KIPs
>  with PRs that are being reviewed, but haven't yet been merged. I have
> >>> left
>  all of these in assuming these
>  PRs are ready and not risky to merge.  Please update your assigned
>  KIPs/JIRAs, if they are not ready and
>   if you know they cannot make it to 2.4.0.
> 
>  Please ensure that all KIPs for 2.4.0 have been merged by Oct 16th.
> Any
>  remaining KIPs
>  will be moved to the next release.
> 
>  The KIPs still in progress are:
> 
>  - KIP-517: Add consumer metrics to observe user poll behavior
>   <
> 
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-517%3A+Add+consumer+metrics+to+observe+user+poll+behavior
> >
> 
>  - KIP-511: Collect and Expose Client's Name and Version in the Brokers
>   <
> 
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers
> >
> 
>  - KIP-474: To deprecate WindowStore#put(key, value)
>   
> 
>  - KIP-471: Expose RocksDB Metrics in Kafka Streams
>   <
> 
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-471%3A+Expose+RocksDB+Metrics+in+Kafka+Streams
> >
> 
>  - KIP-466: Add support for List serialization and deserialization
>   <
> 
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List
>  +serialization+and+deserialization>
> 
>  - KIP-455: Create an Administrative API for Replica Reassignment
>   <
> 
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-455%3A+Create+an+Administrative+API+for+Replica+Reassignment
> >
> 
>  - KIP-446: Add changelog topic configuration to KTable suppress
>   <
> 
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-446%3A+Add+changelog+topic+configuration+to+KTable+suppress
> >
> 
>  - KIP-444: Augment metrics for Kafka Streams
>   <
> 
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
> >
> 
>  - KIP-434: Add Replica Fetcher and Log Cleaner Count Metrics
>   <
> 
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-434%3A+Add+Replica+Fetcher+and+Log+Cleaner+Count+Metrics
> >
> 
>  - KIP-401: TransformerSupplier/ProcessorSupplier StateStore connecting
>   <
> >>>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
> 
> 
>  - KIP-396: Add Reset/List Offsets Operations to AdminClient
>    <
> >>>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484
> >
> 
>  - KIP-221: Enhance DSL with Connecting Topic Creation and Repartition
> >>> Hint
>   <
> 
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint
> >
> 
> 
>  Thanks,
> 

Build failed in Jenkins: kafka-trunk-jdk8 #3998

2019-10-25 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9089; Reassignment should be resilient to unexpected errors

[github] KAFKA-8729: Add upgrade docs for KIP-467 on augmented produce response

[wangguoz] KAFKA-8972: Need to flush state even on unclean close (#7589)


--
[...truncated 8.11 MB...]

org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest > testStartStop 
PASSED

org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest > 
testReloadOnStart STARTED

org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest > 
testReloadOnStart PASSED

org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest > testGetSet 
STARTED

org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest > testGetSet PASSED

org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest > testGetSetNull 
STARTED

org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest > testGetSetNull 
PASSED

org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest > testSetFailure 
STARTED

org.apache.kafka.connect.storage.KafkaOffsetBackingStoreTest > testSetFailure 
PASSED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > readTaskState 
STARTED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > readTaskState 
PASSED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > putTaskState 
STARTED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > putTaskState 
PASSED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > 
putSafeWithNoPreviousValueIsPropagated STARTED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > 
putSafeWithNoPreviousValueIsPropagated PASSED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > 
putConnectorStateNonRetriableFailure STARTED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > 
putConnectorStateNonRetriableFailure PASSED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > 
putConnectorStateShouldOverride STARTED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > 
putConnectorStateShouldOverride PASSED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > 
putConnectorStateRetriableFailure STARTED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > 
putConnectorStateRetriableFailure PASSED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > 
putSafeOverridesValueSetBySameWorker STARTED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > 
putSafeOverridesValueSetBySameWorker PASSED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > 
readConnectorState STARTED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > 
readConnectorState PASSED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > 
putSafeConnectorIgnoresStaleStatus STARTED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > 
putSafeConnectorIgnoresStaleStatus PASSED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > 
putConnectorState STARTED

org.apache.kafka.connect.storage.KafkaStatusBackingStoreTest > 
putConnectorState PASSED

org.apache.kafka.connect.storage.MemoryStatusBackingStoreTest > 
putAndGetConnectorStatus STARTED

org.apache.kafka.connect.storage.MemoryStatusBackingStoreTest > 
putAndGetConnectorStatus PASSED

org.apache.kafka.connect.storage.MemoryStatusBackingStoreTest > 
putAndGetTaskStatus STARTED

org.apache.kafka.connect.storage.MemoryStatusBackingStoreTest > 
putAndGetTaskStatus PASSED

org.apache.kafka.connect.storage.MemoryStatusBackingStoreTest > 
deleteTaskStatus STARTED

org.apache.kafka.connect.storage.MemoryStatusBackingStoreTest > 
deleteTaskStatus PASSED

org.apache.kafka.connect.storage.MemoryStatusBackingStoreTest > 
deleteConnectorStatus STARTED

org.apache.kafka.connect.storage.MemoryStatusBackingStoreTest > 
deleteConnectorStatus PASSED

org.apache.kafka.connect.storage.OffsetStorageWriterTest > testWriteFlush 
STARTED

org.apache.kafka.connect.storage.OffsetStorageWriterTest > testWriteFlush PASSED

org.apache.kafka.connect.storage.OffsetStorageWriterTest > 
testWriteNullValueFlush STARTED

org.apache.kafka.connect.storage.OffsetStorageWriterTest > 
testWriteNullValueFlush PASSED

org.apache.kafka.connect.storage.OffsetStorageWriterTest > 
testWriteNullKeyFlush STARTED

org.apache.kafka.connect.storage.OffsetStorageWriterTest > 
testWriteNullKeyFlush PASSED

org.apache.kafka.connect.storage.OffsetStorageWriterTest > testNoOffsetsToFlush 
STARTED

org.apache.kafka.connect.storage.OffsetStorageWriterTest > testNoOffsetsToFlush 
PASSED

org.apache.kafka.connect.storage.OffsetStorageWriterTest > 
testFlushFailureReplacesOffsets STARTED

org.apache.kafka.connect.storage.OffsetStorageWriterTest > 
testFlushFailureReplacesOffsets PASSED

org.apache.kafka.connect.storage.OffsetStorageWriterTest > testAlreadyFlushing 
STARTED


Subject: [VOTE] 2.2.2 RC2

2019-10-25 Thread Randall Hauch
Hello all, we identified around three dozen bug fixes, including an update
of a third-party dependency, and want to publish a patch release for
Apache Kafka 2.2.0.

This is the *second* candidate for release of Apache Kafka 2.2.2. (RC1 did
not include a fix for https://issues.apache.org/jira/browse/KAFKA-9053, but
the fix appeared before RC1 was announced so it was easier to just create
RC2.)

Check out the release notes for a complete list of the changes in this
release candidate:
https://home.apache.org/~rhauch/kafka-2.2.2-rc2/RELEASE_NOTES.html

*** Please download, test and vote by Wednesday, October 30, 9am PT

Kafka's KEYS file containing PGP keys we use to sign the release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~rhauch/kafka-2.2.2-rc2/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
https://home.apache.org/~rhauch/kafka-2.2.2-rc2/javadoc/

* Tag to be voted upon (off 2.2 branch) is the 2.2.2 tag:
https://github.com/apache/kafka/releases/tag/2.2.2-rc2

* Documentation:
https://kafka.apache.org/22/documentation.html

* Protocol:
https://kafka.apache.org/22/protocol.html

* Successful Jenkins builds for the 2.2 branch:
Unit/integration tests: https://builds.apache.org/job/kafka-2.2-jdk8/1/
System tests:
https://jenkins.confluent.io/job/system-test-kafka/job/2.2/216/


Thanks,

Randall Hauch


Build failed in Jenkins: kafka-trunk-jdk11 #913

2019-10-25 Thread Apache Jenkins Server
See 


Changes:

[jason] KAFKA-8992; Redefine RemoveMembersFromGroup interface on AdminClient 


--
[...truncated 2.71 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 

Re: [VOTE] KIP-541: Create a fetch.max.bytes configuration for the broker

2019-10-25 Thread David Arthur
+1 binding, this will be a nice improvement. Thanks, Colin!

-David

On Fri, Oct 25, 2019 at 4:33 AM Tom Bentley  wrote:

> +1 nb. Thanks!
>
> On Fri, Oct 25, 2019 at 7:43 AM Ismael Juma  wrote:
>
> > +1 (binding)
> >
> > On Thu, Oct 24, 2019, 4:56 PM Colin McCabe  wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start the vote on KIP-541: Create a fetch.max.bytes
> > > configuration for the broker.
> > >
> > > KIP: https://cwiki.apache.org/confluence/x/4g73Bw
> > >
> > > Discussion thread:
> > >
> >
> https://lists.apache.org/thread.html/9d9dde93a07e1f1fc8d9f182f94f4bda9d016c5e9f3c8541cdc6f53b@%3Cdev.kafka.apache.org%3E
> > >
> > > cheers,
> > > Colin
> > >
> >
>


-- 
David Arthur


Build failed in Jenkins: kafka-2.4-jdk8 #39

2019-10-25 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-8729: Add upgrade docs for KIP-467 on augmented produce 
response

[jason] KAFKA-9089; Reassignment should be resilient to unexpected errors

[wangguoz] KAFKA-8972: Need to flush state even on unclean close (#7589)


--
[...truncated 5.08 MB...]
kafka.server.KafkaConfigTest > testLogRetentionTimeMsProvided STARTED

kafka.server.KafkaConfigTest > testLogRetentionTimeMsProvided PASSED

kafka.server.KafkaConfigTest > 
testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap STARTED

kafka.server.KafkaConfigTest > 
testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap PASSED

kafka.server.KafkaConfigTest > testMaxConnectionsPerIpProp STARTED

kafka.server.KafkaConfigTest > testMaxConnectionsPerIpProp PASSED

kafka.server.KafkaConfigTest > testLogRollTimeNoConfigProvided STARTED

kafka.server.KafkaConfigTest > testLogRollTimeNoConfigProvided PASSED

kafka.server.KafkaConfigTest > testInvalidInterBrokerSecurityProtocol STARTED

kafka.server.KafkaConfigTest > testInvalidInterBrokerSecurityProtocol PASSED

kafka.server.KafkaConfigTest > testAdvertiseDefaults STARTED

kafka.server.KafkaConfigTest > testAdvertiseDefaults PASSED

kafka.server.KafkaConfigTest > testBadListenerProtocol STARTED

kafka.server.KafkaConfigTest > testBadListenerProtocol PASSED

kafka.server.KafkaConfigTest > testListenerDefaults STARTED

kafka.server.KafkaConfigTest > testListenerDefaults PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeBothMinutesAndHoursProvided 
STARTED

kafka.server.KafkaConfigTest > testLogRetentionTimeBothMinutesAndHoursProvided 
PASSED

kafka.server.KafkaConfigTest > testUncleanElectionDisabled STARTED

kafka.server.KafkaConfigTest > testUncleanElectionDisabled PASSED

kafka.server.KafkaConfigTest > 
testListenerNameMissingFromListenerSecurityProtocolMap STARTED

kafka.server.KafkaConfigTest > 
testListenerNameMissingFromListenerSecurityProtocolMap PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeNoConfigProvided STARTED

kafka.server.KafkaConfigTest > testLogRetentionTimeNoConfigProvided PASSED

kafka.server.KafkaConfigTest > testCaseInsensitiveListenerProtocol STARTED

kafka.server.KafkaConfigTest > testCaseInsensitiveListenerProtocol PASSED

kafka.server.KafkaConfigTest > testListenerAndAdvertisedListenerNames STARTED

kafka.server.KafkaConfigTest > testListenerAndAdvertisedListenerNames PASSED

kafka.server.KafkaConfigTest > testNonroutableAdvertisedListeners STARTED

kafka.server.KafkaConfigTest > testNonroutableAdvertisedListeners PASSED

kafka.server.KafkaConfigTest > 
testInterBrokerListenerNameAndSecurityProtocolSet STARTED

kafka.server.KafkaConfigTest > 
testInterBrokerListenerNameAndSecurityProtocolSet PASSED

kafka.server.KafkaConfigTest > testFromPropsInvalid STARTED

kafka.server.KafkaConfigTest > testFromPropsInvalid PASSED

kafka.server.KafkaConfigTest > testInvalidCompressionType STARTED

kafka.server.KafkaConfigTest > testInvalidCompressionType PASSED

kafka.server.KafkaConfigTest > testAdvertiseHostNameDefault STARTED

kafka.server.KafkaConfigTest > testAdvertiseHostNameDefault PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeMinutesProvided STARTED

kafka.server.KafkaConfigTest > testLogRetentionTimeMinutesProvided PASSED

kafka.server.KafkaConfigTest > testValidCompressionType STARTED

kafka.server.KafkaConfigTest > testValidCompressionType PASSED

kafka.server.KafkaConfigTest > testUncleanElectionInvalid STARTED

kafka.server.KafkaConfigTest > testUncleanElectionInvalid PASSED

kafka.server.KafkaConfigTest > testListenerNamesWithAdvertisedListenerUnset 
STARTED

kafka.server.KafkaConfigTest > testListenerNamesWithAdvertisedListenerUnset 
PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeBothMinutesAndMsProvided 
STARTED

kafka.server.KafkaConfigTest > testLogRetentionTimeBothMinutesAndMsProvided 
PASSED

kafka.server.KafkaConfigTest > testLogRollTimeMsProvided STARTED

kafka.server.KafkaConfigTest > testLogRollTimeMsProvided PASSED

kafka.server.KafkaConfigTest > testUncleanLeaderElectionDefault STARTED

kafka.server.KafkaConfigTest > testUncleanLeaderElectionDefault PASSED

kafka.server.KafkaConfigTest > testInvalidAdvertisedListenersProtocol STARTED

kafka.server.KafkaConfigTest > testInvalidAdvertisedListenersProtocol PASSED

kafka.server.KafkaConfigTest > testUncleanElectionEnabled STARTED

kafka.server.KafkaConfigTest > testUncleanElectionEnabled PASSED

kafka.server.KafkaConfigTest > testInterBrokerVersionMessageFormatCompatibility 
STARTED

kafka.server.KafkaConfigTest > testInterBrokerVersionMessageFormatCompatibility 
PASSED

kafka.server.KafkaConfigTest > testAdvertisePortDefault STARTED

kafka.server.KafkaConfigTest > testAdvertisePortDefault PASSED

kafka.server.KafkaConfigTest > testVersionConfiguration STARTED

kafka.server.KafkaConfigTest > testVersionConfiguration 

Re: [VOTE] KIP-373: Allow users to create delegation tokens for other users

2019-10-25 Thread Viktor Somogyi-Vass
Hi All,

I'd like to bump this in the hope of getting one more binding vote (or any
additional feedback).

Thanks,
Viktor

On Wed, Sep 18, 2019 at 5:25 PM Viktor Somogyi-Vass 
wrote:

> Hi All,
>
> Harsha, Ryanne: thanks for the vote!
>
> I'd like to bump this again as today is the KIP freeze date and there is
> still one binding vote needed which I'm hoping to get in order to have this
> included in 2.4.
>
> Thanks,
> Viktor
>
> On Tue, Sep 17, 2019 at 1:18 AM Ryanne Dolan 
> wrote:
>
>> +1 non-binding
>>
>> Ryanne
>>
>> On Mon, Sep 16, 2019, 5:11 PM Harsha Ch  wrote:
>>
>> > +1 (binding). Thanks for the KIP Viktor
>> >
>> > Thanks,
>> >
>> > Harsha
>> >
>> > On Mon, Sep 16, 2019 at 3:02 AM, Viktor Somogyi-Vass <
>> > viktorsomo...@gmail.com > wrote:
>> >
>> > >
>> > >
>> > >
>> > > Hi All,
>> > >
>> > >
>> > >
>> > > I'd like to bump this again in order to get some more binding votes
>> > and/or
>> > > feedback in the hope we can push this in for 2.4.
>> > >
>> > >
>> > >
>> > > Thank you Manikumar, Gabor and Ryanne so far for the votes! (the last
>> two
>> > > were on the discussion thread after starting the vote but I think it
>> > still
>> > > counts :) )
>> > >
>> > >
>> > >
>> > > Thanks,
>> > > Viktor
>> > >
>> > >
>> > >
>> > > On Wed, Aug 21, 2019 at 1:44 PM Manikumar < manikumar. reddy@ gmail.
>> > com (
>> > > manikumar.re...@gmail.com ) > wrote:
>> > >
>> > >
>> > >>
>> > >>
>> > >> Hi,
>> > >>
>> > >>
>> > >>
>> > >> +1 (binding).
>> > >>
>> > >>
>> > >>
>> > >> Thanks for the updated KIP. LGTM.
>> > >>
>> > >>
>> > >>
>> > >> Thanks,
>> > >> Manikumar
>> > >>
>> > >>
>> > >>
>> > >> On Tue, Aug 6, 2019 at 3:14 PM Viktor Somogyi-Vass < viktorsomogyi@
>> > gmail.
>> > >> com ( viktorsomo...@gmail.com ) >
>> > >> wrote:
>> > >>
>> > >>
>> > >>>
>> > >>>
>> > >>> Hi All,
>> > >>>
>> > >>>
>> > >>>
>> > >>> Bumping this, I'd be happy to get some additional feedback and/or
>> > votes.
>> > >>>
>> > >>>
>> > >>>
>> > >>> Thanks,
>> > >>> Viktor
>> > >>>
>> > >>>
>> > >>>
>> > >>> On Wed, Jul 31, 2019 at 11:04 AM Viktor Somogyi-Vass <
>> viktorsomogyi@
>> > gmail.
>> > >>> com ( viktorsomo...@gmail.com ) > wrote:
>> > >>>
>> > >>>
>> > 
>> > 
>> >  Hi All,
>> > 
>> > 
>> > 
>> >  I'd like to start a vote on this KIP.
>> > 
>> > 
>> > >>>
>> > >>>
>> > >>
>> > >>
>> > >>
>> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-373%3A+Allow+users+to+create+delegation+tokens+for+other+users
>> > >>
>> > >>
>> > >>>
>> > 
>> > 
>> >  To summarize it: the proposed feature would allow users (usually
>> >  superusers) to create delegation tokens for other users. This is
>> > 
>> > 
>> > >>>
>> > >>>
>> > >>>
>> > >>> especially
>> > >>>
>> > >>>
>> > 
>> > 
>> >  helpful in Spark where the delegation token created this way can be
>> >  distributed to workers.
>> > 
>> > 
>> > 
>> >  I'd be happy to receive any votes or additional feedback.
>> > 
>> > 
>> > 
>> >  Viktor
>> > 
>> > 
>> > >>>
>> > >>>
>> > >>
>> > >>
>> > >
>> > >
>> > >
>>
>


Build failed in Jenkins: kafka-trunk-jdk11 #912

2019-10-25 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9089; Reassignment should be resilient to unexpected errors

[github] KAFKA-8729: Add upgrade docs for KIP-467 on augmented produce response

[wangguoz] KAFKA-8972: Need to flush state even on unclean close (#7589)

[jason] KAFKA-9038; Allow creating partitions while a reassignment is in


--
[...truncated 2.71 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Navinder Brar
I think we are now aligned on almost all the design parts. Summarising below 
what has been discussed above and what we have general consensus on.

   
   - Rather than broadcasting lag across all nodes at rebalancing/with the 
heartbeat, we will just return a list of all available standbys in the system, 
and the user can issue an IQ query to any of those nodes, which will return the 
response along with the lag and offset time. Based on that, the user can decide 
whether to return the response or query another standby (a rough routing sketch 
follows this list).
   - The current metadata query frequency will not change. It will be the same 
as it is now, i.e. before each query.
   - For fetching the metadata list in StreamsMetadataState.java and the store 
list in StreamThreadStateStoreProvider.java (which will return all active stores 
that are running/restoring and replica stores that are running), we will add new 
functions and not disturb the existing ones.
   - There is no need to add a new StreamsConfig for implementing this KIP.
   - We will add standbyPartitionsByHost in AssignmentInfo and 
StreamsMetadataState, which would change the existing rebuildMetadata() and 
setPartitionsByHostState().
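
For illustration, a minimal sketch of the routing a caller could then build on 
top of this (every name below is a placeholder for illustration, not proposed API):

import java.util.List;

public class StaleReadRouter {
    // Hypothetical response wrapper: the value plus the lag information the
    // queried instance is assumed to report back with each IQ response.
    static final class QueryResponse<V> {
        final V value;
        final long offsetLag;       // changelog end offset minus last applied offset
        final long timestampLagMs;  // staleness of the store contents in milliseconds
        QueryResponse(V value, long offsetLag, long timestampLagMs) {
            this.value = value;
            this.offsetLag = offsetLag;
            this.timestampLagMs = timestampLagMs;
        }
    }

    // Hypothetical transport to the remote Streams instance (e.g. an HTTP call).
    interface HostClient {
        <V> QueryResponse<V> query(String host, String store, String key);
    }

    // Try the active first (assumed to be first in the list), then fall back to
    // standbys whose reported staleness is within the caller's tolerance.
    static <V> V route(List<String> candidateHosts, String store, String key,
                       long maxAcceptableLagMs, HostClient client) {
        for (String host : candidateHosts) {
            QueryResponse<V> response = client.query(host, store, key);
            if (response != null && response.timestampLagMs <= maxAcceptableLagMs) {
                return response.value;
            }
        }
        return null; // no sufficiently fresh replica answered
    }
}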



If anyone has any more concerns please feel free to add. Post this I will be 
initiating a vote.
~Navinder

On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax 
 wrote:  
 
 Just to close the loop @Vinoth:

> 1. IIUC John intends to add (or we can do this in this KIP) lag information
> to AssignmentInfo, which gets sent to every participant.

As explained by John, currently KIP-441 plans to only report the
information to the leader. But I guess, with the new proposal to not
broadcast this information anyway, this concern is invalidated anyway

> 2. At-least I was under the assumption that it can be called per query,
> since the API docs don't seem to suggest otherwise. Do you see any
> potential issues if we call this every query? (we should benchmark this
> nonetheless)

I did not see a real issue if people refresh the metadata frequently,
because it would be a local call. My main point was, that this would
change the current usage pattern of the API, and we would clearly need
to communicate this change. Similar to (1), this concern in invalidated
anyway.


@John: I think it's a great idea to get rid of reporting lag, and
pushing the decision making process about "what to query" into the query
serving layer itself. This simplifies the overall design of this KIP
significantly, and actually aligns very well with the idea that Kafka
Streams (as it is a library) should only provide the basic building
block. Many of my raised questions are invalided by this.



Some questions are still open though:

> 10) Do we need to distinguish between active(restoring) and standby
> tasks? Or could be treat both as the same?


@Vinoth: about (5). I see your point about multiple calls vs a single
call. I still slightly prefer multiple calls, but it's highly subjective
and I would also be fine to add an #isActive() method. Would be good the
get feedback from others.


For (14), ie, lag in offsets vs time: Having both, as suggested by
Sophie would of course be best. What is a little unclear to me is, how
in details are we going to compute both?



-Matthias



On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote:
> Just to chime in on the "report lag vs timestamp difference" issue, I would
> actually advocate for both. As mentioned already, time difference is
> probably a lot easier and/or more useful to reason about in terms of
> "freshness"
> of the state. But in the case when all queried stores are far behind, lag
> could
> be used to estimate the recovery velocity. You can then get a (pretty rough)
> idea of when a store might be ready, and wait until around then to query
> again.
> 
> On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang  wrote:
> 
>> I think I agree with John's recent reasoning as well: instead of letting
>> the storeMetadataAPI to return the staleness information, letting the
>> client to query either active or standby and letting standby query response
>> to include both the values + timestamp (or lag, as in diffs of timestamps)
>> would actually be more intuitive -- not only the streams client is simpler,
>> from user's perspective they also do not need to periodically refresh their
>> staleness information from the client, but only need to make decisions on
>> the fly whenever they need to query.
>>
>> Again the standby replica then need to know the current active task's
>> timestamp, which can be found from the log end record's encoded timestamp;
>> today we standby tasks do not read that specific record, but only refresh
>> its knowledge on the log end offset, but I think refreshing the latest
>> record timestamp is not a very bad request to add on the standby replicas.
>>
>>
>> Guozhang
>>
>>
>> On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar 
>> wrote:
>>
>>> +1 As someone implementing a query routing layer, there is already a need
>>> to have 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Navinder Brar
 I think now we are aligned on almost all the design parts. Summarising below 
what has been discussed above and we have a general consensus on.
1. Rather than broadcasting lag across all nodes at rebalancing/with the 
heartbeat, we will just return a list of all available standby's in the system 
and the user can make IQ query any of those nodes which will return the 
response, and the lag and offset time. Based on which user can decide if he 
wants to return the response back or call another standby.
2. The current metadata query frequency will not change. It will be the same as 
it does now, i.e. before each query.
3. For fetching list in StreamsMetadataState.java and List in 
StreamThreadStateStoreProvider.java (which will return all active stores which 
are running/restoring and replica stores which are running), we will add new 
functions and not disturb the existing functions
4. There is no need to add new StreamsConfig for implementing this KIP
5. We will add standbyPartitionsByHost in AssignmentInfo and StreamsMetadataState 
which would change the existing rebuildMetadata() and setPartitionsByHostState()

If anyone has any more concerns please feel free to add. Post this I will be 
initiating a vote.
~Navinder
On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax 
 wrote:  
 
 Just to close the loop @Vinoth:

> 1. IIUC John intends to add (or we can do this in this KIP) lag information
> to AssignmentInfo, which gets sent to every participant.

As explained by John, currently KIP-441 plans to only report the
information to the leader. But I guess, with the new proposal to not
broadcast this information anyway, this concern is invalidated anyway

> 2. At-least I was under the assumption that it can be called per query,
> since the API docs don't seem to suggest otherwise. Do you see any
> potential issues if we call this every query? (we should benchmark this
> nonetheless)

I did not see a real issue if people refresh the metadata frequently,
because it would be a local call. My main point was, that this would
change the current usage pattern of the API, and we would clearly need
to communicate this change. Similar to (1), this concern in invalidated
anyway.


@John: I think it's a great idea to get rid of reporting lag, and
pushing the decision making process about "what to query" into the query
serving layer itself. This simplifies the overall design of this KIP
significantly, and actually aligns very well with the idea that Kafka
Streams (as it is a library) should only provide the basic building
block. Many of my raised questions are invalided by this.



Some questions are still open though:

> 10) Do we need to distinguish between active(restoring) and standby
> tasks? Or could be treat both as the same?


@Vinoth: about (5). I see your point about multiple calls vs a single
call. I still slightly prefer multiple calls, but it's highly subjective
and I would also be fine to add an #isActive() method. Would be good the
get feedback from others.


For (14), ie, lag in offsets vs time: Having both, as suggested by
Sophie would of course be best. What is a little unclear to me is, how
in details are we going to compute both?



-Matthias



On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote:
> Just to chime in on the "report lag vs timestamp difference" issue, I would
> actually advocate for both. As mentioned already, time difference is
> probably a lot easier and/or more useful to reason about in terms of
> "freshness"
> of the state. But in the case when all queried stores are far behind, lag
> could
> be used to estimate the recovery velocity. You can then get a (pretty rough)
> idea of when a store might be ready, and wait until around then to query
> again.
> 
> On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang  wrote:
> 
>> I think I agree with John's recent reasoning as well: instead of letting
>> the storeMetadataAPI to return the staleness information, letting the
>> client to query either active or standby and letting standby query response
>> to include both the values + timestamp (or lag, as in diffs of timestamps)
>> would actually be more intuitive -- not only the streams client is simpler,
>> from user's perspective they also do not need to periodically refresh their
>> staleness information from the client, but only need to make decisions on
>> the fly whenever they need to query.
>>
>> Again the standby replica then need to know the current active task's
>> timestamp, which can be found from the log end record's encoded timestamp;
>> today we standby tasks do not read that specific record, but only refresh
>> its knowledge on the log end offset, but I think refreshing the latest
>> record timestamp is not a very bad request to add on the standby replicas.
>>
>>
>> Guozhang
>>
>>
>> On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar 
>> wrote:
>>
>>> +1 As someone implementing a query routing layer, there is already a need
>>> to have mechanisms in place to do 

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-10-25 Thread Satish Duggana
>Do you want to
discount supporting compacted topics this early in the KIP design or
do you want to leave open the option of supporting them eventually?

We are not targeting compacted topics in this KIP. This KIP's goal is to make
tiered storage work well for non-compacted topics. Generally, compacted topics
do not consume as much disk as non-compacted topics do, so this is not a high
priority for now. We plan to address it in the future.

>- why do we need per topic remote retention time and bytes? Why isn't
per topic retention time and bytes (without the "remote" part)
sufficient? E.g., if I have a topic and I want retention bytes to be
1TB, and I currently have 500GB local and 500GB remote, Kafka can
manage what segments get deleted first. This would avoid the user
needing to think even more about these extra configs.

We want to give users flexibility about how much data they want to serve
from local storage based on their SLAs. This varies from org to org based
on their use cases.
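
To make that concrete, a sketch of what a topic owner might express (the
property names below are illustrative assumptions, not the KIP's final config
names):

# Illustrative names only -- not the final KIP-405 configs.
retention.ms=604800000        # total retention for the topic (local + remote): 7 days
local.retention.ms=21600000   # keep only the most recent 6 hours on local broker disks
# Segments older than local.retention.ms but newer than retention.ms are served
# from remote storage, so local disk usage stays bounded by the SLA above.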

Thanks,
Satish.


On Thu, Oct 24, 2019 at 7:43 PM Eno Thereska  wrote:

> Going back to initial thread with general questions on KIP. I think
> aspects of the user experience still need clarification:
>
> - if a user has a mix of compacted and non-compacted topics it will be
> hard to reason about storage needs overall. Could you give a reason
> why compacted topics are not supported? This is probably because to do
> that you'd have to go with a paging approach (like Ryanne earlier
> suggested) and that will be expensive in terms of IO. Do you want to
> discount supporting compacted topics this early in the KIP design or
> do you want to leave open the option of supporting them eventually? In
> an ideal system, Kafka figures out if the topic is compacted or not
> and for non-compacted topics it doesn't do the local copy so it goes
> through a fast path.
>
> - why do we need per topic remote retention time and bytes? Why isn't
> per topic retention time and bytes (without the "remote" part)
> sufficient? E.g., if I have a topic and I want retention bytes to be
> 1TB, and I currently have 500GB local and 500GB remote, Kafka can
> manage what segments get deleted first. This would avoid the user
> needing to think even more about these extra configs.
>
> Thanks
> Eno
>
>
> On Mon, Oct 21, 2019 at 4:46 PM Harsha  wrote:
> >
> > Hi All,
> >   Thanks for the initial feedback on the KIP-405.  We opened a
> PR here https://github.com/apache/kafka/pull/7561 .
> > Please take a look and let us know if you have any questions.
> > Since this feature is being developed by engineers from different
> companies we would like to open a feature branch in apache kafka git. It
> will allow us collaborate in open source community rather than in private
> branches. Please let me know if you have any objections to opening a
> feature branch in kafka's git repo.
> >
> > Thanks,
> > Harsha
> >
> > On Mon, Apr 8, 2019, at 10:04 PM, Harsha wrote:
> > > Thanks, Ron. Updating the KIP. will add answers here as well
> > >
> > >  1) If the cold storage technology can be cross-region, is there a
> > >  possibility for a disaster recovery Kafka cluster to share the
> messages in
> > >  cold storage?  My guess is the answer is no, and messages replicated
> to the
> > >  D/R cluster have to be migrated to cold storage from there
> independently.
> > >  (The same cross-region cold storage medium could be used, but every
> message
> > >  would appear there twice).
> > >
> > > If I understand the question correctly, what you are saying is Kafka A
> > > cluster (active) shipping logs to remote storage which cross-region
> > > replication and another Kafka Cluster B (Passive) will it be able to
> > > use the remote storage copied logs directly.
> > > For the initial version my answer is No. We can handle this in
> > > subsequent changes after this one.
> > >
> > >  2) Can/should external (non-Kafka) tools have direct access to the
> messages
> > >  in cold storage.  I think this might have been addressed when someone
> asked
> > >  about ACLs, and I believe the answer is "no" -- if some external tool
> needs
> > >  to operate on that data then that external tool should read that data
> by
> > > acting as a Kafka consumer.  Again, just asking to get the answer
> clearly
> > > documented in case it is unclear.
> > >
> > > The answer is No. All tools/clients must go through broker APIs to
> > > access any data (local or remote).
> > > Only Kafka broker user will have access to remote storage logs and
> > > Security/ACLs will work the way it does today.
> > > Tools/Clients going directly to the remote storage might help in terms
> > > of efficiency but this requires Protocol changes and some way of
> > > syncing ACLs in Kafka to the Remote storage.
> > >
> > >
> > > Thanks,
> > > Harsha
> > >
> > > On Mon, Apr 8, 2019, at 8:48 AM, Ron Dagostino wrote:
> > > > Hi Harsha.  A couple of questions.  I think I know the answers, but
> it
> > > > would be good to 

Re: [VOTE] KIP-530: Consider renaming 'UsePreviousTimeOnInvalidTimeStamp' class to 'UsePartitionTimeOnInvalidTimeStamp'

2019-10-25 Thread RABI K.C.
Hi Bill,

Yes, we will deprecate 'UsePreviousTimeOnInvalidTimeStamp' and then add
'UsePartitionTimeOnInvalidTimeStamp'.

With Best Regards,
Rabi Kumar K C
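
For anyone wiring this up, the extractor is configured the same way before and
after the rename; a minimal sketch (the new class name and package are
assumptions pending the actual change):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp;

public class TimestampExtractorConfigExample {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Today's class (to be deprecated by this KIP):
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  UsePreviousTimeOnInvalidTimestamp.class);
        // After the KIP it would become (assumed name, same package):
        // props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
        //           UsePartitionTimeOnInvalidTimestamp.class);
        return props;
    }
}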

On Thu, Oct 24, 2019 at 5:32 PM Guozhang Wang  wrote:

> +1 (binding).
>
>
> Guozhang
>
> On Thu, Oct 24, 2019 at 8:22 AM Bill Bejeck  wrote:
>
> > Hi Rabi,
> >
> > Thanks for the KIP.
> > Just to be clear, we're going to deprecate
> > `UsePreviousTimeOnInvalidTimeStamp`
> > and add `UsePartitionOnInvalidTimeStamp` class correct?
> >
> > Otherwise, it's a +1 (binding) from me.
> >
> > -Bill
> >
> > On Thu, Oct 24, 2019 at 8:09 AM Bruno Cadonna 
> wrote:
> >
> > > Hi Rabi,
> > >
> > > Thank you for the KIP!
> > >
> > > +1 (non-binding)
> > >
> > > Best,
> > > Bruno
> > >
> > > On Thu, Oct 24, 2019 at 8:42 AM Matthias J. Sax  >
> > > wrote:
> > > >
> > > > +1 (binding)
> > > >
> > > > Thanks for the KIP!
> > > >
> > > > -Matthias
> > > >
> > > > On 10/23/19 9:37 AM, RABI K.C. wrote:
> > > > > Hello All,
> > > > >
> > > > > As per the discussion I want to call for a vote to go forward with
> > the
> > > > > changes mentioned in KIP-530.
> > > > >
> > > > > Links:
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=130028807
> > > > > https://issues.apache.org/jira/browse/KAFKA-8953
> > > > >
> > > > > With Best Regards,
> > > > > Rabi Kumar K C
> > > > >
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>


Re: [VOTE] KIP-541: Create a fetch.max.bytes configuration for the broker

2019-10-25 Thread Tom Bentley
+1 nb. Thanks!

On Fri, Oct 25, 2019 at 7:43 AM Ismael Juma  wrote:

> +1 (binding)
>
> On Thu, Oct 24, 2019, 4:56 PM Colin McCabe  wrote:
>
> > Hi all,
> >
> > I'd like to start the vote on KIP-541: Create a fetch.max.bytes
> > configuration for the broker.
> >
> > KIP: https://cwiki.apache.org/confluence/x/4g73Bw
> >
> > Discussion thread:
> >
> https://lists.apache.org/thread.html/9d9dde93a07e1f1fc8d9f182f94f4bda9d016c5e9f3c8541cdc6f53b@%3Cdev.kafka.apache.org%3E
> >
> > cheers,
> > Colin
> >
>


Re: [VOTE] KIP-455: Create an Administrative API for Replica Reassignment

2019-10-25 Thread Stanislav Kozlovski
Hello all,

To best shape up KIP-455 before the 2.4 release, we made some very minor
last-minute improvements which I'd like to share for the record.

1) MINOR: Check against empty replicas in AlterPartitionReassignments (
https://github.com/apache/kafka/commit/78e7c90e90efa18b2a5b298e49154834d8d5bf67
)
Added validation against passing in an empty set of replicas (e.g. []) to the
alter API. We now properly raise an InvalidReplicaAssignmentException :)

2) MINOR: ListPartitionReassignmentsResponse should not be entirely failed
when a topic-partition does not exist (
https://github.com/apache/kafka/commit/fa2a9f09e4042f821d7373e2d9e01b21aede775a
)
The List API would fail the whole request if one topic partition didn't
exist. We now simply ignore that partition in the response

3) Iterate on the NewPartitionReassignment interface
We had left the design of that interface to the implementation and did not
discuss it in the voting thread, as it is a very trivial class that should
be used only for passing in a parameter to the Alter API.
Regardless, we iterated on it in PR discussions and ended up with the
following interface, described in the KIP. (
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=112820260=42=41
)

I've done my best to keep the KIP page up to date with the latest
information.

Thanks again to everybody who helped discuss, vote on, review and implement
this KIP. :)
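
For the record, a minimal sketch of how the finalized APIs are used from the
Admin client (illustrative; see the KIP for the authoritative signatures):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

public class ReassignmentExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            Map<TopicPartition, Optional<NewPartitionReassignment>> moves = new HashMap<>();
            // Move foo-0 to brokers 1,2,3; an empty replica list is now rejected
            // with InvalidReplicaAssignmentException (point 1 above).
            moves.put(new TopicPartition("foo", 0),
                      Optional.of(new NewPartitionReassignment(Arrays.asList(1, 2, 3))));
            // Optional.empty() cancels any ongoing reassignment for the partition.
            moves.put(new TopicPartition("foo", 1), Optional.empty());
            admin.alterPartitionReassignments(moves).all().get();

            // Unknown partitions are simply left out of the listing instead of
            // failing the whole request (point 2 above).
            admin.listPartitionReassignments().reassignments().get()
                 .forEach((tp, r) -> System.out.println(tp + " -> " + r.replicas()));
        }
    }
}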

On Wed, Aug 28, 2019 at 8:47 PM Colin McCabe  wrote:

> Hi all,
>
> After some discussion with Jun and Stan, we decided that we should bump
> the version of the topics znode from 1 to 2.  The bump is backwards
> compatible (older brokers can read the v2 znode).  I have updated the KIP.
>
> best,
> Colin
>
>
> On Thu, Aug 8, 2019, at 11:09, Colin McCabe wrote:
> > Hi Koushik,
> >
> > The vote for this KIP already passed.
> >
> > See https://www.mail-archive.com/dev@kafka.apache.org/msg99636.html
> >
> > best,
> > Colin
> >
> > On Thu, Aug 8, 2019, at 10:50, Koushik Chitta wrote:
> > > Thanks Colin, George.   Can we restart the voting for this KIP.
> > >
> > > Thanks,
> > > Koushik
> > >
> > > -Original Message-
> > > From: Colin McCabe 
> > > Sent: Wednesday, August 7, 2019 5:17 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [VOTE] KIP-455: Create an Administrative API for Replica
> > > Reassignment
> > >
> > > On Wed, Aug 7, 2019, at 15:41, George Li wrote:
> > > > This email seemed to get lost in the dev email server.  Resending.
> > > >
> > > >
> > > > On Tuesday, August 6, 2019, 10:16:57 PM PDT, George Li
> > > >  wrote:
> > > >
> > > >
> > > > The pending reassignments partitions would be reported as URP (Under
> > > > Replicated Partitions).  or maybe reported as a separate metrics of
> > > > RURP (Reassignment URP) since now we can derived from the new
> > > > AddingReplicas. An alert could be triggered based on this.
> > > >
> > >
> > > Hi George,
> > >
> > > I agree that this would be a great idea for follow up work.  Check out
> > > KIP-352, which discusses creating a such a metric. :)
> > >
> > > >
> > > >
> > > > It would be nice if ListPartitionReassignmentResult could return the
> > > > "elapsed time/duration" of the current pending reassignments, the
> > > > calling client can flag those current long running reassignments and
> > > > alert.  However, what I would be interested is probably the total #
> of
> > > > pending reassignments because I will submit reassignments in
> batches,
> > > > e.g. 50 reassignments per batch.  If the pending reassignments # is
> > > > below that per batch #, submit more new reassignments = (per_batch_#
> -
> > > > pending_#).
> > > >
> > >
> > > It is definitely useful to know what reassignments exist.  If you call
> > > ListPartitionReassignments, you can count how many results you get, in
> > > order to implement a policy like that.
> > >
> > > I'm not sure if knowing how long reassignments have been in progress
> > > will be important or not.  I think we should give people some time to
> > > try out the new APIs and see what could be improved based on their
> > > experience.
> > >
> > > >
> > > >
> > > > It seems currently, the ReplicaFetcher threads could quite easily
> crash
> > > > because of some exceptions. e.g. Java Out Of Memory, and would just
> > > > remain dead (jstack to dump threads to check the # of running
> > > > ReplicaFetcher threads) without getting restarted automatically, so
> > > > needs to bounce the broker.  It would be nice to make the
> > > > ReplicaFetcher more robust/resilient of catching more exceptions,
> and
> > > > if crashed, get restarted after some time.
> > > >
> > >
> > > This has definitely been an issue in the past, I agree.  Thankfully,
> we
> > > recently did improve the robustness of the ReplicaFetcher.  Check out
> > > "KIP-461: Improve Replica Fetcher behavior at handling partition
> > > failure."
> > >
> > > cheers,
> > > Colin
> > >
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > George
> > > >
> > > >
> > > 

[jira] [Resolved] (KAFKA-8992) Don't expose Errors in `RemoveMemberFromGroupResult`

2019-10-25 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8992.

Resolution: Fixed

> Don't expose Errors in `RemoveMemberFromGroupResult`
> 
>
> Key: KAFKA-8992
> URL: https://issues.apache.org/jira/browse/KAFKA-8992
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Boyang Chen
>Priority: Blocker
> Fix For: 2.4.0
>
>
> The type `RemoveMemberFromGroupResult` exposes `Errors` from `topLevelError`. 
> We should just get rid of this API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9104) Allow AdminClient to manage users resources

2019-10-25 Thread Jakub (Jira)
Jakub created KAFKA-9104:


 Summary: Allow AdminClient to manage users resources
 Key: KAFKA-9104
 URL: https://issues.apache.org/jira/browse/KAFKA-9104
 Project: Kafka
  Issue Type: Improvement
  Components: admin
Affects Versions: 2.3.1
Reporter: Jakub


Right now, AdminClient only supports 
* TopicResource
* GroupResource
* ClusterResource
* BrokerResource

 

It's important for our automation environment to also support User resources, 
since right now the only way to manage different users is to run 
{code:java}
kafka-configs 
--alter 
--add-config 'producer_byte_rate=XXX,consumer_byte_rate=XXX' 
--entity-type users 
--entity-name Xx
{code}
which is not feasible to automate for each user.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9038) Allow creating partitions while partition reassignment is in progress

2019-10-25 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9038.

Fix Version/s: 2.4.0
   Resolution: Fixed

> Allow creating partitions while partition reassignment is in progress
> -
>
> Key: KAFKA-9038
> URL: https://issues.apache.org/jira/browse/KAFKA-9038
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Bob Barrett
>Priority: Major
> Fix For: 2.4.0
>
>
> If a user attempts to create partitions for a topic while a partition 
> reassignment is in progress, we block the request even if the topic for which 
> partitions are being created is not involved in the reassignment. This is an 
> unnecessarily strict requirement; we should allow partition creation for 
> topics that are not involved in reassignments.
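
For reference, a minimal sketch of growing a topic's partition count via the
Admin client, which this change no longer blocks when an unrelated reassignment
is in flight (topic name and bootstrap address are placeholders):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class CreatePartitionsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Grow "my-topic" to 12 partitions.
            admin.createPartitions(
                Collections.singletonMap("my-topic", NewPartitions.increaseTo(12))
            ).all().get();
        }
    }
}
{code}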



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: kafka-2.4-jdk8 #38

2019-10-25 Thread Apache Jenkins Server
See 


Changes:

[cmccabe] MINOR: Re-implement NewPartitionReassignment#of() (#7592)


--
[...truncated 5.10 MB...]
kafka.server.KafkaApisTest > 
rejectSyncGroupRequestWhenStaticMembershipNotSupported STARTED

kafka.server.KafkaApisTest > 
rejectSyncGroupRequestWhenStaticMembershipNotSupported PASSED

kafka.server.KafkaApisTest > 
rejectHeartbeatRequestWhenStaticMembershipNotSupported STARTED

kafka.server.KafkaApisTest > 
rejectHeartbeatRequestWhenStaticMembershipNotSupported PASSED

kafka.server.KafkaApisTest > testReadCommittedConsumerListOffsetLatest STARTED

kafka.server.KafkaApisTest > testReadCommittedConsumerListOffsetLatest PASSED

kafka.server.KafkaApisTest > 
testMetadataRequestOnSharedListenerWithInconsistentListenersAcrossBrokers 
STARTED

kafka.server.KafkaApisTest > 
testMetadataRequestOnSharedListenerWithInconsistentListenersAcrossBrokers PASSED

kafka.server.KafkaApisTest > testAddPartitionsToTxnWithInvalidPartition STARTED

kafka.server.KafkaApisTest > testAddPartitionsToTxnWithInvalidPartition PASSED

kafka.server.KafkaApisTest > testOffsetDeleteWithInvalidPartition STARTED

kafka.server.KafkaApisTest > testOffsetDeleteWithInvalidPartition PASSED

kafka.server.KafkaApisTest > 
testLeaderReplicaIfLocalRaisesUnknownTopicOrPartition STARTED

kafka.server.KafkaApisTest > 
testLeaderReplicaIfLocalRaisesUnknownTopicOrPartition PASSED

kafka.server.KafkaApisTest > 
shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported
 STARTED

kafka.server.KafkaApisTest > 
shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported
 PASSED

kafka.server.KafkaApisTest > testLeaderReplicaIfLocalRaisesUnknownLeaderEpoch 
STARTED

kafka.server.KafkaApisTest > testLeaderReplicaIfLocalRaisesUnknownLeaderEpoch 
PASSED

kafka.server.KafkaApisTest > testTxnOffsetCommitWithInvalidPartition STARTED

kafka.server.KafkaApisTest > testTxnOffsetCommitWithInvalidPartition PASSED

kafka.server.KafkaApisTest > testSingleLeaveGroup STARTED

kafka.server.KafkaApisTest > testSingleLeaveGroup PASSED

kafka.server.KafkaApisTest > 
rejectJoinGroupRequestWhenStaticMembershipNotSupported STARTED

kafka.server.KafkaApisTest > 
rejectJoinGroupRequestWhenStaticMembershipNotSupported PASSED

kafka.server.KafkaApisTest > 
shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition
 STARTED

kafka.server.KafkaApisTest > 
shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition
 PASSED

kafka.server.KafkaApisTest > 
shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported
 STARTED

kafka.server.KafkaApisTest > 
shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported
 PASSED

kafka.server.KafkaApisTest > testMultipleLeaveGroup STARTED

kafka.server.KafkaApisTest > testMultipleLeaveGroup PASSED

kafka.server.KafkaApisTest > 
shouldRespondWithUnknownTopicOrPartitionForBadPartitionAndNoErrorsForGoodPartition
 STARTED

kafka.server.KafkaApisTest > 
shouldRespondWithUnknownTopicOrPartitionForBadPartitionAndNoErrorsForGoodPartition
 PASSED

kafka.server.KafkaApisTest > 
rejectOffsetCommitRequestWhenStaticMembershipNotSupported STARTED

kafka.server.KafkaApisTest > 
rejectOffsetCommitRequestWhenStaticMembershipNotSupported PASSED

kafka.server.DeleteTopicsRequestWithDeletionDisabledTest > 
testDeleteRecordsRequest STARTED

kafka.server.DeleteTopicsRequestWithDeletionDisabledTest > 
testDeleteRecordsRequest PASSED

kafka.server.AlterReplicaLogDirsRequestTest > testAlterReplicaLogDirsRequest 
STARTED

kafka.server.AlterReplicaLogDirsRequestTest > testAlterReplicaLogDirsRequest 
PASSED

kafka.server.AlterReplicaLogDirsRequestTest > 
testAlterReplicaLogDirsRequestErrorCode STARTED

kafka.server.AlterReplicaLogDirsRequestTest > 
testAlterReplicaLogDirsRequestErrorCode PASSED

kafka.server.CreateTopicsRequestTest > testValidCreateTopicsRequests STARTED

kafka.server.CreateTopicsRequestTest > testValidCreateTopicsRequests PASSED

kafka.server.CreateTopicsRequestTest > testErrorCreateTopicsRequests STARTED

kafka.server.CreateTopicsRequestTest > testErrorCreateTopicsRequests PASSED

kafka.server.CreateTopicsRequestTest > testInvalidCreateTopicsRequests STARTED

kafka.server.CreateTopicsRequestTest > testInvalidCreateTopicsRequests PASSED

kafka.server.CreateTopicsRequestTest > testNotController STARTED

kafka.server.CreateTopicsRequestTest > testNotController PASSED

kafka.server.CreateTopicsRequestWithPolicyTest > testValidCreateTopicsRequests 
STARTED

kafka.server.CreateTopicsRequestWithPolicyTest > testValidCreateTopicsRequests 
PASSED

kafka.server.CreateTopicsRequestWithPolicyTest > testErrorCreateTopicsRequests 
STARTED

kafka.server.CreateTopicsRequestWithPolicyTest > testErrorCreateTopicsRequests 
PASSED


Build failed in Jenkins: kafka-trunk-jdk11 #911

2019-10-25 Thread Apache Jenkins Server
See 


Changes:

[cmccabe] MINOR: Re-implement NewPartitionReassignment#of() (#7592)


--
[...truncated 5.49 MB...]
org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] STARTED


Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2019-10-25 Thread Matthias J. Sax
Walker,

I am not sure I can follow your argument. What exactly do you mean by

> I also
>> think that in this case it would be better to separate the 2 option out
>> into separate overloads.

Maybe you can give an example what method signature you have in mind?

>> We could take a named parameter from upstream or add an extra naming option
>> however I don't really see the advantage that would give.

Are you familiar with KIP-307? Before KIP-307, KS generated all names
for all Processors. This made it hard to reason about a Topology once it
got complex. Adding `Named` to the new co-group operator would
actually align with KIP-307.

> It seems to go in
>> the opposite direction from the cogroup configuration idea you proposed.

Can you elaborate? Not sure if I can follow.



-Matthias
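
For context, the call chain currently sketched for cogroup reads roughly like
the following (a sketch only; signatures follow the proposal under discussion
and may change before merging):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class CogroupSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KGroupedStream<String, Long> clicks =
            builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.Long())).groupByKey();
        KGroupedStream<String, Long> views =
            builder.stream("views", Consumed.with(Serdes.String(), Serdes.Long())).groupByKey();

        // Each cogroup() call pairs one grouped stream with its aggregator; only the
        // final aggregate() takes the single Initializer and the Materialized.
        Aggregator<String, Long, Long> sum = (key, value, aggregate) -> aggregate + value;
        KTable<String, Long> totals = clicks
            .cogroup(sum)
            .cogroup(views, sum)
            .aggregate(() -> 0L,
                       Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("totals-store"));

        builder.build();
    }
}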


On 10/24/19 10:20 AM, Walker Carlson wrote:
> While I like the idea Sophie I don't think that it is necessary. I also
> think that in this case it would be better to separate the 2 option out
> into separate overloads.
> We could take a named parameter from upstream or add an extra naming option
> however I don't really see the advantage that would give. It seems to go in
> the opposite direction from the cogroup configuration idea you proposed.
> 
> John, I think it could be both. It depends on when you aggregate and what
> kind of data you have. In the example it is aggregating before joining,
> there are probably some cases where you could join before aggregating. IMHO
> it would be easier to group all the streams together then perform the one
> operation that results in a single KTable.
> 
> 
> 
> On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman 
> wrote:
> 
>>> I can personally not see any need to add other configuration
>> Famous last words?
>>
>> Just kidding, 95% confidence is more than enough to  me (and better to
>> optimize for current
>> design than for hypothetical future changes).
>>
>> One last question I have then is about the operator/store/repartition
>> naming -- seems like
>> we can name the underlying store/changelog through the Materialized
>> parameter, but do we
>> also want to include an overload taking a Named parameter for the operator
>> name (as in the
>> KTable#join variations)?
>>
>> On Wed, Oct 23, 2019 at 5:14 PM Matthias J. Sax 
>> wrote:
>>
>>> Interesting idea, Sophie.
>>>
>>> So far, we tried to reuse existing config objects and only add new ones
>>> when needed to avoid creating "redundant" classes. This is of course a
>>> reactive approach (with the drawback to deprecate stuff if we change it,
>>> as you described).
>>>
>>> I can personally not see any need to add other configuration parameters
>>> atm, so it's a 95% obvious "no" IMHO. The final `aggregate()` has only a
>>> single state store that we need to configure, and reusing `Materialized`
>>> seems to be appropriate.
>>>
>>> Also note, that the `Initializer` is a mandatory parameter and not a
>>> configuration and should be passed directly, and not via a configuration
>>> object.
>>>
>>>
>>> -Matthias
>>>
>>> On 10/23/19 11:37 AM, Sophie Blee-Goldman wrote:
 Thanks for the explanation, makes sense to me! As for the API, one
>> other
 thought I had is might we ever want or need to introduce any other
>>> configs
 or parameters in the future? Obviously that's difficult to say now (or
 maybe the
 answer seems obviously "no") but we seem to often end up needing to add
>>> new
 overloads and/or deprecate old ones as new features or requirements
>> come
 into
 play.

 What do you (and others?) think about wrapping the config parameters
>> (ie
 everything
 except the actual grouped streams) in a new config object? For example,
>>> the
 CogroupedStream#aggregate field could take a single Cogrouped object,
 which itself would have an initializer and a materialized. If we ever
>>> need
 to add
 a new parameter, we can just add it to the Cogrouped class.

 Also, will the backing store be available for IQ if a Materialized is
 passed in?

 On Wed, Oct 23, 2019 at 10:49 AM Walker Carlson >>
 wrote:

> Hi Sophie,
>
> Thank you for your comments. As for the different methods signatures I
>>> have
> not really considered any other options but  while I do agree it is
> confusing, I don't see any obvious solutions. The problem is that the
> cogroup essentially pairs a group stream with an aggregator and when
>> it
>>> is
> first made the method is called on a groupedStream already. However
>> each
> subsequent stream-aggregator pair is added on to a cogroup stream so
>> it
> needs both arguments.
>
> For the second question you should not need a joiner. The idea is that
>>> you
> can collect many grouped streams with overlapping key spaces and any
>>> kind
> of value types. Once aggregated its value will be reduced into one
>> type.
> This is why you need only one initializer. Each 

Re: [VOTE] KIP-541: Create a fetch.max.bytes configuration for the broker

2019-10-25 Thread Ismael Juma
+1 (binding)

On Thu, Oct 24, 2019, 4:56 PM Colin McCabe  wrote:

> Hi all,
>
> I'd like to start the vote on KIP-541: Create a fetch.max.bytes
> configuration for the broker.
>
> KIP: https://cwiki.apache.org/confluence/x/4g73Bw
>
> Discussion thread:
> https://lists.apache.org/thread.html/9d9dde93a07e1f1fc8d9f182f94f4bda9d016c5e9f3c8541cdc6f53b@%3Cdev.kafka.apache.org%3E
>
> cheers,
> Colin
>


Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Matthias J. Sax
Just to close the loop @Vinoth:

> 1. IIUC John intends to add (or we can do this in this KIP) lag information
> to AssignmentInfo, which gets sent to every participant.

As explained by John, currently KIP-441 plans to only report the
information to the leader. But I guess, with the new proposal to not
broadcast this information anyway, this concern is invalidated anyway

> 2. At-least I was under the assumption that it can be called per query,
> since the API docs don't seem to suggest otherwise. Do you see any
> potential issues if we call this every query? (we should benchmark this
> nonetheless)

I did not see a real issue if people refresh the metadata frequently,
because it would be a local call. My main point was, that this would
change the current usage pattern of the API, and we would clearly need
to communicate this change. Similar to (1), this concern is invalidated
anyway.


@John: I think it's a great idea to get rid of reporting lag, and
pushing the decision making process about "what to query" into the query
serving layer itself. This simplifies the overall design of this KIP
significantly, and actually aligns very well with the idea that Kafka
Streams (as it is a library) should only provide the basic building
block. Many of my raised questions are invalidated by this.



Some questions are still open though:

> 10) Do we need to distinguish between active(restoring) and standby
> tasks? Or could be treat both as the same?


@Vinoth: about (5). I see your point about multiple calls vs a single
call. I still slightly prefer multiple calls, but it's highly subjective
and I would also be fine to add an #isActive() method. It would be good to
get feedback from others.


For (14), ie, lag in offsets vs time: Having both, as suggested by
Sophie would of course be best. What is a little unclear to me is, how
in details are we going to compute both?
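
For illustration, one way both could be computed on a standby (a sketch only;
it assumes the standby tracks the changelog end offset and the end record's
timestamp, which is the extra piece Guozhang mentions in the quoted thread
below):

// Illustrative only -- not an existing Streams API.
final class StoreLag {
    // Lag in offsets: how many changelog records the replica still has to apply.
    static long offsetLag(long changelogEndOffset, long lastAppliedOffset) {
        return Math.max(0L, changelogEndOffset - lastAppliedOffset);
    }
    // Lag in time: timestamp of the record at the changelog end minus the
    // timestamp of the last record this replica has applied.
    static long timeLagMs(long endRecordTimestampMs, long lastAppliedRecordTimestampMs) {
        return Math.max(0L, endRecordTimestampMs - lastAppliedRecordTimestampMs);
    }
}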



-Matthias



On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote:
> Just to chime in on the "report lag vs timestamp difference" issue, I would
> actually advocate for both. As mentioned already, time difference is
> probably a lot easier and/or more useful to reason about in terms of
> "freshness"
> of the state. But in the case when all queried stores are far behind, lag
> could
> be used to estimate the recovery velocity. You can then get a (pretty rough)
> idea of when a store might be ready, and wait until around then to query
> again.
> 
> On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang  wrote:
> 
>> I think I agree with John's recent reasoning as well: instead of letting
>> the storeMetadataAPI to return the staleness information, letting the
>> client to query either active or standby and letting standby query response
>> to include both the values + timestamp (or lag, as in diffs of timestamps)
>> would actually be more intuitive -- not only the streams client is simpler,
>> from user's perspective they also do not need to periodically refresh their
>> staleness information from the client, but only need to make decisions on
>> the fly whenever they need to query.
>>
>> Again the standby replica then need to know the current active task's
>> timestamp, which can be found from the log end record's encoded timestamp;
>> today we standby tasks do not read that specific record, but only refresh
>> its knowledge on the log end offset, but I think refreshing the latest
>> record timestamp is not a very bad request to add on the standby replicas.
>>
>>
>> Guozhang
>>
>>
>> On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar 
>> wrote:
>>
>>> +1 As someone implementing a query routing layer, there is already a need
>>> to have mechanisms in place to do healthchecks/failure detection to
>> detect
>>> failures for queries, while Streams rebalancing eventually kicks in the
>>> background.
>>> So, pushing this complexity to the IQ client app keeps Streams simpler as
>>> well. IQs will be potentially issues at an order of magnitude more
>>> frequently and it can achieve good freshness for the lag information.
>>>
>>> I would like to add however, that we would also need to introduce apis in
>>> KafkaStreams class, for obtaining lag information for all stores local to
>>> that host. This is for the IQs to relay back with the response/its own
>>> heartbeat mechanism.
>>>
>>> On Thu, Oct 24, 2019 at 3:12 PM John Roesler  wrote:
>>>
 Hi all,

 I've been mulling about this KIP, and I think I was on the wrong track
 earlier with regard to task lags. Tl;dr: I don't think we should add
 lags at all to the metadata API (and also not to the AssignmentInfo
 protocol message).

 Like I mentioned early on, reporting lag via
 SubscriptionInfo/AssignmentInfo would only work while rebalances are
 happening. Once the group stabilizes, no members would be notified of
 each others' lags anymore. I had been thinking that the solution would
 be the heartbeat proposal I mentioned earlier, but that proposal would
 have reported the 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Sophie Blee-Goldman
Just to chime in on the "report lag vs timestamp difference" issue, I would
actually advocate for both. As mentioned already, time difference is
probably a lot easier and/or more useful to reason about in terms of
"freshness"
of the state. But in the case when all queried stores are far behind, lag
could
be used to estimate the recovery velocity. You can then get a (pretty rough)
idea of when a store might be ready, and wait until around then to query
again.
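
(As a rough illustration of that estimate -- purely a sketch, not part of the
proposal:)

// Illustrative only: given two offset-lag observations taken intervalMs apart,
// estimate how long until the lagging replica catches up.
final class CatchUpEstimate {
    static double estimateCatchUpMs(long previousLag, long currentLag, long intervalMs) {
        double restoreRatePerMs = (previousLag - currentLag) / (double) intervalMs;
        if (restoreRatePerMs <= 0) {
            return Double.POSITIVE_INFINITY; // not catching up (yet)
        }
        return currentLag / restoreRatePerMs;
    }
}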

On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang  wrote:

> I think I agree with John's recent reasoning as well: instead of letting
> the storeMetadataAPI to return the staleness information, letting the
> client to query either active or standby and letting standby query response
> to include both the values + timestamp (or lag, as in diffs of timestamps)
> would actually be more intuitive -- not only the streams client is simpler,
> from user's perspective they also do not need to periodically refresh their
> staleness information from the client, but only need to make decisions on
> the fly whenever they need to query.
>
> Again the standby replica then need to know the current active task's
> timestamp, which can be found from the log end record's encoded timestamp;
> today we standby tasks do not read that specific record, but only refresh
> its knowledge on the log end offset, but I think refreshing the latest
> record timestamp is not a very bad request to add on the standby replicas.
>
>
> Guozhang
>
>
> On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar 
> wrote:
>
> > +1 As someone implementing a query routing layer, there is already a need
> > to have mechanisms in place to do healthchecks/failure detection to
> detect
> > failures for queries, while Streams rebalancing eventually kicks in the
> > background.
> > So, pushing this complexity to the IQ client app keeps Streams simpler as
> > well. IQs will be potentially issues at an order of magnitude more
> > frequently and it can achieve good freshness for the lag information.
> >
> > I would like to add however, that we would also need to introduce apis in
> > KafkaStreams class, for obtaining lag information for all stores local to
> > that host. This is for the IQs to relay back with the response/its own
> > heartbeat mechanism.
> >
> > On Thu, Oct 24, 2019 at 3:12 PM John Roesler  wrote:
> >
> > > Hi all,
> > >
> > > I've been mulling about this KIP, and I think I was on the wrong track
> > > earlier with regard to task lags. Tl;dr: I don't think we should add
> > > lags at all to the metadata API (and also not to the AssignmentInfo
> > > protocol message).
> > >
> > > Like I mentioned early on, reporting lag via
> > > SubscriptionInfo/AssignmentInfo would only work while rebalances are
> > > happening. Once the group stabilizes, no members would be notified of
> > > each others' lags anymore. I had been thinking that the solution would
> > > be the heartbeat proposal I mentioned earlier, but that proposal would
> > > have reported the heartbeats of the members only to the leader member
> > > (the one who makes assignments). To be useful in the context of _this_
> > > KIP, we would also have to report the lags in the heartbeat responses
> > > to of _all_ members. This is a concern to be because now _all_ the
> > > lags get reported to _all_ the members on _every_ heartbeat... a lot
> > > of chatter.
> > >
> > > Plus, the proposal for KIP-441 is only to report the lags of each
> > > _task_. This is the sum of the lags of all the stores in the tasks.
> > > But this would be insufficient for KIP-535. For this kip, we would
> > > want the lag specifically of the store we want to query. So this
> > > means, we have to report the lags of all the stores of all the members
> > > to every member... even more chatter!
> > >
> > > The final nail in the coffin to me is that IQ clients would have to
> > > start refreshing their metadata quite frequently to stay up to date on
> > > the lags, which adds even more overhead to the system.
> > >
> > > Consider a strawman alternative: we bring KIP-535 back to extending
> > > the metadata API to tell the client the active and standby replicas
> > > for the key in question (not including and "staleness/lag"
> > > restriction, just returning all the replicas). Then, the client picks
> > > a replica and sends the query. The server returns the current lag
> > > along with the response (maybe in an HTTP header or something). Then,
> > > the client keeps a map of its last observed lags for each replica, and
> > > uses this information to prefer fresher replicas.
> > >
> > > OR, if it wants only to query the active replica, it would throw an
> > > error on any lag response greater than zero, refreshes its metadata by
> > > re-querying the metadata API, and tries again with the current active
> > > replica.
> > >
> > > This way, the lag information will be super fresh for the client, and
> > > we keep the Metadata API / Assignment,Subscription / and Heartbeat as
> > > 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Navinder Brar
 Sending the older replies again as they were unreadable due to bad formatting.

>> There was a follow-on idea I POCed to continuously share lag information in 
>> the heartbeat protocol
+1 that would be great, I will update the KIP assuming this work will finish 
soon




>> I think that adding a new method to StreamsMetadataState and deprecating the
>> existing method is the best way to go; we just can't change the return types
>> of any existing methods.

+1 on this; we will add new methods for users who are interested in getting
back a list of all the possible options to query from, and leave the current
function getStreamsMetadataForKey() untouched for users who want absolute
consistency.
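
For concreteness, the shape being discussed could look roughly like the sketch
below (method and type names are only illustrative, not a final signature):

import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.state.StreamsMetadata;

// Hypothetical additions, sketched as an interface. The existing "active only"
// lookup stays untouched for callers that want absolute consistency.
public interface StaleQueryMetadata {

    // Every host (active and standbys) that materializes the store holding
    // 'key', so the caller can decide where to route the query.
    <K> List<StreamsMetadata> allMetadataForKey(String storeName,
                                                K key,
                                                Serializer<K> keySerializer);

    // Lag per store hosted locally, so a routing layer can piggyback it on its
    // own healthcheck/heartbeat responses, as suggested earlier in the thread.
    Map<String, Long> allLocalStoreLags();
}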




>> why not just _always_ return all available metadata (including 
>> active/standby or lag) and let the caller decide to which node they want to 
>> route the query

+1. I think this makes sense: from a user standpoint there is no difference
between an active and a standby if both have the same lag. In fact, users would
be able to use this API to reduce query load on actives, so returning all
available options along with the current lag of each makes sense, leaving it to
the user how they want to use this data. This has another added advantage: if a
user queries any random machine for a key and that machine has a replica of the
partition the key belongs to, the user might choose to serve the data from
there itself (if it doesn’t lag much) rather than finding the active and making
an IQ to it. This would save some critical serving time for some applications.
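
A minimal sketch of that local-first routing decision (a hypothetical helper on
the application side; the three primitives it builds on belong to the
application's own serving layer, not to Streams):

// Serve from the local copy (active or standby) when it is within the caller's
// staleness budget, otherwise forward the request to the active host.
abstract class LocalFirstRouter<V> {

    // event-time lag of the locally hosted copy, or null if this host has none
    protected abstract Long localTimeLagMs(String store, String key);

    protected abstract V readLocal(String store, String key);

    protected abstract V forwardToActive(String store, String key);

    V get(final String store, final String key, final long maxStalenessMs) {
        final Long localLagMs = localTimeLagMs(store, key);
        if (localLagMs != null && localLagMs <= maxStalenessMs) {
            return readLocal(store, key);
        }
        return forwardToActive(store, key);
    }
}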




>> Adding the lag in terms of timestamp diff comparing the committed offset.

+1 on this, I think it’s more readable. But as John said, the function
allMetadataForKey() just returns the possible options from which users can
query a key, so we can even drop the enableReplicaServing/tolerableDataStaleness
parameter and just return all the StreamsMetadata containing that key along
with the offset limit.



   
   - @John has already commented on this one.
   - Yeah the usage pattern would include querying this prior to every request 
   - Will add the changes to StreamsMetadata in the KIP, would include changes 
in rebuildMetadata() etc.
   - Makes sense, already addressed above
   - Is it important from a user perspective whether they are querying an
active (processing), an active (restoring), or a standby task, if we have a way
of denoting lag in a readable manner that signals to the user that this is the
best node to query for fresh data?
   - Yes, I intend to return the actives and replicas in the same return list 
in allMetadataForKey()
   - I think we can’t depend on propagating lag only during rebalances;
sending it with each heartbeat every 3 seconds seems like a better idea, and
the lower delay also saves us from this race condition.
   - Yes, we need new functions to return activeRestoring and standbyRunning
tasks.
   - StreamsConfig doesn’t look of much use to me, since we are giving all the
possible options via this function; alternatively, users can use the existing
function getStreamsMetadataForKey() and get just the active.
   - I think we should treat them both the same and let the lag do the talking.
   - We are just sending back the options to query from in allMetadataForKey(),
which doesn’t include any store handle. We then query that machine for the key;
it calls allStores(), tries to find the task in
activeRunning/activeRestoring/standbyRunning, and adds the store handle there.
   - Need to verify, but at the exact point when the store is closed to
transition it from restoring to running, queries will fail. In such cases the
caller can have their own configurable retries to check again, or try the
replica if the call to the active fails (see the sketch after this list).
   - I think KIP-216 is working along those lines; we might not need a few of
those exceptions, since the basic idea of this KIP is now to support IQ during
rebalancing.
   - Addressed above, agreed it looks more readable.
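
The retry/fallback idea above could look roughly like this on the caller side
(a sketch only; the BiFunction stands in for the application's own RPC, and
none of these names are part of Streams):

import java.util.List;
import java.util.function.BiFunction;
import org.apache.kafka.streams.state.HostInfo;

final class FallbackQuery {
    // Try the active first; if a call fails (for example while the store is
    // briefly closed when transitioning from restoring to running), retry a
    // configurable number of times and then fall back to the standby replicas
    // returned by allMetadataForKey().
    static <V> V queryWithFallback(final List<HostInfo> replicas, // active first, then standbys
                                   final String key,
                                   final int maxRetriesPerHost,
                                   final BiFunction<HostInfo, String, V> remoteGet) {
        RuntimeException lastFailure = new IllegalStateException("no replicas available");
        for (final HostInfo host : replicas) {
            for (int attempt = 0; attempt <= maxRetriesPerHost; attempt++) {
                try {
                    return remoteGet.apply(host, key);
                } catch (final RuntimeException e) {
                    lastFailure = e;
                }
            }
        }
        throw lastFailure;
    }
}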

 

On Friday, 25 October, 2019, 10:23:57 am IST, Guozhang Wang 
 wrote:  
 
  I think I agree with John's recent reasoning as well: instead of letting
the storeMetadata API return the staleness information, letting the client
query either active or standby and letting the standby query response
include both the values + timestamp (or lag, as in diffs of timestamps)
would actually be more intuitive -- not only is the streams client simpler,
but from the user's perspective they also do not need to periodically
refresh their staleness information from the client; they only need to make
decisions on the fly whenever they need to query.

Again, the standby replica then needs to know the current active task's
timestamp, which can be found from the log-end record's encoded timestamp;
today standby tasks do not read that specific record, but only refresh
their knowledge of the log-end offset. I think refreshing the latest record
timestamp is not a very heavy requirement to add to the standby replicas.
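
If the standby's query response does carry the value together with timestamps,
the payload could be as simple as the following sketch (a hypothetical class,
not an existing Streams type):

// Response payload for a query served by a standby: the value plus the
// timestamps the caller needs to judge staleness on the fly.
final class StaleQueryResult<V> {
    final V value;
    final long localRecordTimestampMs;     // latest record applied to the local copy
    final long activeEndRecordTimestampMs; // timestamp encoded in the changelog's log-end record

    StaleQueryResult(final V value,
                     final long localRecordTimestampMs,
                     final long activeEndRecordTimestampMs) {
        this.value = value;
        this.localRecordTimestampMs = localRecordTimestampMs;
        this.activeEndRecordTimestampMs = activeEndRecordTimestampMs;
    }

    // The "lag, as in diffs of timestamps" mentioned above.
    long timeLagMs() {
        return Math.max(0L, activeEndRecordTimestampMs - localRecordTimestampMs);
    }
}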