Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-11-14 Thread Sophie Blee-Goldman
It seems like we all agree at this point (please correct me if wrong!) that
we should NOT change the existing repartitioning behavior, i.e., we should
allow Streams to continue to determine when and where to repartition --
*unless* explicitly told to via `.through()` or the new `.repartition()`
operator.

Regarding groupBy, the existing behavior we should not disrupt is
a) repartition *only* when required due to an upstream key-changing operation
(i.e., don't force repartitioning based on the presence of an optional
config parameter), and
b) allow optimization of required repartitions, if any

Within the constraint of not breaking the existing behavior, this still
leaves open the question of whether we want to improve the user experience
by allowing users to provide groupBy with a *suggestion* for numPartitions
(or, to put it more fairly, whether that *will* improve the experience). I
agree with many of the arguments outlined above, but let me push back on
this one issue one final time; if we can't come to a consensus then I am
happy to drop it for now so that the KIP can proceed.

Specifically, my proposal would be to simply augment Grouped with an
optional numPartitions, understood to
indicate the user's desired number of partitions *if Streams decides to
repartition due to that groupBy*
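A purely illustrative sketch of these semantics (nothing below exists in Kafka Streams; `resolvePartitions` just models the decision rule being proposed): the numPartitions on `Grouped` would be a suggestion consulted only when Streams itself decides a repartition is needed.

```java
import java.util.OptionalInt;

public class RepartitionSuggestion {
    // If Streams decides a repartition is required, honor the user's suggested
    // count when present; otherwise inherit the source topic's count. If no
    // repartition is required, the suggestion is simply ignored.
    static int resolvePartitions(boolean repartitionRequired,
                                 OptionalInt suggested,
                                 int sourcePartitions) {
        if (!repartitionRequired) {
            return sourcePartitions; // no repartition topic is created at all
        }
        return suggested.orElse(sourcePartitions);
    }

    public static void main(String[] args) {
        // Huge input topic; the user suggests 4 partitions for any repartition topic.
        if (resolvePartitions(true, OptionalInt.of(4), 512) != 4)
            throw new AssertionError();
        // No upstream key change: the suggestion is ignored, nothing is enforced.
        if (resolvePartitions(false, OptionalInt.of(4), 512) != 512)
            throw new AssertionError();
        // Repartition required, no suggestion: keep the source count.
        if (resolvePartitions(true, OptionalInt.empty(), 512) != 512)
            throw new AssertionError();
        System.out.println("ok");
    }
}
```

The key point of the sketch is the second case: the parameter never *triggers* a repartition, it only parameterizes one that Streams was going to do anyway.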

> if a user cares about the number of partition, the user wants to enforce
a repartitioning
First, I think we should take a step back and examine this claim. I agree
100% that *if this is true, then we should not give groupBy an optional
numPartitions.* As I see it, there's no argument to be had there if we
*presuppose that claim.* But I'm not convinced that this holds as an axiom
of the user experience, and I think we should be examining the claim
itself, not just its consequences.

To give a simple example, let's say some new user is trying out Streams and
wants to just play around
with it to see if it might be worth looking into. They want to just write
up a simple app and test it out on the
data in some existing topics they have with a large number of partitions,
and a lot of data. They're just messing
around, trying new topologies and don't want to go through each new one
step by step to determine if (or where)
a repartition might be required. They also don't want to force a
repartition if it turns out to not be required, so they'd
like to avoid the nice new .repartition operator they saw. But given the
huge number of input partitions, they'd like
to rest assured that if a repartition does end up being required somewhere
during dev, it will not be created with
the same huge number of partitions that their input topic has -- so they
just pass groupBy a small numPartitions
suggestion.

I know that's a bit of a contrived example but I think it does highlight
how and when this might be a considerable
quality of life improvement, in particular for new users to Streams and/or
during the dev cycle. *You don't want to*
*force a repartition if it wasn't necessary, but you don't want to create a
topic with a huge partition count either.*

Also, while the optimization discussion took us down an interesting but
ultimately more distracting road, it's worth
pointing out that it is clearly a major win to have as few
repartition topics/steps as possible. Given that we
don't want to change existing behavior, the optimization framework can only
help out when the placement of
repartition steps is flexible, which means only those from .groupBy (and
not .repartition). *Users should not have to choose between allowing*
*Streams to optimize the repartition placement, and being allowed to*
*specify a number of partitions.*

Lastly, I have what may be a stupid question, but for my own edification
about how groupBy works:
if you do a .groupBy and a repartition is NOT required, does it ever need
to serialize/deserialize
any of the data? In other words, if you pass a key/value serde to groupBy
and it doesn't trigger
a repartition, is the serde(s) just ignored and thus more like a suggestion
than a requirement?

So again, I don't want to hold up this KIP forever but I feel we've spent
some time getting slightly
off track (although certainly into very interesting discussions) yet never
really addressed or questioned
the basic premise: *could a user want to specify a number of partitions but
not enforce a repartition (at that*
*specific point in the topology)?*



On Fri, Nov 15, 2019 at 12:18 AM Matthias J. Sax 
wrote:

> Side remark:
>
> If the user specifies `repartition()` on both sides of the join, we can
> actually throw the exception earlier, i.e., when we build the topology.
>
> Currently, we can do this check only after Kafka Streams was started,
> within `StreamPartitionAssignor#assign()` -- we still need to keep this
> check for the case that none or only one side has a user-specified
> number of partitions, though.
>
>
> -Matthias
>
> On 11/14/19 8:15 AM, John Roesler wrote:
> > Thanks, all,
> >
> > I can get behind just totally leaving out 

[jira] [Created] (KAFKA-9194) Missing documentation for replicaMaxWaitTimeMs config value

2019-11-14 Thread Tomasz Szlek (Jira)
Tomasz Szlek created KAFKA-9194:
---

 Summary: Missing documentation for replicaMaxWaitTimeMs config 
value
 Key: KAFKA-9194
 URL: https://issues.apache.org/jira/browse/KAFKA-9194
 Project: Kafka
  Issue Type: Improvement
  Components: documentation
Affects Versions: 2.3.0
Reporter: Tomasz Szlek


I have read documentation and was interested in *replica.fetch.min.bytes* 
property.

In the description of this config there is information about the related config 
*replicaMaxWaitTimeMs*, however there is no documentation for this related 
config at all. Can you add it to the [configuration 
page|https://kafka.apache.org/documentation/#newconsumerconfigs] ?

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Jenkins build is back to normal : kafka-trunk-jdk8 #4041

2019-11-14 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-11-14 Thread Matthias J. Sax
Side remark:

If the user specifies `repartition()` on both sides of the join, we can
actually throw the exception earlier, i.e., when we build the topology.

Currently, we can do this check only after Kafka Streams was started,
within `StreamPartitionAssignor#assign()` -- we still need to keep this
check for the case that none or only one side has a user-specified
number of partitions, though.


-Matthias
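The build-time versus runtime check Matthias describes can be sketched as follows. This is a self-contained illustration, not Kafka's actual assignor code; all names in it are made up:

```java
import java.util.OptionalInt;

public class CopartitionCheck {
    // Fail fast at topology-build time when both join inputs carry an
    // explicit, user-specified partition count that does not match.
    static void checkAtBuildTime(OptionalInt left, OptionalInt right) {
        if (left.isPresent() && right.isPresent()
                && left.getAsInt() != right.getAsInt()) {
            throw new IllegalStateException(
                "Join inputs must be co-partitioned: "
                    + left.getAsInt() + " != " + right.getAsInt());
        }
        // If none or only one side is specified, defer to the existing
        // runtime check during partition assignment.
    }

    public static void main(String[] args) {
        checkAtBuildTime(OptionalInt.of(10), OptionalInt.of(10));  // ok
        checkAtBuildTime(OptionalInt.empty(), OptionalInt.of(10)); // deferred
        boolean threw = false;
        try {
            checkAtBuildTime(OptionalInt.of(100), OptionalInt.of(10));
        } catch (IllegalStateException e) {
            threw = true;
        }
        if (!threw) throw new AssertionError("expected build-time failure");
        System.out.println("ok");
    }
}
```

The benefit is simply earlier feedback: a 100-vs-10 mismatch surfaces when the topology is built, instead of after the application has started.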

On 11/14/19 8:15 AM, John Roesler wrote:
> Thanks, all,
> 
> > I can get behind just totally leaving out repartition-via-groupBy. If
> we only introduce `repartition()` for now, we're making the minimal
> change to gain the desired capability.
> 
> Plus, since we agree that `repartition()` should never be optimizable,
> it's a future-compatible proposal. I.e., if we were to add a
> non-optimizable groupBy(partitions) operation now, and want to make it
> optimizable in the future, we have to worry about topology
> compatibility. Better to just do non-optimizable `repartition()` now,
> and add an optimizable `groupBy(partitions)` in the future (maybe).
> 
> About joins, yes, it's a concern, and IMO we should just do the same
> thing we do now... check at runtime that the partition counts on both
> sides match and throw an exception otherwise. What this means as a
> user is that if you explicitly repartition the left side to 100
> partitions, and then join with the right side at 10 partitions, you
> get an exception, since this operation is not possible. You'd either
> have to "step down" the left side again, back to 10 partitions, or you
> could repartition the right side to 100 partitions before the join.
> The choice has to be the user's, since it depends on their desired
> execution parallelism.
> 
> Thanks,
> -John
> 
> On Thu, Nov 14, 2019 at 12:55 AM Matthias J. Sax  
> wrote:
>>
>> Thanks a lot John. I think the way you decompose the operators is super
>> helpful for this discussion.
>>
>> What you suggest with regard to using `Grouped` and enforcing
>> repartitioning if the number of partitions is specified is certainly
>> possible. However, I am not sure if we _should_ do this. My reasoning is
>> that an enforced repartitioning as introduced via `repartition()` is an
>> expensive operation, and it seems better to demand a more explicit
>> user opt-in to trigger it. Just setting an optional parameter might be
>> too subtle to trigger such a heavy "side effect".
>>
>> While I agree about "usability" in general, I would prefer a more
>> conservative approach to introduce this feature, see how it goes, and
>> maybe make it more advanced later on. This also applies to which
>> optimizations we may or may not allow (or are able to perform at all).
>>
>> @Levani: Reflecting on my suggestion about `Repartitioned extends
>> Grouped`, I agree that it might not be a good idea.
>>
>> Atm, I see an enforced repartitioning as non-optimizable and as a good
>> first step, and I would suggest not introducing anything else for now.
>> Introducing optimizable enforced repartitioning via `groupBy(...,
>> Grouped)` is something we could add later.
>>
>>
>> Therefore, I would not change `Grouped` but only introduce
>> `repartition()`. Users that use `groupBy()` atm, and want to opt in to
>> set the number of partitions, would need to rewrite their code to
>> `selectKey(...).repartition(...).groupByKey()`. It's less convenient but
>> also less risky from an API and optimization point of view.
>>
>>
>> @Levani: about joins -> yes, we will need to check the specified number
>> of partitions (if any) and, if they don't match, throw an exception. We
>> can discuss this on the PR -- I am just trying to get the PR for KIP-466
>> merged -- yours is next on the list :)
>>
>>
>> Thoughts?
>>
>>
>> -Matthias
>>
>>
>> On 11/12/19 4:51 PM, Levani Kokhreidze wrote:
>>> Thank you all for an interesting discussion. This is very enlightening.
>>>
>>> Thank you Matthias for your explanation. Your arguments make sense. It 
>>> is true that if a user specifies the number of partitions, he/she really 
>>> cares that those specifications are applied to the internal topics.
>>> Unfortunately, in the current implementation this is not true during `join` 
>>> operations. As I’ve written in the PR comment, currently, when `Stream#join` 
>>> is used, `CopartitionedTopicsEnforcer` chooses the max number of partitions 
>>> from the two source topics.
>>> I’m not really sure what the way around this situation would be. 
>>> Maybe fail the stream altogether and inform the user to specify the same 
>>> number of partitions?
>>> Or should we treat join operations the same way as right now and 
>>> basically choose the max number of partitions even when `repartition` is 
>>> specified, because Kafka Streams “knows best” how to handle joins?
>>> You can check the integration tests for how it’s handled currently. Open to 
>>> suggestions on that part.
>>>
>>> As for groupBy, I agree and John raised very interesting points. My 
>>> arguments for allowing users to 

Re: [VOTE] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Matthias J. Sax
+1 (binding)


On 11/14/19 3:48 PM, Guozhang Wang wrote:
> +1 (binding), thanks for the KIP!
> 
> Guozhang
> 
> On Fri, Nov 15, 2019 at 4:38 AM Navinder Brar
>  wrote:
> 
>> Hello all,
>>
>> I'd like to propose a vote for serving interactive queries during
>> Rebalancing, as it is a big deal for applications looking for high
>> availability. With this change, users will have control over the tradeoff
>> between consistency and availability during serving.
>> The full KIP is provided here:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
>>
>>
>> Thanks,
>> Navinder
> 
> 
> 





Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Matthias J. Sax
Thanks.

I buy the argument about the lag for active tasks.

Nit: The KIP briefly mentions the deprecation of the `metadataForKey()`
methods -- those should be listed as `@deprecated` next to the newly
added methods to point this change out more visibly.

Nit: in the code example, why do we loop over `inSyncStandbys` ? Would
we not just query only the most up-to-date one?

Nit: The "Compatibility, Deprecation, and Migration Plan" section should
point out that two methods are deprecated and users can migrate their
code to use the two new methods instead.

Those nits only address the write-up of the KIP, not the actual design,
which LGTM.


-Matthias




On 11/14/19 3:48 PM, Guozhang Wang wrote:
> 10/20: I think I'm aligned with John's replies as well.
> 
> Guozhang
> 
> On Fri, Nov 15, 2019 at 1:45 AM Vinoth Chandar 
> wrote:
> 
>>> during restoring state the active might have some lag
>>
>> Great catch, yes.. we cannot assume lag = 0 for active. Lets report active
>> lag as well then. If active is too laggy, the app can then deem the store
>> partition unavailable (based on what the application is willing to
>> tolerate).
>>
>> @matthias do you agree? We can then begin the vote.
>>
>> On Thu, Nov 14, 2019 at 9:03 AM Navinder Brar
>>  wrote:
>>
>>> I agree with Vinoth and John on having "allLocalStoreOffsetLags()", all
>>> actives don't have 0 lag, as during restoring state the active might have
>>> some lag and one of the features of this KIP is to provide an option to
>>> query from active (which might be in restoring state).
>>> I will update the KIP with rejected alternatives and post this will start
>>> a vote if everyone agrees on this.
>>> On Thursday, 14 November, 2019, 09:34:52 pm IST, John Roesler <
>>> j...@confluent.io> wrote:
>>>
>>>  Hi all,
>>>
>>> Thanks for the "reset", Vinoth. It brings some needed clarity to the
>>> discussion.
>>>
>>> 10. My 2 cents: we might as well include the lags for the active
>>> copies as well. This is a more context-free API. If we only include
>>> standbys, this choice won't make sense to users unless they understand
>>> that the active task cannot lag in the steady state, since it's the
>>> source of updates. This isn't a bad thing to realize, but it's just
>>> more mental overhead for the person who wants to list the lags for
>>> "all local stores".
>>>
>>> Another reason is that we could consider also reporting the lag for
>>> actives during recovery (when they would have non-zero lag). We don't
>>> have to now, but if we choose to call the method "standby lags", then
>>> we can't make this choice in the future.
>>>
>>> That said, it's just my opinion. I'm fine either way.
>>>
>>> 20. Vinoth's reply makes sense to me, fwiw.
>>>
>>> Beyond these two points, I'm happy with the current proposal.
>>>
>>> Thanks again,
>>> -John
>>>
>>> On Thu, Nov 14, 2019 at 4:48 AM Vinoth Chandar 
>>> wrote:

 10. I considered that. Had to pick one or the other. Can just return
 standby too and rename the method to maybe “allLocalStandbyOffsetLags()”
>> to
 have it explicit. (Standby should implicitly convey that we are talking
 about stores)

 20. What I meant was, we are returning HostInfo instead of
>>> StreamsMetadata
 since that's sufficient to route the query; same for “int partition“ vs
>> topic
 partition before. Previously KeyQueryMetadata had similar structure but
 used StreamsMetadata and TopicPartition objects to convey same
>>> information

 @navinder the KIP is already up to date with the email I sent, except for
>> the
 reasoning I was laying out. +1 on revisiting rejected alternatives.
 Please make the follow-up changes.

 On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax >>
 wrote:

> Thanks for the summary Vinoth!
>
> I buy the overall argument. Couple of clarification questions:
>
>
> 10. Why do we need to include the active stores in
> `allLocalStoreOffsetLags()`? Would it not be simpler to just return
>> lag
> for standbys?
>
>
> 20: What does
>
>> Thin the KeyQueryMetadata object to just contain the minimum
>>> information
>> needed.
>
> exactly mean? What is the "minimum information needed"?
>
>
> @Navinder: if you agree, can you update the KIP accordingly? With all
>>> the
> proposals, it's hard to keep track and it would be great to have the
> current proposal summarized in the wiki page.
>
> Please also update the "Rejected alternative" sections to avoid that
>> we
> cycle back to old proposal (including the reason _why_ they got
>>> rejected).
>
>
> Thanks a lot!
>
>
> -Matthias
>
>
>
> On 11/13/19 7:10 PM, Vinoth Chandar wrote:
>> Given we have had a healthy discussion on this topic for a month
>> now
>>> and
>> still with many loose ends and open ended conversations, I thought
>> It
> would
>> be worthwhile to take a step back and re-evaluate 

Re: Potential Bug in 2.3 version (leading to deletion of state directories)

2019-11-14 Thread Giridhar Addepalli
Hi John,

Can you please point us to the code where Thread-2 will be able to recreate 
the state directory once the cleaner is done?

Also, we see that in https://issues.apache.org/jira/browse/KAFKA-6122, retries 
around locks were removed. Please let us know why the retry mechanism was 
removed.

Also, can you please explain the below comment in the 
AssignedTasks.java#initializeNewTasks function?

catch (final LockException e) {
    // made this trace as it will spam the logs in the poll loop.
    log.trace("Could not create {} {} due to {}; will retry",
              taskTypeName, entry.getKey(), e.getMessage());
}
Thanks,
Giridhar.

On 2019/11/14 21:25:28, John Roesler  wrote: 
> Hey Navinder,
> 
> I think what's happening is a little different. Let's see if my
> worldview also explains your experiences.
> 
> There is no such thing as "mark for deletion". When a thread loses a
> task, it simply releases its lock on the directory. If no one else on
> the instance claims that lock within `state.cleanup.delay.ms` amount
> of milliseconds, then the state cleaner will itself grab the lock and
> delete the directory. On the other hand, if another thread (or the
> same thread) gets the task back and claims the lock before the
> cleaner, it will be able to re-open the store and use it.
> 
> The default for `state.cleanup.delay.ms` is 10 minutes, which is
> actually short enough that it could pass during a single rebalance (if
> Streams starts recovering a lot of state). I recommend you increase
> `state.cleanup.delay.ms` by a lot, like maybe set it to one hour.
> 
> One thing I'm curious about... You didn't mention if Thread-2
> eventually is able to re-create the state directory (after the cleaner
> is done) and transition to RUNNING. This should be the case. If not, I
> would consider it a bug.
> 
> Thanks,
> -John
> 
> On Thu, Nov 14, 2019 at 3:02 PM Navinder Brar
>  wrote:
> >
> > Hi,
> > We are facing a peculiar situation in the 2.3 version of Kafka Streams. 
> > First of all, I want to clarify if it is possible that a Stream Thread (say 
> > Stream Thread-1) which had got an assignment for a standby task (say 0_0) 
> > can change to Stream Thread-2 on the same host post rebalancing. The issue 
> > we are facing is this is happening for us and post rebalancing since the 
> > Stream Thread-1 had 0_0 and is not assigned back to it, it closes that task 
> > and marks it for deletion(after cleanup delay time), and meanwhile, the 
> > task gets assigned to Stream Thread-2. When the Stream Thread-2 tries to 
> > transition this task to Running, it gets a LockException which is caught in 
> > AssignedTasks#initializeNewTasks(). This makes 0_0 stay in Created state on 
> > Stream Thread-2 and after the cleanup delay is over the task directories 
> > for 0_0 get deleted.
> > Can someone please comment on this behavior.
> > Thanks,Navinder
> 
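John's advice above amounts to a one-line config change. A minimal sketch: the `state.cleanup.delay.ms` key and its 10-minute default are real Kafka Streams configuration; the application id and bootstrap servers below are illustrative placeholders.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class CleanupDelayConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "my-app");            // placeholder
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        // Raise the cleanup delay from the 10-minute default to one hour, so
        // the cleaner cannot delete task directories that are merely sitting
        // unlocked in the middle of a long rebalance/restore.
        props.put("state.cleanup.delay.ms",
                  Long.toString(TimeUnit.HOURS.toMillis(1)));
        System.out.println(props.getProperty("state.cleanup.delay.ms")); // prints 3600000
    }
}
```

These properties would then be passed to the `KafkaStreams` constructor as usual.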


Standby Tasks stays in “created” hash map in AssignedTasks

2019-11-14 Thread Giridhar Addepalli
We are using Kafka Streams version 1.1.0.

We made some changes to the Kafka Streams code. We are observing the following 
sequence of events in our production environment. We want to understand whether 
this sequence of events is possible in version 1.1.0 as well.

time T0

StreamThread-1 : got assigned 0_1, 0_2 standby tasks
StreamThread-2 : got assigned 0_3 standby task

time T1 -

Now let us say there is a consumer group rebalance.

And task 0_1 got assigned to StreamThread-2 (i.e., the 0_1 standby task moved 
from StreamThread-1 to StreamThread-2).

time T2 --

StreamThread-2 sees that a new standby task, 0_1, is assigned to it. 
It tries to initializeStateStores for 0_1, but gets a *LockException* because 
the *owningThread* for the lock is StreamThread-1.

But LockException is being swallowed in *initializeNewTasks* function of 
*AssignedTasks.java*

And 0_1 remains in *created* map inside *AssignedTasks.java*

time T3 --

StreamThread-1 realizes that 0_1 is not re-assigned to it and closes the 
suspended task. 
As part of closing suspended task, entry for 0_1 is deleted from *locks* map in 
*unlock* function in StateDirectory.java

time T4 --

The *CleanupThread* came along after *cleanupDelayMs* and decided that the 0_1 
directory in the local file system was obsolete, and deleted the directory!
Since the local directory is deleted for the task, and 0_1 is under the 
*created* map, changelog topic-partitions won't be read for the 0_1 standby 
task until the next rebalance!
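The T0-T4 race can be modeled with plain Java locks. This is a toy sketch only (nothing here is Kafka code): it shows why a swallowed try-lock failure with no retry leaves the directory lock free for the cleaner to claim.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

public class LockRace {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock dirLock = new ReentrantLock(); // models the 0_1 directory lock
        CountDownLatch owned = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // StreamThread-1 holds the lock across the rebalance (T1-T3).
        Thread thread1 = new Thread(() -> {
            dirLock.lock();
            owned.countDown();
            try {
                release.await();
            } catch (InterruptedException ignored) {
            } finally {
                dirLock.unlock(); // T3: suspended task closed, lock released
            }
        });
        thread1.start();
        owned.await();

        // T2: "StreamThread-2" (here, the main thread) fails to acquire the
        // lock, and the failure is swallowed -- no retry is ever scheduled.
        if (!dirLock.tryLock()) {
            System.out.println("T2: LockException swallowed, no retry");
        }

        release.countDown();
        thread1.join();

        // T4: the cleaner finds the lock free (nobody retried) and deletes.
        if (dirLock.tryLock()) {
            System.out.println("T4: cleaner deletes task directory");
        }
    }
}
```

The fix implied by the thread is to close the gap between T2 and T4: either retry initialization so the lock is reclaimed, or keep the cleaner from touching directories still assigned to the host.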


Please let us know if this is a valid sequence. If not, what are the guards 
that prevent this sequence?

We see that in https://issues.apache.org/jira/browse/KAFKA-6122, retries around 
locks were removed. Please let us know why the retry mechanism was removed.


Re: Potential Bug in 2.3 version (leading to deletion of state directories)

2019-11-14 Thread Giridhar Addepalli
Hi John,

I see in https://github.com/apache/kafka/pull/3653
there is discussion around the swallowing of LockException and the retry not 
being there, but dguy replied saying that "The retry doesn't happen in this 
block of code. It will happen the next time the runLoop executes."

However, the state of the thread is changed to RUNNING, hence 
updateNewAndRestoringTasks won't be called again inside runOnce of StreamThread.

In TaskManager#updateNewAndRestoringTasks, at the end, there is an IF condition 
which checks whether all active tasks are running.

Do you think we should change 
from 
if (active.allTasksRunning()) { ... } 
to
if (active.allTasksRunning() && standby.allTasksRunning()) { ... }

Thanks,
Giridhar.
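The proposed guard can be illustrated with stand-in classes (these are not the real `AssignedTasks`/`TaskManager` types): the point is that the RUNNING transition should also wait on standby tasks, so a standby stuck in "created" keeps being retried.

```java
public class RunningGuard {
    // Stand-in for a set of tasks; only the one query we need is modeled.
    static class Tasks {
        private final boolean allRunning;
        Tasks(boolean allRunning) { this.allRunning = allRunning; }
        boolean allTasksRunning() { return allRunning; }
    }

    // Current behavior: standby state is ignored entirely.
    static boolean currentCheck(Tasks active, Tasks standby) {
        return active.allTasksRunning();
    }

    // Proposed behavior: both active and standby tasks must be running
    // before the thread stops revisiting task initialization.
    static boolean proposedCheck(Tasks active, Tasks standby) {
        return active.allTasksRunning() && standby.allTasksRunning();
    }

    public static void main(String[] args) {
        Tasks active = new Tasks(true);
        Tasks stuckStandby = new Tasks(false); // e.g. 0_1 stuck in "created"
        // Today the thread would transition to RUNNING despite the stuck standby:
        if (!currentCheck(active, stuckStandby)) throw new AssertionError();
        // Under the proposal it would not, so initialization keeps retrying:
        if (proposedCheck(active, stuckStandby)) throw new AssertionError();
        System.out.println("proposed guard keeps retrying standby init");
    }
}
```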

On 2019/11/15 03:09:17, Navinder Brar  wrote: 
> Hi John,
> Thanks for the response. Yeah, by "marked for deletion" I meant the unlocking 
> of the store(by which in a way it is marked for deletion). From what I have 
> seen the standby task gets stuck in Created state and doesn't move to Running 
> and is not able to recreate the directory. Also, the point is not just that. 
> With the new KIP to support serving from replicas, we want to have very 
> little downtime on replicas, and in this case we already have a completely 
> built state directory which is getting deleted just because of the assignment 
> change on the thread (the host is still the same). We have 
> StreamsMetadataState#allMetadata() which would be common for all threads of 
> all instances. Can't we have a conditional check during unlocking which 
> checks allMetadata and finds out that the partition we are about to unlock is 
> assigned to this host(we don't care which thread of this host) and then we 
> don't unlock the task, meanwhile the Stream Thread-2 will take the lock on 
> its own when it moves to Running.
> Thanks,Navinder
> On Friday, 15 November, 2019, 02:55:40 am IST, John Roesler 
>  wrote:  
>  
>  Hey Navinder,
> 
> I think what's happening is a little different. Let's see if my
> worldview also explains your experiences.
> 
> There is no such thing as "mark for deletion". When a thread loses a
> task, it simply releases its lock on the directory. If no one else on
> the instance claims that lock within `state.cleanup.delay.ms` amount
> of milliseconds, then the state cleaner will itself grab the lock and
> delete the directory. On the other hand, if another thread (or the
> same thread) gets the task back and claims the lock before the
> cleaner, it will be able to re-open the store and use it.
> 
> The default for `state.cleanup.delay.ms` is 10 minutes, which is
> actually short enough that it could pass during a single rebalance (if
> Streams starts recovering a lot of state). I recommend you increase
> `state.cleanup.delay.ms` by a lot, like maybe set it to one hour.
> 
> One thing I'm curious about... You didn't mention if Thread-2
> eventually is able to re-create the state directory (after the cleaner
> is done) and transition to RUNNING. This should be the case. If not, I
> would consider it a bug.
> 
> Thanks,
> -John
> 
> On Thu, Nov 14, 2019 at 3:02 PM Navinder Brar
>  wrote:
> >
> > Hi,
> > We are facing a peculiar situation in the 2.3 version of Kafka Streams. 
> > First of all, I want to clarify whether it is possible that a Stream Thread 
> > (say Stream Thread-1) which had got an assignment for a standby task (say 
> > 0_0) can change to Stream Thread-2 on the same host post rebalancing. This 
> > is happening for us: post rebalancing, since Stream Thread-1 had 0_0 and it 
> > is not assigned back to it, it closes that task and marks it for deletion 
> > (after the cleanup delay time), and meanwhile, the task gets assigned to 
> > Stream Thread-2. When Stream Thread-2 tries to transition this task to 
> > Running, it gets a LockException which is caught in 
> > AssignedTasks#initializeNewTasks(). This makes 0_0 stay in Created state on 
> > Stream Thread-2, and after the cleanup delay is over the task directories 
> > for 0_0 get deleted.
> > Can someone please comment on this behavior?
> > Thanks, Navinder
> > Thanks,Navinder  


Re: Subject: [VOTE] 2.2.2 RC2

2019-11-14 Thread Gwen Shapira
+1 (binding)

Thanks Randall. Verified signatures and tests.

On Fri, Oct 25, 2019 at 7:10 AM Randall Hauch  wrote:
>
> Hello all, we identified around three dozen bug fixes, including an update
> of a third party dependency, and wanted to release a patch release for the
> Apache Kafka 2.2.0 release.
>
> This is the *second* candidate for release of Apache Kafka 2.2.2. (RC1 did
> not include a fix for https://issues.apache.org/jira/browse/KAFKA-9053, but
> the fix appeared before RC1 was announced so it was easier to just create
> RC2.)
>
> Check out the release notes for a complete list of the changes in this
> release candidate:
> https://home.apache.org/~rhauch/kafka-2.2.2-rc2/RELEASE_NOTES.html
>
> *** Please download, test and vote by Wednesday, October 30, 9am PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> https://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> https://home.apache.org/~rhauch/kafka-2.2.2-rc2/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> https://home.apache.org/~rhauch/kafka-2.2.2-rc2/javadoc/
>
> * Tag to be voted upon (off 2.2 branch) is the 2.2.2 tag:
> https://github.com/apache/kafka/releases/tag/2.2.2-rc2
>
> * Documentation:
> https://kafka.apache.org/22/documentation.html
>
> * Protocol:
> https://kafka.apache.org/22/protocol.html
>
> * Successful Jenkins builds for the 2.2 branch:
> Unit/integration tests: https://builds.apache.org/job/kafka-2.2-jdk8/1/
> System tests:
> https://jenkins.confluent.io/job/system-test-kafka/job/2.2/216/
>
> /**
>
> Thanks,
>
> Randall Hauch


Re: Potential Bug in 2.3 version (leading to deletion of state directories)

2019-11-14 Thread Navinder Brar
Hi John,
Thanks for the response. Yeah, by "marked for deletion" I meant the unlocking 
of the store (by which, in a way, it is marked for deletion). From what I have 
seen, the standby task gets stuck in the Created state, doesn't move to Running, 
and is not able to recreate the directory. Also, that is not the only issue. 
With the new KIP to support serving from replicas, we want very little downtime 
on replicas, and in this case we already have a completely built state 
directory which is getting deleted just because of the assignment change on the 
thread (the host is still the same). We have StreamsMetadataState#allMetadata(), 
which is common to all threads of all instances. Can't we have a conditional 
check during unlocking that consults allMetadata and finds out that the 
partition we are about to unlock is assigned to this host (we don't care 
which thread of this host), and then not unlock the task? Meanwhile, 
Stream Thread-2 will take the lock on its own when it moves to Running.
Thanks, Navinder
On Friday, 15 November, 2019, 02:55:40 am IST, John Roesler 
 wrote:  
 
 Hey Navinder,

I think what's happening is a little different. Let's see if my
worldview also explains your experiences.

There is no such thing as "mark for deletion". When a thread loses a
task, it simply releases its lock on the directory. If no one else on
the instance claims that lock within `state.cleanup.delay.ms`
milliseconds, then the state cleaner will itself grab the lock and
delete the directory. On the other hand, if another thread (or the
same thread) gets the task back and claims the lock before the
cleaner, it will be able to re-open the store and use it.

The default for `state.cleanup.delay.ms` is 10 minutes, which is
actually short enough that it could pass during a single rebalance (if
Streams starts recovering a lot of state). I recommend you increase
`state.cleanup.delay.ms` by a lot, like maybe set it to one hour.
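A minimal, self-contained sketch of that configuration change — the key name is the real Streams config, and the one-hour value is just the suggestion above (the class name here is made up for illustration):

```java
import java.util.Properties;

public class CleanupDelayConfig {
    public static void main(String[] args) {
        // Kafka Streams reads "state.cleanup.delay.ms" from its config;
        // the default is 600000 ms (10 minutes). Raising it to one hour
        // gives rebalancing threads more time to reclaim a task directory's
        // lock before the state cleaner deletes the directory.
        Properties props = new Properties();
        props.put("state.cleanup.delay.ms", Long.toString(60 * 60 * 1000L));
        // These props would then be passed to `new KafkaStreams(topology, props)`.
        System.out.println(props.getProperty("state.cleanup.delay.ms"));
    }
}
```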

One thing I'm curious about... You didn't mention if Thread-2
eventually is able to re-create the state directory (after the cleaner
is done) and transition to RUNNING. This should be the case. If not, I
would consider it a bug.

Thanks,
-John

On Thu, Nov 14, 2019 at 3:02 PM Navinder Brar
 wrote:
>
> Hi,
> We are facing a peculiar situation in the 2.3 version of Kafka Streams. First 
> of all, I want to clarify whether it is possible that a Stream Thread (say 
> Stream Thread-1) which had an assignment for a standby task (say 0_0) can 
> change to Stream Thread-2 on the same host post rebalancing. The issue we are 
> facing is that this is happening for us: post rebalancing, since Stream 
> Thread-1 had 0_0 and it is not assigned back to it, it closes that task and 
> marks it for deletion (after the cleanup delay time); meanwhile, the task 
> gets assigned to Stream Thread-2. When Stream Thread-2 tries to transition 
> this task to Running, it gets a LockException which is caught in 
> AssignedTasks#initializeNewTasks(). This leaves 0_0 in the Created state on 
> Stream Thread-2, and after the cleanup delay is over the task directories for 
> 0_0 get deleted.
> Can someone please comment on this behavior?
> Thanks, Navinder

[jira] [Resolved] (KAFKA-3096) Leader is not set to -1 when it is shutdown if followers are down

2019-11-14 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-3096.

Resolution: Fixed

> Leader is not set to -1 when it is shutdown if followers are down
> -
>
> Key: KAFKA-3096
> URL: https://issues.apache.org/jira/browse/KAFKA-3096
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>  Labels: reliability
>
> Assuming a cluster with 2 brokers with unclean leader election disabled:
> 1. Start brokers 0 and 1
> 2. Perform partition assignment
> 3. Broker 0 is elected leader
> 4. Produce message and wait until metadata is propagated
> 5. Shutdown follower
> 6. Produce message
> 7. Shutdown leader
> 8. Start follower
> 9. Wait for leader election
> Expected: leader is -1
> Actual: leader is 0
> We have a test for this, but a bug in `waitUntilLeaderIsElectedOrChanged` 
> means that `newLeaderOpt` is not being checked.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: kafka-trunk-jdk11 #956

2019-11-14 Thread Apache Jenkins Server
See 


Changes:

[bbejeck] fix typo in processor-api developer guide docs (#7689)

[bbejeck] MINOR: Add method `hasMetrics()` to class `Sensor` (#7692)

[ismael] MINOR: Clarify max.request.size and max.messsage.bytes wrt compression


--
[...truncated 5.58 MB...]
org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] PASSED


Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Guozhang Wang
10/20: I think I'm aligned with John's replies as well.

Guozhang

On Fri, Nov 15, 2019 at 1:45 AM Vinoth Chandar 
wrote:

> >during restoring state the active might have some lag
>
> Great catch, yes, we cannot assume lag = 0 for active. Let's report active
> lag as well then. If active is too laggy, the app can then deem the store
> partition unavailable (based on what the application is willing to
> tolerate).
>
> @matthias do you agree? We can then begin the vote.
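To make the tolerance idea above concrete, here is a hypothetical routing sketch (none of these names are real Streams APIs — `Replica`, `lagMs`, and `choose` are illustrative only): the router filters replicas by the lag the application will tolerate and picks the least-lagged one, treating the partition as unavailable when nothing qualifies.

```java
import java.util.List;
import java.util.Optional;

public class LagAwareRouter {
    // Illustrative stand-in for per-replica lag info reported by each instance.
    record Replica(String host, long lagMs) {}

    // Pick the replica with the smallest lag among those within tolerance.
    static Optional<Replica> choose(List<Replica> replicas, long maxTolerableLagMs) {
        return replicas.stream()
                .filter(r -> r.lagMs() <= maxTolerableLagMs)
                .min((a, b) -> Long.compare(a.lagMs(), b.lagMs()));
    }

    public static void main(String[] args) {
        List<Replica> replicas = List.of(
                new Replica("active-1", 5_000),   // active, still restoring
                new Replica("standby-1", 200));   // nearly caught-up standby
        // With a 1s tolerance, the restoring active is skipped in favor
        // of the standby; with no qualifying replica we report "unavailable".
        System.out.println(choose(replicas, 1_000).map(Replica::host).orElse("unavailable"));
    }
}
```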
>
> On Thu, Nov 14, 2019 at 9:03 AM Navinder Brar
>  wrote:
>
> > I agree with Vinoth and John on having "allLocalStoreOffsetLags()", all
> > actives don't have 0 lag, as during restoring state the active might have
> > some lag and one of the features of this KIP is to provide an option to
> > query from active (which might be in restoring state).
> > I will update the KIP with rejected alternatives, and after this I will
> > start a vote if everyone agrees.
> > On Thursday, 14 November, 2019, 09:34:52 pm IST, John Roesler <
> > j...@confluent.io> wrote:
> >
> >  Hi all,
> >
> > Thanks for the "reset", Vinoth. It brings some needed clarity to the
> > discussion.
> >
> > 10. My 2 cents: we might as well include the lags for the active
> > copies as well. This is a more context-free API. If we only include
> > standbys, this choice won't make sense to users unless they understand
> > that the active task cannot lag in the steady state, since it's the
> > source of updates. This isn't a bad thing to realize, but it's just
> > more mental overhead for the person who wants to list the lags for
> > "all local stores".
> >
> > Another reason is that we could consider also reporting the lag for
> > actives during recovery (when they would have non-zero lag). We don't
> > have to now, but if we choose to call the method "standby lags", then
> > we can't make this choice in the future.
> >
> > That said, it's just my opinion. I'm fine either way.
> >
> > 20. Vinoth's reply makes sense to me, fwiw.
> >
> > Beyond these two points, I'm happy with the current proposal.
> >
> > Thanks again,
> > -John
> >
> > On Thu, Nov 14, 2019 at 4:48 AM Vinoth Chandar 
> > wrote:
> > >
> > > 10. I considered that. Had to pick one or the other. Can just return
> > > standby too and rename the method to maybe “allLocalStandbyOffsetLags()”
> to
> > > have it explicit. (Standby should implicitly convey that we are talking
> > > about stores)
> > >
> > > 20. What I meant was, we are returning HostInfo instead of
> > StreamsMetadata
> > > since thats sufficient to route query; same for “int partition “ vs
> topic
> > > partition before. Previously KeyQueryMetadata had similar structure but
> > > used StreamsMetadata and TopicPartition objects to convey same
> > information
> > >
> > > @navinder The KIP is already up to date with the email I sent, except for
> the
> > > reasonings I was laying out. +1 on revisiting rejected alternatives.
> > > Please make the follow up changes
> > >
> > > On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax  >
> > > wrote:
> > >
> > > > Thanks for the summary Vinoth!
> > > >
> > > > I buy the overall argument. Couple of clarification questions:
> > > >
> > > >
> > > > 10. Why do we need to include the active stores in
> > > > `allLocalStoreOffsetLags()`? Would it not be simpler to just return
> lag
> > > > for standbys?
> > > >
> > > >
> > > > 20: What does
> > > >
> > > > > Thin the KeyQueryMetadata object to just contain the minimum
> > information
> > > > > needed.
> > > >
> > > > exactly mean? What is the "minimum information needed"?
> > > >
> > > >
> > > > @Navinder: if you agree, can you update the KIP accordingly? With all
> > the
> > > > proposals, it's hard to keep track and it would be great to have the
> > > > current proposal summarized in the wiki page.
> > > >
> > > > Please also update the "Rejected alternatives" section to avoid that we
> > > > cycle back to old proposals (including the reason _why_ they got
> > > > rejected).
> > > >
> > > >
> > > > Thanks a lot!
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > >
> > > > On 11/13/19 7:10 PM, Vinoth Chandar wrote:
> > > > > Given we have had a healthy discussion on this topic for a month
> now
> > and
> > > > > still with many loose ends and open ended conversations, I thought
> It
> > > > would
> > > > > be worthwhile to take a step back and re-evaluate everything in the
> > > > context
> > > > > of the very real use-case and its specific scenarios.
> > > > >
> > > > > First, let's remind ourselves of the query routing flow of the
> > streams
> > > > > application ("app" here on)
> > > > >
> > > > >1. queries get routed to any random streams instance in the
> > cluster
> > > > >("router" here on)
> > > > >2. router then uses Streams metadata to pick active/standby
> > instances
> > > > >for that key's store/partition
> > > > >3. router instance also maintains global lag information for all
> > > > stores
> > > > >and all their partitions, by a 

Re: [VOTE] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Guozhang Wang
+1 (binding), thanks for the KIP!

Guozhang

On Fri, Nov 15, 2019 at 4:38 AM Navinder Brar
 wrote:

> Hello all,
>
> I'd like to propose a vote for serving interactive queries during
> Rebalancing, as it is a big deal for applications looking for high
> availability. With this change, users will have control over the tradeoff
> between consistency and availability during serving.
> The full KIP is provided here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
>
>
> Thanks,
> Navinder



-- 
-- Guozhang


[jira] [Created] (KAFKA-9193) org.apache.kafka.common.utils.Timer should use monotonic clock

2019-11-14 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9193:
---

 Summary: org.apache.kafka.common.utils.Timer should use monotonic 
clock
 Key: KAFKA-9193
 URL: https://issues.apache.org/jira/browse/KAFKA-9193
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Bradstreet


utils.Timer uses System.currentTimeMillis to implement blocking methods with 
timeouts. We should not rely on a non-monotonic clock and should instead switch 
this to Time.hiResClockMs() (which uses System.nanoTime).

When we do so we should revert [https://github.com/apache/kafka/pull/7683] 
which was caused by inaccuracies in our current approach (the test was good, 
the code is bad).
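A minimal sketch of the monotonic-deadline technique the ticket asks for (this is not Kafka's actual `Timer` implementation, just an illustration of why `System.nanoTime()` is preferable): a deadline derived from the monotonic clock is immune to wall-clock adjustments (NTP steps, manual clock changes) that can break `System.currentTimeMillis()`-based timeouts.

```java
public class MonotonicDeadline {
    private final long deadlineNanos;

    public MonotonicDeadline(long timeoutMs) {
        // nanoTime() is monotonic: it only moves forward, at a steady rate.
        this.deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
    }

    public boolean expired() {
        // Compare via subtraction so the check stays correct even if
        // nanoTime() wraps around Long.MAX_VALUE.
        return System.nanoTime() - deadlineNanos >= 0;
    }

    public long remainingMs() {
        long remaining = (deadlineNanos - System.nanoTime()) / 1_000_000L;
        return Math.max(0, remaining);
    }

    public static void main(String[] args) {
        MonotonicDeadline d = new MonotonicDeadline(50);
        System.out.println(d.expired());          // not yet expired
        System.out.println(d.remainingMs() > 0);  // time still remaining
    }
}
```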



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Potential Bug in 2.3 version (leading to deletion of state directories)

2019-11-14 Thread John Roesler
Hey Navinder,

I think what's happening is a little different. Let's see if my
worldview also explains your experiences.

There is no such thing as "mark for deletion". When a thread loses a
task, it simply releases its lock on the directory. If no one else on
the instance claims that lock within `state.cleanup.delay.ms`
milliseconds, then the state cleaner will itself grab the lock and
delete the directory. On the other hand, if another thread (or the
same thread) gets the task back and claims the lock before the
cleaner, it will be able to re-open the store and use it.

The default for `state.cleanup.delay.ms` is 10 minutes, which is
actually short enough that it could pass during a single rebalance (if
Streams starts recovering a lot of state). I recommend you increase
`state.cleanup.delay.ms` by a lot, like maybe set it to one hour.

One thing I'm curious about... You didn't mention if Thread-2
eventually is able to re-create the state directory (after the cleaner
is done) and transition to RUNNING. This should be the case. If not, I
would consider it a bug.

Thanks,
-John

On Thu, Nov 14, 2019 at 3:02 PM Navinder Brar
 wrote:
>
> Hi,
> We are facing a peculiar situation in the 2.3 version of Kafka Streams. First 
> of all, I want to clarify whether it is possible that a Stream Thread (say 
> Stream Thread-1) which had an assignment for a standby task (say 0_0) can 
> change to Stream Thread-2 on the same host post rebalancing. The issue we are 
> facing is that this is happening for us: post rebalancing, since Stream 
> Thread-1 had 0_0 and it is not assigned back to it, it closes that task and 
> marks it for deletion (after the cleanup delay time); meanwhile, the task 
> gets assigned to Stream Thread-2. When Stream Thread-2 tries to transition 
> this task to Running, it gets a LockException which is caught in 
> AssignedTasks#initializeNewTasks(). This leaves 0_0 in the Created state on 
> Stream Thread-2, and after the cleanup delay is over the task directories for 
> 0_0 get deleted.
> Can someone please comment on this behavior?
> Thanks, Navinder


Build failed in Jenkins: kafka-trunk-jdk8 #4040

2019-11-14 Thread Apache Jenkins Server
See 


Changes:

[manikumar] MINOR: fix flaky ConsumerBounceTest.testClose

[manikumar] KAFKA-9046: Use top-level worker configs for connector admin clients


--
[...truncated 5.57 MB...]
org.apache.kafka.streams.scala.kstream.ConsumedTest > Create a Consumed with 
timestampExtractor should create a Consumed with Serdes and timestampExtractor 
STARTED

org.apache.kafka.streams.scala.kstream.ConsumedTest > Create a Consumed with 
timestampExtractor should create a Consumed with Serdes and timestampExtractor 
PASSED

org.apache.kafka.streams.scala.kstream.ConsumedTest > Create a Consumed with 
resetPolicy should create a Consumed with Serdes and resetPolicy STARTED

org.apache.kafka.streams.scala.kstream.ConsumedTest > Create a Consumed with 
resetPolicy should create a Consumed with Serdes and resetPolicy PASSED

org.apache.kafka.streams.scala.kstream.KStreamTest > filter a KStream should 
filter records satisfying the predicate STARTED

org.apache.kafka.streams.scala.kstream.KStreamTest > filter a KStream should 
filter records satisfying the predicate PASSED

org.apache.kafka.streams.scala.kstream.KStreamTest > filterNot a KStream should 
filter records not satisfying the predicate STARTED

org.apache.kafka.streams.scala.kstream.KStreamTest > filterNot a KStream should 
filter records not satisfying the predicate PASSED

org.apache.kafka.streams.scala.kstream.KStreamTest > foreach a KStream should 
run foreach actions on records STARTED

org.apache.kafka.streams.scala.kstream.KStreamTest > foreach a KStream should 
run foreach actions on records PASSED

org.apache.kafka.streams.scala.kstream.KStreamTest > peek a KStream should run 
peek actions on records STARTED

org.apache.kafka.streams.scala.kstream.KStreamTest > peek a KStream should run 
peek actions on records PASSED

org.apache.kafka.streams.scala.kstream.KStreamTest > selectKey a KStream should 
select a new key STARTED

org.apache.kafka.streams.scala.kstream.KStreamTest > selectKey a KStream should 
select a new key PASSED

org.apache.kafka.streams.scala.kstream.KStreamTest > join 2 KStreams should 
join correctly records STARTED

org.apache.kafka.streams.scala.kstream.KStreamTest > join 2 KStreams should 
join correctly records PASSED

org.apache.kafka.streams.scala.kstream.KStreamTest > transform a KStream should 
transform correctly records STARTED

org.apache.kafka.streams.scala.kstream.KStreamTest > transform a KStream should 
transform correctly records PASSED

org.apache.kafka.streams.scala.kstream.KStreamTest > flatTransform a KStream 
should flatTransform correctly records STARTED

org.apache.kafka.streams.scala.kstream.KStreamTest > flatTransform a KStream 
should flatTransform correctly records PASSED

org.apache.kafka.streams.scala.kstream.KStreamTest > flatTransformValues a 
KStream should correctly flatTransform values in records STARTED

org.apache.kafka.streams.scala.kstream.KStreamTest > flatTransformValues a 
KStream should correctly flatTransform values in records PASSED

org.apache.kafka.streams.scala.kstream.KStreamTest > flatTransformValues with 
key in a KStream should correctly flatTransformValues in records STARTED

org.apache.kafka.streams.scala.kstream.KStreamTest > flatTransformValues with 
key in a KStream should correctly flatTransformValues in records PASSED

org.apache.kafka.streams.scala.kstream.JoinedTest > Create a Joined should 
create a Joined with Serdes STARTED

org.apache.kafka.streams.scala.kstream.JoinedTest > Create a Joined should 
create a Joined with Serdes PASSED

org.apache.kafka.streams.scala.kstream.JoinedTest > Create a Joined should 
create a Joined with Serdes and repartition topic name STARTED

org.apache.kafka.streams.scala.kstream.JoinedTest > Create a Joined should 
create a Joined with Serdes and repartition topic name PASSED

org.apache.kafka.streams.scala.kstream.StreamJoinedTest > Create a StreamJoined 
should create a StreamJoined with Serdes STARTED

org.apache.kafka.streams.scala.kstream.StreamJoinedTest > Create a StreamJoined 
should create a StreamJoined with Serdes PASSED

org.apache.kafka.streams.scala.kstream.StreamJoinedTest > Create a StreamJoined 
should create a StreamJoined with Serdes and Store Suppliers STARTED

org.apache.kafka.streams.scala.kstream.StreamJoinedTest > Create a StreamJoined 
should create a StreamJoined with Serdes and Store Suppliers PASSED

org.apache.kafka.streams.scala.kstream.StreamJoinedTest > Create a StreamJoined 
should create a StreamJoined with Serdes and a State Store name STARTED

org.apache.kafka.streams.scala.kstream.StreamJoinedTest > Create a StreamJoined 
should create a StreamJoined with Serdes and a State Store name PASSED

org.apache.kafka.streams.scala.kstream.ProducedTest > Create a Produced should 
create a Produced with Serdes STARTED

org.apache.kafka.streams.scala.kstream.ProducedTest > Create a Produced 

Potential Bug in 2.3 version (leading to deletion of state directories)

2019-11-14 Thread Navinder Brar
Hi,
We are facing a peculiar situation in the 2.3 version of Kafka Streams. First 
of all, I want to clarify whether it is possible that a Stream Thread (say 
Stream Thread-1) which had an assignment for a standby task (say 0_0) can 
change to Stream Thread-2 on the same host post rebalancing. The issue we are 
facing is that this is happening for us: post rebalancing, since Stream 
Thread-1 had 0_0 and it is not assigned back to it, it closes that task and 
marks it for deletion (after the cleanup delay time); meanwhile, the task gets 
assigned to Stream Thread-2. When Stream Thread-2 tries to transition this 
task to Running, it gets a LockException which is caught in 
AssignedTasks#initializeNewTasks(). This leaves 0_0 in the Created state on 
Stream Thread-2, and after the cleanup delay is over the task directories for 
0_0 get deleted.
Can someone please comment on this behavior?
Thanks, Navinder

[jira] [Created] (KAFKA-9192) NullPointerException if field in schema not present in value

2019-11-14 Thread Mark Tinsley (Jira)
Mark Tinsley created KAFKA-9192:
---

 Summary: NullPointerException if field in schema not present in 
value
 Key: KAFKA-9192
 URL: https://issues.apache.org/jira/browse/KAFKA-9192
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.2.1
Reporter: Mark Tinsley


Given a message:
{code:java}
{
   "schema":{
  "type":"struct",
  "fields":[
 {
"type":"string",
"optional":true,
"field":"abc"
 }
  ],
  "optional":false,
  "name":"foobar"
   },
   "payload":{
   }
}
{code}


Given that the field is optional, I would expect the JsonConverter to still 
process this value. 

What happens is I get a null pointer exception, the stacktrace points to this 
line: 
https://github.com/apache/kafka/blob/2.1/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L701
 called by 
https://github.com/apache/kafka/blob/2.1/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L181

The issue seems to be that we need to check whether the jsonValue itself is 
null before checking whether the jsonValue contains a null value.
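A small self-contained sketch of the null-check ordering described above (this is not the actual JsonConverter code — `convert`, its parameters, and the class name are hypothetical): guard against the field being absent before dereferencing it, and only then decide based on the schema's optionality.

```java
import java.util.Map;

public class NullGuard {
    // Sketch of the fix: payload.get(field) is null when the field is absent,
    // so that case must be handled before any method is called on jsonValue.
    static Object convert(Map<String, Object> payload, String field, boolean optional) {
        Object jsonValue = payload.get(field);   // null if field absent from payload
        if (jsonValue == null) {                 // absent (or explicit JSON null)
            if (optional) {
                return null;                     // optional field: null is fine
            }
            throw new RuntimeException("Missing required field: " + field);
        }
        return jsonValue;
    }

    public static void main(String[] args) {
        // Empty payload, optional field "abc" (as in the example message above):
        // no NullPointerException, just a null result.
        System.out.println(convert(Map.of(), "abc", true));
    }
}
```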



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[VOTE] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Navinder Brar
Hello all,

I'd like to propose a vote for serving interactive queries during Rebalancing, 
as it is a big deal for applications looking for high availability. With this 
change, users will have control over the tradeoff between consistency and 
availability during serving.
The full KIP is provided here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance


Thanks,
Navinder

[jira] [Created] (KAFKA-9191) Kafka throughput suffers substantially when scaling topic partitions with small messages

2019-11-14 Thread Chris Pettitt (Jira)
Chris Pettitt created KAFKA-9191:


 Summary: Kafka throughput suffers substantially when scaling topic 
partitions with small messages
 Key: KAFKA-9191
 URL: https://issues.apache.org/jira/browse/KAFKA-9191
 Project: Kafka
  Issue Type: Bug
Reporter: Chris Pettitt


We have observed, using two entirely different tools, that a simple Kafka 
application (read 1 topic and immediately produce to another) suffers 
substantial performance degradation when scaling up topics. Below is the output 
of one of these tools, showing that going from 1 partition to 1000 partitions 
yields a ~30% throughput decrease when messages are 100 bytes long.

Using the same two tools, we observed that increasing the message size to 512 
bytes yields a throughput increase of ~20% going from 1 topic partition to 1000 
topic partitions with all other variables held constant.

 
|Kafka Core Testing| | | | | | | | | | |
|Enable Transaction|Batch Size (b)|Linger (ms)|Max Inflight|Commit Interval 
(ms)|Num Records|Record Size (b)|# Input Topics|1 Partition MB/s|1000 
Partitions MB/s|MB/s delta|
|FALSE|16384|100|5|1000|2000|100|1|45.633625|31.482193|-31.01%|
|FALSE|16384|100|5|1000|2000|512|1|70.217902|85.319107|21.51%|



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: kafka-trunk-jdk11 #955

2019-11-14 Thread Apache Jenkins Server
See 


Changes:

[manikumar] MINOR: fix flaky ConsumerBounceTest.testClose

[manikumar] KAFKA-9046: Use top-level worker configs for connector admin clients


--
[...truncated 5.57 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 

Re: Preliminary blog post about the Apache Kafka 2.4.0 release

2019-11-14 Thread Chris Egerton
Hi Manikumar,

It looks like the header for KIP-440 is accurate ("KIP-440: Extend Connect
Converter to support headers") but the content appears to correspond to
KIP-481 ("SerDe Improvements for Connect Decimal type in JSON") instead.
Could we double-check and make sure that the summary for KIP-440 matches
what was contributed for it (and if nothing was, alter the summary to more
closely reflect what KIP-440 accomplished)?

Cheers,

Chris

On Thu, Nov 14, 2019 at 10:41 AM Manikumar 
wrote:

> Hi all,
>
> I've prepared a preliminary blog post about the upcoming Apache Kafka 2.4.0
> release.
> Please take a look and let me know if you want to add/modify details.
> Thanks to all who contributed to this blog post.
>
> https://blogs.apache.org/preview/kafka/?previewEntry=what-s-new-in-apache1
>
> Thanks,
> Manikumar
>


Preliminary blog post about the Apache Kafka 2.4.0 release

2019-11-14 Thread Manikumar
Hi all,

I've prepared a preliminary blog post about the upcoming Apache Kafka 2.4.0
release.
Please take a look and let me know if you want to add/modify details.
Thanks to all who contributed to this blog post.

https://blogs.apache.org/preview/kafka/?previewEntry=what-s-new-in-apache1

Thanks,
Manikumar


[VOTE] 2.4.0 RC0

2019-11-14 Thread Manikumar
Hello Kafka users, developers and client-developers,

This is the first candidate for release of Apache Kafka 2.4.0.
There is work in progress on a couple of blocker PRs. I am publishing RC0 to
avoid further delays in testing the release.

This release includes many new features, including:
- Allow consumers to fetch from closest replica
- Support for incremental cooperative rebalancing to the consumer rebalance
protocol
- MirrorMaker 2.0 (MM2), a new multi-cluster, cross-datacenter replication
engine
- New Java authorizer Interface
- Support for non-key joining in KTable
- Administrative API for replica reassignment
- Sticky partitioner
- Return topic metadata and configs in CreateTopics response
- Securing Internal connect REST endpoints
- API to delete consumer offsets and expose it via the AdminClient.

Release notes for the 2.4.0 release:
https://home.apache.org/~manikumar/kafka-2.4.0-rc0/RELEASE_NOTES.html

*** Please download, test and vote by Thursday, November 20, 9am PT

Kafka's KEYS file containing PGP keys we use to sign the release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~manikumar/kafka-2.4.0-rc0/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
https://home.apache.org/~manikumar/kafka-2.4.0-rc0/javadoc/

* Tag to be voted upon (off 2.4 branch) is the 2.4.0 tag:
https://github.com/apache/kafka/releases/tag/2.4.0-rc0

* Documentation:
https://kafka.apache.org/24/documentation.html

* Protocol:
https://kafka.apache.org/24/protocol.html

Thanks,
Manikumar


[jira] [Resolved] (KAFKA-9044) Brokers occasionally (randomly?) dropping out of clusters

2019-11-14 Thread Peter Bukowinski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Bukowinski resolved KAFKA-9044.
-
Resolution: Not A Bug

Issue traced to external cause.

> Brokers occasionally (randomly?) dropping out of clusters
> -
>
> Key: KAFKA-9044
> URL: https://issues.apache.org/jira/browse/KAFKA-9044
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.0, 2.3.1
> Environment: Ubuntu 14.04
>Reporter: Peter Bukowinski
>Priority: Major
>
> I have several clusters running Kafka 2.3.1 and 2.3.0, and this issue has 
> affected all of them. Because of replication and the size of the clusters (30 
> brokers), this bug is not causing any data loss, but it is nevertheless 
> concerning. When a broker drops out, the log gives no indication that there 
> are any zookeeper issues (and indeed the zookeepers are healthy when this 
> occurs). Here's a snippet from a broker log when it occurs:
> {{[2019-10-07 11:02:27,630] INFO [GroupMetadataManager brokerId=14] Removed 0 
> expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}}
>  {{[2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52, 
> dir=/data/3/kl] Found deletable segments with base offsets [1975332] due to 
> retention time 360ms breach (kafka.log.Log)}}
>  {{[2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52, 
> dir=/data/3/kl] Scheduling log segment [baseOffset 1975332, size 92076008] 
> for deletion. (kafka.log.Log)}}
>  {{[2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52, 
> dir=/data/3/kl] Incrementing log start offset to 2000317 (kafka.log.Log)}}
>  {{[2019-10-07 11:03:56,936] INFO [Log partition=internal_test-52, 
> dir=/data/3/kl] Deleting segment 1975332 (kafka.log.Log)}}
>  {{[2019-10-07 11:03:56,957] INFO Deleted log 
> /data/3/kl/internal_test-52/01975332.log.deleted. 
> (kafka.log.LogSegment)}}
>  {{[2019-10-07 11:03:56,957] INFO Deleted offset index 
> /data/3/kl/internal_test-52/01975332.index.deleted. 
> (kafka.log.LogSegment)}}
>  {{[2019-10-07 11:03:56,958] INFO Deleted time index 
> /data/3/kl/internal_test-52/01975332.timeindex.deleted. 
> (kafka.log.LogSegment)}}
>  {{[2019-10-07 11:12:27,630] INFO [GroupMetadataManager brokerId=14] Removed 
> 0 expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}}
>  {{[2019-10-07 11:22:27,629] INFO [GroupMetadataManager brokerId=14] Removed 
> 0 expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}}
>  {{[2019-10-07 11:32:27,629] INFO [GroupMetadataManager brokerId=14] Removed 
> 0 expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}}
>  {{[2019-10-07 11:42:27,629] INFO [GroupMetadataManager brokerId=14] Removed 
> 0 expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}}
>  {{[2019-10-07 11:52:27,629] INFO [GroupMetadataManager brokerId=14] Removed 
> 0 expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}}
>  {{[2019-10-07 12:02:27,629] INFO [GroupMetadataManager brokerId=14] Removed 
> 0 expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}}
>  {{[2019-10-07 12:12:27,630] INFO [GroupMetadataManager brokerId=14] Removed 
> 0 expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}}
>  {{[2019-10-07 12:22:27,629] INFO [GroupMetadataManager brokerId=14] Removed 
> 0 expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}}
>  {{[2019-10-07 12:32:27,629] INFO [GroupMetadataManager brokerId=14] Removed 
> 0 expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}}
>  {{[2019-10-07 12:42:27,630] INFO [GroupMetadataManager brokerId=14] Removed 
> 0 expired offsets in 1 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}}
>  {{[2019-10-07 12:52:27,629] INFO [GroupMetadataManager brokerId=14] Removed 
> 0 expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}}
>  {{[2019-10-07 13:02:27,630] INFO [GroupMetadataManager brokerId=14] Removed 
> 0 expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}}
>  {{[2019-10-07 13:12:27,629] INFO [GroupMetadataManager brokerId=14] Removed 
> 0 expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}}
>  {{[2019-10-07 13:22:27,629] INFO [GroupMetadataManager brokerId=14] Removed 
> 0 expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}}
>  {{[2019-10-07 13:32:27,629] INFO [GroupMetadataManager brokerId=14] Removed 
> 0 expired offsets in 0 milliseconds. 
> 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Vinoth Chandar
>during restoring state the active might have some lag

Great catch, yes: we cannot assume lag = 0 for the active. Let's report active
lag as well then. If the active is too laggy, the app can then deem the store
partition unavailable (based on what the application is willing to
tolerate).

@matthias do you agree? We can then begin the vote.

On Thu, Nov 14, 2019 at 9:03 AM Navinder Brar
 wrote:

> I agree with Vinoth and John on having "allLocalStoreOffsetLags()", all
> actives don't have 0 lag, as during restoring state the active might have
> some lag and one of the features of this KIP is to provide an option to
> query from active (which might be in restoring state).
> I will update the KIP with rejected alternatives and post this will start
> a vote if everyone agrees on this.
> On Thursday, 14 November, 2019, 09:34:52 pm IST, John Roesler <
> j...@confluent.io> wrote:
>
>  Hi all,
>
> Thanks for the "reset", Vinoth. It brings some needed clarity to the
> discussion.
>
> 10. My 2 cents: we might as well include the lags for the active
> copies as well. This is a more context-free API. If we only include
> standbys, this choice won't make sense to users unless they understand
> that the active task cannot lag in the steady state, since it's the
> source of updates. This isn't a bad thing to realize, but it's just
> more mental overhead for the person who wants to list the lags for
> "all local stores".
>
> Another reason is that we could consider also reporting the lag for
> actives during recovery (when they would have non-zero lag). We don't
> have to now, but if we choose to call the method "standby lags", then
> we can't make this choice in the future.
>
> That said, it's just my opinion. I'm fine either way.
>
> 20. Vinoth's reply makes sense to me, fwiw.
>
> Beyond these two points, I'm happy with the current proposal.
>
> Thanks again,
> -John
>
> On Thu, Nov 14, 2019 at 4:48 AM Vinoth Chandar 
> wrote:
> >
> > 10. I considered that. Had to pick one or the other. Can just return
> > standby too and maybe rename the method to “allLocalStandbyOffsetLags()” to
> > have it explicit. (Standby should implicitly convey that we are talking
> > about stores)
> >
> > 20. What I meant was, we are returning HostInfo instead of
> StreamsMetadata
> > since thats sufficient to route query; same for “int partition “ vs topic
> > partition before. Previously KeyQueryMetadata had similar structure but
> > used StreamsMetadata and TopicPartition objects to convey same
> information
> >
> > @navinder KIP is already upto date with the email I sent, except for the
> > reasonings I was laying out. +1 on revisiting rejected alternatives.
> > Please make the follow up changes
> >
> > On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax 
> > wrote:
> >
> > > Thanks for the summary Vinoth!
> > >
> > > I buy the overall argument. Couple of clarification questions:
> > >
> > >
> > > 10. Why do we need to include the active stores in
> > > `allLocalStoreOffsetLags()`? Would it not be simpler to just return lag
> > > for standbys?
> > >
> > >
> > > 20: What does
> > >
> > > > Thin the KeyQueryMetadata object to just contain the minimum
> information
> > > > needed.
> > >
> > > exactly mean? What is the "minimum information needed"?
> > >
> > >
> > > @Navinder: if you agree, can you update the KIP accordingly? With all
> the
> > > proposals, it's hard to keep track and it would be great to have the
> > > current proposal summarized in the wiki page.
> > >
> > > Please also update the "Rejected alternative" sections to avoid that we
> > > cycle back to old proposal (including the reason _why_ they got
> rejected).
> > >
> > >
> > > Thanks a lot!
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 11/13/19 7:10 PM, Vinoth Chandar wrote:
> > > > Given we have had a healthy discussion on this topic for a month now
> and
> > > still with many loose ends and open-ended conversations, I thought it
> > > would
> > > > be worthwhile to take a step back and re-evaluate everything in the
> > > context
> > > > of the very real use-case and its specific scenarios.
> > > >
> > > > First, let's remind ourselves of the query routing flow of the
> streams
> > > > application ("app" here on)
> > > >
> > > >1. queries get routed to any random streams instance in the
> cluster
> > > >("router" here on)
> > > >2. router then uses Streams metadata to pick active/standby
> instances
> > > >for that key's store/partition
> > > >3. router instance also maintains global lag information for all
> > > stores
> > > >and all their partitions, by a gossip/broadcast/heartbeat protocol
> > > (done
> > > >outside of Streams framework), but using
> KafkaStreams#allMetadata()
> > > for
> > > >streams instance discovery.
> > > >4. router then uses information in 2 & 3 to determine which
> instance
> > > to
> > > >send the query to  : always picks active instance if alive or the
> most
> > > >in-sync live standby otherwise.
> 
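The four-step routing flow outlined in the thread above can be sketched in plain Java. This is only an illustrative sketch: the `Replica` record and `pickInstance` helper are hypothetical, not part of the Streams API; a real app would populate them from `KafkaStreams#allMetadata()` plus its own gossiped lag data, and the lag threshold is an application-level choice (per point B above on decoupling the control plane).

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical router-side view of one store partition's replicas.
record Replica(String host, boolean alive, boolean active, long lagOffsets) {}

public class QueryRouter {
    // Pick the active instance if it is alive and within the tolerable lag;
    // otherwise fall back to the most in-sync live standby. If nothing
    // qualifies, the store partition is deemed unavailable (empty result).
    static Optional<Replica> pickInstance(List<Replica> replicas, long maxTolerableLag) {
        return replicas.stream()
                .filter(Replica::alive)
                .filter(r -> r.lagOffsets() <= maxTolerableLag)
                .min(Comparator.comparing((Replica r) -> !r.active()) // actives first
                               .thenComparingLong(Replica::lagOffsets));
    }

    public static void main(String[] args) {
        List<Replica> replicas = List.of(
                new Replica("a", false, true, 0),    // active, but dead
                new Replica("b", true, false, 500),  // standby, more lag
                new Replica("c", true, false, 100)); // most in-sync standby
        System.out.println(pickInstance(replicas, 1_000).map(Replica::host).orElse("none"));
        // -> c
    }
}
```

Note this treats the active's lag uniformly with standbys, which is exactly why reporting lag for actives too (point 10 above) keeps the router logic simple.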

[jira] [Created] (KAFKA-9190) Server leaves connections with expired authentication sessions open

2019-11-14 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-9190:
--

 Summary: Server leaves connections with expired authentication 
sessions open 
 Key: KAFKA-9190
 URL: https://issues.apache.org/jira/browse/KAFKA-9190
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


SocketServer implements some logic to disconnect connections which have expired 
authentication sessions. At the moment, we just call `SelectionKey.cancel` in 
order to trigger this disconnect. I think the expectation is that this causes 
the channel to be closed on the next `poll`, but as far as I can tell, all it 
does is disassociate the selection key from the selector. This means that the 
key never gets selected again and we never close the connection until the 
client times out.

This was found when debugging the flaky test failure 
`EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl`.
 I modified the code to call `Selector.close` instead of 
`TransportLayer.disconnect`. I was able to reproduce the session authentication 
expiration, but the connection closes properly and the test no longer 
times out.
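The distinction described here can be demonstrated with a minimal, self-contained NIO snippet (illustrative only, not Kafka's SocketServer code): `SelectionKey.cancel` merely disassociates the key from the selector, leaving the underlying channel open, whereas closing the channel actually releases the connection.

```java
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class CancelVsClose {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.bind(new InetSocketAddress(0));
        channel.configureBlocking(false);
        SelectionKey key = channel.register(selector, SelectionKey.OP_ACCEPT);

        key.cancel();          // only deregisters the key from the selector...
        selector.selectNow();  // ...cancelled keys are purged on the next select
        System.out.println("open after cancel: " + channel.isOpen());  // true

        channel.close();       // actually closes the underlying socket
        System.out.println("open after close: " + channel.isOpen());   // false
        selector.close();
    }
}
```

A cancelled-but-open channel is never selected again, which matches the reported symptom of connections lingering until the client times out.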



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Navinder Brar
I agree with Vinoth and John on having "allLocalStoreOffsetLags()", all actives 
don't have 0 lag, as during restoring state the active might have some lag and 
one of the features of this KIP is to provide an option to query from active 
(which might be in restoring state). 
I will update the KIP with rejected alternatives and post this will start a 
vote if everyone agrees on this.
On Thursday, 14 November, 2019, 09:34:52 pm IST, John Roesler 
 wrote:  
 
 Hi all,

Thanks for the "reset", Vinoth. It brings some needed clarity to the discussion.

10. My 2 cents: we might as well include the lags for the active
copies as well. This is a more context-free API. If we only include
standbys, this choice won't make sense to users unless they understand
that the active task cannot lag in the steady state, since it's the
source of updates. This isn't a bad thing to realize, but it's just
more mental overhead for the person who wants to list the lags for
"all local stores".

Another reason is that we could consider also reporting the lag for
actives during recovery (when they would have non-zero lag). We don't
have to now, but if we choose to call the method "standby lags", then
we can't make this choice in the future.

That said, it's just my opinion. I'm fine either way.

20. Vinoth's reply makes sense to me, fwiw.

Beyond these two points, I'm happy with the current proposal.

Thanks again,
-John

On Thu, Nov 14, 2019 at 4:48 AM Vinoth Chandar  wrote:
>
> 10. I considered that. Had to pick one or the other. Can just return
> standby too and maybe rename the method to “allLocalStandbyOffsetLags()” to
> have it explicit. (Standby should implicitly convey that we are talking
> about stores)
>
> 20. What I meant was, we are returning HostInfo instead of StreamsMetadata
> since thats sufficient to route query; same for “int partition “ vs topic
> partition before. Previously KeyQueryMetadata had similar structure but
> used StreamsMetadata and TopicPartition objects to convey same information
>
> @navinder KIP is already upto date with the email I sent, except for the
> reasonings I was laying out. +1 on revisiting rejected alternatives.
> Please make the follow up changes
>
> On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax 
> wrote:
>
> > Thanks for the summary Vinoth!
> >
> > I buy the overall argument. Couple of clarification questions:
> >
> >
> > 10. Why do we need to include the active stores in
> > `allLocalStoreOffsetLags()`? Would it not be simpler to just return lag
> > for standbys?
> >
> >
> > 20: What does
> >
> > > Thin the KeyQueryMetadata object to just contain the minimum information
> > > needed.
> >
> > exactly mean? What is the "minimum information needed"?
> >
> >
> > @Navinder: if you agree, can you update the KIP accordingly? With all the
> > proposals, it's hard to keep track and it would be great to have the
> > current proposal summarized in the wiki page.
> >
> > Please also update the "Rejected alternative" sections to avoid that we
> > cycle back to old proposal (including the reason _why_ they got rejected).
> >
> >
> > Thanks a lot!
> >
> >
> > -Matthias
> >
> >
> >
> > On 11/13/19 7:10 PM, Vinoth Chandar wrote:
> > > Given we have had a healthy discussion on this topic for a month now and
> > > still with many loose ends and open-ended conversations, I thought it
> > would
> > > be worthwhile to take a step back and re-evaluate everything in the
> > context
> > > of the very real use-case and its specific scenarios.
> > >
> > > First, let's remind ourselves of the query routing flow of the streams
> > > application ("app" here on)
> > >
> > >    1. queries get routed to any random streams instance in the cluster
> > >    ("router" here on)
> > >    2. router then uses Streams metadata to pick active/standby instances
> > >    for that key's store/partition
> > >    3. router instance also maintains global lag information for all
> > stores
> > >    and all their partitions, by a gossip/broadcast/heartbeat protocol
> > (done
> > >    outside of Streams framework), but using KafkaStreams#allMetadata()
> > for
> > >    streams instance discovery.
> > >    4. router then uses information in 2 & 3 to determine which instance
> > to
> > >    send the query to  : always picks active instance if alive or the most
> > >    in-sync live standby otherwise.
> > >
> > > Few things to note :
> > >
> > > A) We choose to decouple how the lag information is obtained (control
> > > plane) from query path (data plane), since that provides more flexibility
> > > in designing the control plane. i.e pick any or combination of gossip,
> > > N-way broadcast, control the rate of propagation, piggybacking on request
> > > responses
> > > B) Since the app needs to do its own control plane, talking to other
> > > instances directly for failure detection & exchanging other metadata, we
> > > can leave the lag APIs added to KafkaStreams class itself local and
> > simply
> > > return lag for all store/partitions 

[jira] [Created] (KAFKA-9189) Shutdown is blocked if connection to Zookeeper is lost

2019-11-14 Thread Boris Granveaud (Jira)
Boris Granveaud created KAFKA-9189:
--

 Summary: Shutdown is blocked if connection to Zookeeper is lost
 Key: KAFKA-9189
 URL: https://issues.apache.org/jira/browse/KAFKA-9189
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.3.0
 Environment: Linux, Docker 19.03.4
Reporter: Boris Granveaud


We are using Kafka and Zookeeper in Docker swarm stacks. When we undeploy a 
stack, sometimes Kafka doesn't shutdown properly and is finally killed by 
Docker (thus leaving potentially corrupted files).

Here are the steps to reproduce (simple Docker, no swarm):

 
{code:java}
docker network create test
docker run -d --network test --name zk --rm zookeeper:3.5.6
docker run --network test --name kf --rm -e "KAFKA_ZOOKEEPER_CONNECT=zk:2181" 
-e "KAFKA_ADVERTISED_LISTENERS=INSIDE://:9091" -e 
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT" -e 
"KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE" confluentinc/cp-kafka:5.3.1
{code}
 

In another shell:

 
{code:java}
docker stop zk
docker stop kf
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9188) Flaky Test SslAdminClientIntegrationTest.testSynchronousAuthorizerAclUpdatesBlockRequestThreads

2019-11-14 Thread Bill Bejeck (Jira)
Bill Bejeck created KAFKA-9188:
--

 Summary: Flaky Test 
SslAdminClientIntegrationTest.testSynchronousAuthorizerAclUpdatesBlockRequestThreads
 Key: KAFKA-9188
 URL: https://issues.apache.org/jira/browse/KAFKA-9188
 Project: Kafka
  Issue Type: Test
  Components: core
Reporter: Bill Bejeck


Failed in 
[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/9373/testReport/junit/kafka.api/SslAdminClientIntegrationTest/testSynchronousAuthorizerAclUpdatesBlockRequestThreads/]

 
{noformat}
Error Messagejava.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Aborted due to 
timeout.Stacktracejava.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at 
kafka.api.SslAdminClientIntegrationTest.$anonfun$testSynchronousAuthorizerAclUpdatesBlockRequestThreads$1(SslAdminClientIntegrationTest.scala:201)
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at 
kafka.api.SslAdminClientIntegrationTest.testSynchronousAuthorizerAclUpdatesBlockRequestThreads(SslAdminClientIntegrationTest.scala:201)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to 
timeout.
Standard Output[2019-11-14 15:13:51,489] ERROR [ReplicaFetcher replicaId=1, 
leaderId=2, fetcherId=0] Error for partition mytopic1-1 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-11-14 15:13:51,490] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
fetcherId=0] Error for partition mytopic1-0 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-11-14 15:14:04,686] ERROR [KafkaApi-2] Error when handling request: 
clientId=adminclient-644, correlationId=4, api=CREATE_ACLS, version=1, 
body={creations=[{resource_type=2,resource_name=foobar,resource_pattern_type=3,principal=User:ANONYMOUS,host=*,operation=3,permission_type=3},{resource_type=5,resource_name=transactional_id,resource_pattern_type=3,principal=User:ANONYMOUS,host=*,operation=4,permission_type=3}]}
 (kafka.server.KafkaApis:76)
org.apache.kafka.common.errors.ClusterAuthorizationException: Request 
Request(processor=1, connectionId=127.0.0.1:41993-127.0.0.1:34770-0, 
session=Session(User:ANONYMOUS,/127.0.0.1), listenerName=ListenerName(SSL), 
securityProtocol=SSL, buffer=null) is not authorized.
[2019-11-14 15:14:04,689] ERROR [KafkaApi-2] Error when handling request: 
clientId=adminclient-644, correlationId=5, api=DELETE_ACLS, version=1, 
body={filters=[{resource_type=2,resource_name=foobar,resource_pattern_type_filter=3,principal=User:ANONYMOUS,host=*,operation=3,permission_type=3},{resource_type=5,resource_name=transactional_id,resource_pattern_type_filter=3,principal=User:ANONYMOUS,host=*,operation=4,permission_type=3}]}
 (kafka.server.KafkaApis:76)

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-11-14 Thread John Roesler
Thanks, all,

I can get behind just totally leaving out repartition-via-groupBy. If
we only introduce `repartition()` for now, we're making the minimal
change to gain the desired capability.

Plus, since we agree that `repartition()` should never be optimizable,
it's a future-compatible proposal. I.e., if we were to add a
non-optimizable groupBy(partitions) operation now, and want to make it
optimizable in the future, we have to worry about topology
compatibility. Better to just do non-optimizable `repartition()` now,
and add an optimizable `groupBy(partitions)` in the future (maybe).

About joins, yes, it's a concern, and IMO we should just do the same
thing we do now... check at runtime that the partition counts on both
sides match and throw an exception otherwise. What this means as a
user is that if you explicitly repartition the left side to 100
partitions, and then join with the right side at 10 partitions, you
get an exception, since this operation is not possible. You'd either
have to "step down" the left side again, back to 10 partitions, or you
could repartition the right side to 100 partitions before the join.
The choice has to be the user's, since it depends on their desired
execution parallelism.
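In spirit, that runtime check is what Streams' `CopartitionedTopicsEnforcer` does today. A simplified, self-contained sketch of such a strict co-partitioning check (the `enforceCopartitioning` helper is hypothetical, not the actual Streams class, which currently picks the max count instead of failing):

```java
import java.util.Map;

public class CopartitionCheck {
    // Throw if the join inputs do not all have the same partition count,
    // e.g. left explicitly repartitioned to 100 vs right at 10.
    static void enforceCopartitioning(Map<String, Integer> topicPartitions) {
        long distinctCounts = topicPartitions.values().stream().distinct().count();
        if (distinctCounts > 1) {
            throw new IllegalStateException(
                "Topics are not co-partitioned: " + topicPartitions
                + "; repartition one side so the counts match.");
        }
    }

    public static void main(String[] args) {
        enforceCopartitioning(Map.of("left-repartition", 100, "right", 100)); // ok
        try {
            enforceCopartitioning(Map.of("left-repartition", 100, "right", 10));
        } catch (IllegalStateException e) {
            System.out.println("rejected: partition counts differ");
        }
    }
}
```

Failing fast here leaves the choice of which side to repartition with the user, as argued above.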

Thanks,
-John

On Thu, Nov 14, 2019 at 12:55 AM Matthias J. Sax  wrote:
>
> Thanks a lot John. I think the way you decompose the operators is super
> helpful for this discussion.
>
> What you suggest with regard to using `Grouped` and enforcing
> repartitioning if the number of partitions is specified is certainly
> possible. However, I am not sure if we _should_ do this. My reasoning is
> that an enforced repartitioning, as introduced via `repartition()`, is an
> expensive operation, and it seems better to demand a more explicit
> user opt-in to trigger it. Just setting an optional parameter might be
> too subtle to trigger such a heavy "side effect".
>
> While I agree about "usability" in general, I would prefer a more
> conservative approach to introducing this feature, see how it goes, and
> maybe make it more advanced later on. This also applies to what
> optimizations we may or may not allow (or are able to perform at all).
>
> @Levani: Reflecting on my suggestion about `Repartitioned extends
> Grouped`, I agree that it might not be a good idea.
>
> Atm, I see an enforced repartitioning as non-optimizable and as a good
> first step, and I would suggest not introducing anything else for now.
> Introducing an optimizable enforced repartitioning via `groupBy(...,
> Grouped)` is something we could add later.
>
>
> Therefore, I would not change `Grouped` but only introduce
> `repartition()`. Users that use `groupBy()` atm, and want to opt in to
> set the number of partitions, would need to rewrite their code to
> `selectKey(...).repartition(...).groupByKey()`. It's less convenient but
> also less risky from an API and optimization point of view.
>
>
> @Levani: about joins -> yes, we will need to check the specified number
> of partitions (if any) and if they don't match, throw an exception. We
> can discuss this on the PR -- I am just trying to get the PR for KIP-466
> merged -- yours is next on the list :)
>
>
> Thoughts?
>
>
> -Matthias
>
>
> On 11/12/19 4:51 PM, Levani Kokhreidze wrote:
> > Thank you all for an interesting discussion. This is very enlightening.
> >
> > Thank you Matthias for your explanation. Your arguments are very true. It 
> > makes sense that if a user specifies the number of partitions, he/she really cares 
> > that those specifications are applied to internal topics.
> > Unfortunately, in current implementation this is not true during `join` 
> > operation. As I’ve written in the PR comment, currently, when `Stream#join` 
> > is used, `CopartitionedTopicsEnforcer` chooses max number of partitions 
> > from the two source topics.
> > I’m not really sure what the right way around this situation would be. 
> > Maybe fail the stream altogether and inform the user to specify the same 
> > number of partitions?
> > Or we should treat join operations the same way as right now and 
> > basically choose max number of partitions even when `repartition` operation 
> > is specified, because Kafka Streams “knows the best” how to handle joins?
> > You can check the integration tests to see how it’s handled currently. Open to 
> > suggestions on that part.
> >
> > As for groupBy, I agree and John raised very interesting points. My 
> > arguments for allowing users to specify the number of partitions during groupBy 
> > operations mainly came from the usability perspective.
> > So building on top of what John said, maybe it makes sense to make 
> > `groupBy` operations smarter: whenever the user specifies the 
> > `numberOfPartitions` configuration, repartitioning will be enforced, wdyt?
> > I’m not going into optimization part yet :) I think it will be part of 
> > separate PR and task, but overall it makes sense to apply optimizations 
> > where number of partitions are the same.
> >
> > As for Repartitioned 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread John Roesler
Hi all,

Thanks for the "reset", Vinoth. It brings some needed clarity to the discussion.

10. My 2 cents: we might as well include the lags for the active
copies as well. This is a more context-free API. If we only include
standbys, this choice won't make sense to users unless they understand
that the active task cannot lag in the steady state, since it's the
source of updates. This isn't a bad thing to realize, but it's just
more mental overhead for the person who wants to list the lags for
"all local stores".

Another reason is that we could consider also reporting the lag for
actives during recovery (when they would have non-zero lag). We don't
have to now, but if we choose to call the method "standby lags", then
we can't make this choice in the future.

That said, it's just my opinion. I'm fine either way.

20. Vinoth's reply makes sense to me, fwiw.

Beyond these two points, I'm happy with the current proposal.

Thanks again,
-John

On Thu, Nov 14, 2019 at 4:48 AM Vinoth Chandar  wrote:
>
> 10. I considered that. Had to pick one or the other. Can just return
> standby too and maybe rename the method to “allLocalStandbyOffsetLags()” to
> have it explicit. (Standby should implicitly convey that we are talking
> about stores)
>
> 20. What I meant was, we are returning HostInfo instead of StreamsMetadata
> since that's sufficient to route the query; same for “int partition” vs. the
> TopicPartition used before. Previously, KeyQueryMetadata had a similar
> structure but used StreamsMetadata and TopicPartition objects to convey the
> same information.
>
> @navinder The KIP is already up to date with the email I sent, except for
> the reasoning I was laying out. +1 on revisiting rejected alternatives.
> Please make the follow-up changes
>
> On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax 
> wrote:
>
> > Thanks for the summary Vinoth!
> >
> > I buy the overall argument. Couple of clarification questions:
> >
> >
> > 10. Why do we need to include the active stores in
> > `allLocalStoreOffsetLags()`? Would it not be simpler to just return lag
> > for standbys?
> >
> >
> > 20: What does
> >
> > > Thin the KeyQueryMetadata object to just contain the minimum information
> > > needed.
> >
> > exactly mean? What is the "minimum information needed"?
> >
> >
> > @Navinder: if you agree, can you update the KIP accordingly? With all the
> > proposals, it's hard to keep track and it would be great to have the
> > current proposal summarized in the wiki page.
> >
> > Please also update the "Rejected alternatives" section so that we don't
> > cycle back to old proposals (including the reasons _why_ they got rejected).
> >
> >
> > Thanks a lot!
> >
> >
> > -Matthias
> >
> >
> >
> > On 11/13/19 7:10 PM, Vinoth Chandar wrote:
> > > Given we have had a healthy discussion on this topic for a month now and
> > > still with many loose ends and open-ended conversations, I thought it
> > would
> > > be worthwhile to take a step back and re-evaluate everything in the
> > context
> > > of the very real use-case and its specific scenarios.
> > >
> > > First, let's remind ourselves of the query routing flow of the streams
> > > application ("app" here on)
> > >
> > >1. queries get routed to any random streams instance in the cluster
> > >("router" here on)
> > >2. router then uses Streams metadata to pick active/standby instances
> > >for that key's store/partition
> > >3. router instance also maintains global lag information for all
> > stores
> > >and all their partitions, by a gossip/broadcast/heartbeat protocol
> > (done
> > >outside of Streams framework), but using KafkaStreams#allMetadata()
> > for
> > >streams instance discovery.
> > >4. router then uses information in 2 & 3 to determine which instance
> > to
> > >send the query to  : always picks active instance if alive or the most
> > >in-sync live standby otherwise.
> > >
> > > Few things to note :
> > >
> > > A) We choose to decouple how the lag information is obtained (control
> > > plane) from query path (data plane), since that provides more flexibility
> > > in designing the control plane, i.e., pick any one or a combination of gossip,
> > > N-way broadcast, control the rate of propagation, piggybacking on request
> > > responses
> > > B) Since the app needs to do its own control plane, talking to other
> > > instances directly for failure detection & exchanging other metadata, we
> > > can leave the lag APIs added to KafkaStreams class itself local and
> > simply
> > > return lag for all store/partitions on that instance.
> > > C) Streams preserves its existing behavior of instances only talking to
> > > each other through the Kafka brokers.
> > > D) Since the router treats active/standby differently, it would be good
> > for
> > > the KafkaStreams APIs to hand them back explicitly, with no additional
> > > logic needed for computing them. Specifically, the router only knows two
> > > things - key and store and if we just return a
> > Collection
> > > back, it 

Build failed in Jenkins: kafka-2.4-jdk8 #74

2019-11-14 Thread Apache Jenkins Server
See 


Changes:

[manikumar] MINOR: fix flaky ConsumerBounceTest.testClose

[manikumar] KAFKA-9046: Use top-level worker configs for connector admin clients


--
[...truncated 5.45 MB...]



[jira] [Created] (KAFKA-9187) kafka.api.SaslGssapiSslEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

2019-11-14 Thread Bill Bejeck (Jira)
Bill Bejeck created KAFKA-9187:
--

 Summary: 
kafka.api.SaslGssapiSslEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
 Key: KAFKA-9187
 URL: https://issues.apache.org/jira/browse/KAFKA-9187
 Project: Kafka
  Issue Type: Test
  Components: core
Reporter: Bill Bejeck


Failed in [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/26593/]

 
{noformat}
Error Message: org.scalatest.exceptions.TestFailedException: Consumed 0 records
before timeout instead of the expected 1 records
Stacktrace: org.scalatest.exceptions.TestFailedException: Consumed 0
records before timeout instead of the expected 1 records
at 
org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:530)
at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
at org.scalatest.Assertions$class.fail(Assertions.scala:1091)
at org.scalatest.Assertions$.fail(Assertions.scala:1389)
at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:842)
at kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:793)
at 
kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1334)
at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1343)
at 
kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:530)
at 
kafka.api.EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(EndToEndAuthorizationTest.scala:369)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 

Build failed in Jenkins: kafka-trunk-jdk8 #4039

2019-11-14 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9169: fix standby checkpoint initialization (#7681)


--
[...truncated 5.56 MB...]


Jenkins build is back to normal : kafka-2.3-jdk8 #135

2019-11-14 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-9186) Kafka Connect floods logs with probably bogus error messages from DelegatingClassLoader

2019-11-14 Thread Piotr Szczepanik (Jira)
Piotr Szczepanik created KAFKA-9186:
---

 Summary: Kafka Connect floods logs with probably bogus error 
messages from DelegatingClassLoader
 Key: KAFKA-9186
 URL: https://issues.apache.org/jira/browse/KAFKA-9186
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.3.1
Reporter: Piotr Szczepanik


After upgrading Kafka Connect from 2.3.0 to 2.3.1 we discovered a lot of 
recurring ERROR messages in Connect's logs.
{noformat}
Plugin class loader for connector: 
'com.google.pubsub.kafka.sink.CloudPubSubSinkConnector' was not found. 
Returning: 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@58f437b0
logger: org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader

{noformat}
Kafka Connect continues processing the topics as it should.

We are not using the plugin classloader isolation feature (we do not specify the
plugin.path property) because we were seeing classloading deadlocks similar to
the ones described in [KAFKA-7421|https://issues.apache.org/jira/browse/KAFKA-7421]

Maybe the level of this message should be lowered?
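
Until the level changes upstream, one possible workaround (assuming Connect's default Log4j 1.x properties configuration) is to silence that specific logger in the worker's log4j configuration:

```properties
# The messages in question are logged at ERROR, so lowering the threshold does
# not help; the logger has to be turned OFF entirely. Note this also hides any
# genuine errors from this class.
log4j.logger.org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader=OFF
```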



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Vinoth Chandar
10. I considered that. Had to pick one or the other. Could just return
standbys and rename the method to, maybe, “allLocalStandbyOffsetLags()” to
make it explicit. (“Standby” should implicitly convey that we are talking
about stores.)

20. What I meant was, we are returning HostInfo instead of StreamsMetadata
since that's sufficient to route the query; same for “int partition” vs. the
TopicPartition used before. Previously, KeyQueryMetadata had a similar
structure but used StreamsMetadata and TopicPartition objects to convey the
same information.

@navinder The KIP is already up to date with the email I sent, except for
the reasoning I was laying out. +1 on revisiting rejected alternatives.
Please make the follow-up changes
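
To illustrate point 20, here is a minimal sketch of a thinned-down KeyQueryMetadata: just enough to route the query and to match the key's store partition against the global lag information. The class, field, and method names (and the stand-in HostInfo) are assumptions for illustration, not the actual proposal.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch of a "thinned" KeyQueryMetadata per the discussion.
// HostInfo is a stand-in for Streams' host/port pair.
public class KeyQueryMetadataSketch {

    public static class HostInfo {
        private final String host;
        private final int port;
        public HostInfo(String host, int port) { this.host = host; this.port = port; }
        public String host() { return host; }
        public int port() { return port; }
    }

    private final HostInfo activeHost;         // where the active copy lives
    private final List<HostInfo> standbyHosts; // fallbacks, explicitly separated from the active
    private final int partition;               // store partition for the queried key,
                                               // needed to look up the global lag info

    public KeyQueryMetadataSketch(HostInfo active, List<HostInfo> standbys, int partition) {
        this.activeHost = active;
        this.standbyHosts = Collections.unmodifiableList(standbys);
        this.partition = partition;
    }

    public HostInfo getActiveHost() { return activeHost; }
    public List<HostInfo> getStandbyHosts() { return standbyHosts; }
    public int getPartition() { return partition; }

    public static void main(String[] args) {
        KeyQueryMetadataSketch meta = new KeyQueryMetadataSketch(
            new HostInfo("host1", 9090),
            Arrays.asList(new HostInfo("host2", 9090)),
            3);
        System.out.println("active=" + meta.getActiveHost().host()
            + " partition=" + meta.getPartition());
    }
}
```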

On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax 
wrote:

> Thanks for the summary Vinoth!
>
> I buy the overall argument. Couple of clarification questions:
>
>
> 10. Why do we need to include the active stores in
> `allLocalStoreOffsetLags()`? Would it not be simpler to just return lag
> for standbys?
>
>
> 20: What does
>
> > Thin the KeyQueryMetadata object to just contain the minimum information
> > needed.
>
> exactly mean? What is the "minimum information needed"?
>
>
> @Navinder: if you agree, can you update the KIP accordingly? With all the
> proposals, it's hard to keep track and it would be great to have the
> current proposal summarized in the wiki page.
>
> Please also update the "Rejected alternatives" section so that we don't
> cycle back to old proposals (including the reasons _why_ they got rejected).
>
>
> Thanks a lot!
>
>
> -Matthias
>
>
>
> On 11/13/19 7:10 PM, Vinoth Chandar wrote:
> > Given we have had a healthy discussion on this topic for a month now and
> > still with many loose ends and open-ended conversations, I thought it
> would
> > be worthwhile to take a step back and re-evaluate everything in the
> context
> > of the very real use-case and its specific scenarios.
> >
> > First, let's remind ourselves of the query routing flow of the streams
> > application ("app" here on)
> >
> >1. queries get routed to any random streams instance in the cluster
> >("router" here on)
> >2. router then uses Streams metadata to pick active/standby instances
> >for that key's store/partition
> >3. router instance also maintains global lag information for all
> stores
> >and all their partitions, by a gossip/broadcast/heartbeat protocol
> (done
> >outside of Streams framework), but using KafkaStreams#allMetadata()
> for
> >streams instance discovery.
> >4. router then uses information in 2 & 3 to determine which instance
> to
> >send the query to  : always picks active instance if alive or the most
> >in-sync live standby otherwise.
> >
> > Few things to note :
> >
> > A) We choose to decouple how the lag information is obtained (control
> > plane) from query path (data plane), since that provides more flexibility
> > in designing the control plane, i.e., pick any one or a combination of gossip,
> > N-way broadcast, control the rate of propagation, piggybacking on request
> > responses
> > B) Since the app needs to do its own control plane, talking to other
> > instances directly for failure detection & exchanging other metadata, we
> > can leave the lag APIs added to KafkaStreams class itself local and
> simply
> > return lag for all store/partitions on that instance.
> > C) Streams preserves its existing behavior of instances only talking to
> > each other through the Kafka brokers.
> > D) Since the router treats active/standby differently, it would be good
> for
> > the KafkaStreams APIs to hand them back explicitly, with no additional
> > logic needed for computing them. Specifically, the router only knows two
> > things - key and store and if we just return a
> Collection
> > back, it cannot easily tease apart active and standby. Say, a streams
> > instance hosts the same store as both active and standby for different
> > partitions, matching by just the store name the app will find it in both
> active
> > and standby lists.
> > E) From above, we assume the global lag estimates (lag per store topic
> > partition) are continuously reported amongst application instances and
> > already available on the router during step 2 above. Hence, attaching lag
> > APIs to StreamsMetadata is unnecessary and does not meet the needs
> anyway.
> > F) The currently returned StreamsMetadata object is really information about
> a
> > streams instance, which is not very specific to the key being queried.
> > Specifically, the router has no knowledge of the topic partition a given key
> > belongs to; this is needed for matching against the global lag information in
> step
> > 4 above (and as also the example code in the KIP showed before). The
> > StreamsMetadata, since it's about the instance itself, would contain all
> > topic partitions and stores on that instance, not specific to the given
> key.
> > G) A cleaner API would thin the amount of information returned to
> > specifically the given key and 
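
The routing rule from steps 1-4 above can be sketched as follows: prefer the live active instance, otherwise fall back to the most in-sync (lowest-lag) live standby. All names here are illustrative; the instance ids, liveness flags, and lag values are assumed to come from the app's own control plane, outside the Streams framework.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the router's instance-selection logic for a given key's store partition.
public class QueryRouterSketch {

    static String pickInstance(String active,
                               Map<String, Long> standbyLags,   // instance id -> lag
                               Map<String, Boolean> alive) {    // instance id -> liveness
        if (Boolean.TRUE.equals(alive.get(active))) {
            return active; // active is the source of updates: zero lag in steady state
        }
        // Active is down: pick the most in-sync live standby.
        String best = null;
        long bestLag = Long.MAX_VALUE;
        for (Map.Entry<String, Long> e : standbyLags.entrySet()) {
            if (Boolean.TRUE.equals(alive.get(e.getKey())) && e.getValue() < bestLag) {
                best = e.getKey();
                bestLag = e.getValue();
            }
        }
        return best; // null if no live replica exists at all
    }

    public static void main(String[] args) {
        Map<String, Long> lags = new LinkedHashMap<>();
        lags.put("standby-1", 100L);
        lags.put("standby-2", 5L);
        Map<String, Boolean> alive = new LinkedHashMap<>();
        alive.put("active-1", false);
        alive.put("standby-1", true);
        alive.put("standby-2", true);
        System.out.println("route to: " + pickInstance("active-1", lags, alive));
    }
}
```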

[jira] [Resolved] (KAFKA-9046) Connect worker configs require undocumented 'admin.' prefix to configure DLQ for connectors

2019-11-14 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-9046.
--
Fix Version/s: 2.4.0
   2.3.2
   Resolution: Fixed

Issue resolved by pull request 7525
[https://github.com/apache/kafka/pull/7525]

> Connect worker configs require undocumented 'admin.' prefix to configure DLQ 
> for connectors
> ---
>
> Key: KAFKA-9046
> URL: https://issues.apache.org/jira/browse/KAFKA-9046
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 2.3.2, 2.4.0
>
>
> The changes for KAFKA-8265 involved [adding a prefix of "admin." to Connect 
> worker 
> configs|https://github.com/apache/kafka/pull/6624/files#diff-316d2c222b623ee65e8065863bf4b9ceR606]
>  that would be used to configure the admin client that's used for connector 
> DLQs. However, this was never documented in the [corresponding 
> KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]
>  and has broken backwards compatibility with prior Connect releases since 
> workers without the necessary {{"admin."}}-prefixed properties in their 
> configuration files will now fail in some circumstances (e.g., when 
> interacting with a secured Kafka cluster that requires authentication from 
> all admin clients that interact with it).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Apache Kafka 2.4.0 release

2019-11-14 Thread Manikumar
Hi Randall and Konstantine,

Thanks for the explanation. Since it blocks users upgrading from older
Connect versions to the 2.4 release, we will consider the JIRA a blocker
for 2.4.0.

Thanks


On Wed, Nov 13, 2019 at 11:58 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Hi Manikumar,
>
> thanks for your quick response on the Connect issue.
>
> I'd like to help shed some more light here. The issue is clearly a
> regression. It may block users upgrading from older Connect versions to the
> most recent one. Unfortunately it was introduced in an area of the
> configuration that was hard to detect and went unnoticed during the release
> of 2.3.0. However, in my opinion, the fact that this regression went
> unnoticed for a release is not sufficient precedent to downgrade this issue
> from blocker status.
>
> Of course, I'd like to thank Chris Egerton for investigating and issuing a
> straightforward PR to fix this.
> I'm also not happy that this comes up right before RC0 is posted, but, all
> things considered, I'd be in favor of retaining our increased focus on
> compatibility even in the presence of misses such as this one, and
> therefore, I'd recommend considering this issue as a blocker.
>
> Cheers,
> Konstantine
>
> On Tue, Nov 12, 2019 at 8:01 PM Sophie Blee-Goldman 
> wrote:
>
> > Hi Manikumar,
> >
> > We have another potential blocker in 2.4, which affects Streams members
> > that
> > fall out of the group and may cause them to die upon rejoin.
> >
> > The ticket is here
> > https://issues.apache.org/jira/browse/KAFKA-9178
> >
> > The fix is very straightforward, and we have a small PR out for it here
> > https://github.com/apache/kafka/pull/7686
> >
> > Thanks,
> > Sophie
> >
> >
> > On Tue, Nov 12, 2019 at 7:33 PM Manikumar 
> > wrote:
> >
> > > Hi Chris,
> > >
> > > Since this is an existing issue, I am hesitant to include it as a blocker for
> 2.4
> > > release.
> > > I am in the process of publishing RC0 for 2.4. We will try to include
> > > KAFKA-9046
> > > if there are any other blockers that require rolling out a new RC.
> > >
> > >
> > > Thanks,
> > >
> > > On Wed, Nov 13, 2019 at 5:46 AM John Roesler 
> wrote:
> > >
> > > > Hi again Manikumar,
> > > >
> > > > Thanks for looking at the PR. It's not easy to pin this particular
> bug
> > > > down to a specific commit, but I've ported my test to the 2.3 branch.
> > > >
> > > > It looks like this bug pre-dates 2.4, so it's not a regression.
> > > > Therefore, after reflection, we don't need to block the 2.4 release
> on
> > > > https://issues.apache.org/jira/browse/KAFKA-9169 .
> > > >
> > > > I've downgraded the bug from "Blocker" to "Critical" accordingly.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Tue, Nov 12, 2019 at 11:00 AM John Roesler 
> > wrote:
> > > > >
> > > > > Hi Manikumar,
> > > > >
> > > > > There is a new bug report that we think is a blocker:
> > > > >
> > > > > https://issues.apache.org/jira/browse/KAFKA-9169
> > > > > https://github.com/apache/kafka/pull/7681
> > > > >
> > > > > See also the dev@ thread entitled "Why standby tasks read from the
> > > > > StandbyTasks::checkpointedOffsets in assignStandbyPartitions()" for
> > > > > context.
> > > > >
> > > > > I'm working on a test now.
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > >
> > > > > On Mon, Nov 11, 2019 at 10:42 AM Manikumar <
> > manikumar.re...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I am trying to get stable system test runs on 2.4 branch. While
> > > > analyzing
> > > > > > the ReassignPartitionsTest failure,
> > > > > > Rajini identified a issue, which can cause a successful produce
> to
> > be
> > > > > > failed. Thanks to Rajini for identifying the issue.
> > > > > >
> > > > > > https://issues.apache.org/jira/browse/KAFKA-9171
> > > > > > https://github.com/apache/kafka/pull/7678
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > On Mon, Nov 4, 2019 at 10:03 PM Manikumar <
> > manikumar.re...@gmail.com
> > > >
> > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > The underlying issue of KAFKA-8677 can cause data loss in
> > > consumers.
> > > > I
> > > > > > > have included KAFKA-8677 as blocker to 2.4 release.
> > > > > > > Thanks to Guozhang for identifying the issue.
> > > > > > >
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-8677
> > > > > > > PR: https://github.com/apache/kafka/pull/7613
> > > > > > >
> > > > > > > Thanks
> > > > > > >
> > > > > > > On Fri, Nov 1, 2019 at 9:20 PM Manikumar <
> > > manikumar.re...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi All,
> > > > > > >>
> > > > > > >> We still have couple of blockers to close.  PRs available for
> > both
> > > > the
> > > > > > >> blockers.
> > > > > > >>
> > > > > > >> https://issues.apache.org/jira/browse/KAFKA-8972
> > > > > > >> https://issues.apache.org/jira/browse/KAFKA-9080
> > > > > > >>
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >>
> > > > > > >>
>