[jira] [Commented] (KAFKA-7928) Deprecate WindowStore.put(key, value)

2019-05-22 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846417#comment-16846417
 ] 

Matthias J. Sax commented on KAFKA-7928:


This seems to be a duplicate of https://issues.apache.org/jira/browse/KAFKA-7245 
– because this ticket does not show any progress, I would suggest closing it as 
a duplicate of 7245 and proceeding with 7245. \cc [~vvcephei] [~ouertani] 
Thoughts?

> Deprecate WindowStore.put(key, value)
> -
>
> Key: KAFKA-7928
> URL: https://issues.apache.org/jira/browse/KAFKA-7928
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Assignee: Slim Ouertani
>Priority: Major
>  Labels: beginner, easy-fix, needs-kip, newbie
>
> Specifically, `org.apache.kafka.streams.state.WindowStore#put(K, V)`
> This method is strange... A window store needs to have a timestamp associated 
> with the key, so if you do a put without a timestamp, it's up to the store to 
> just make one up.
> Even the javadoc on the method recommends not using it, due to this 
> confusing behavior.
> We should just deprecate it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)

2019-05-22 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846416#comment-16846416
 ] 

Matthias J. Sax commented on KAFKA-7245:


All public API changes need a KIP. Hence, if we want to remove `put()` we need 
a KIP (even if we deprecate it first and do the actual removal in a later 
release).

Seems KAFKA-7928 duplicates this ticket. There was no progress on the other 
ticket and no KIP was provided. Hence, I tend to close 7928 as a duplicate so 
we can proceed with this ticket as planned. I'll leave a comment on 7928 about 
this.

> Deprecate WindowStore#put(key, value)
> -
>
> Key: KAFKA-7245
> URL: https://issues.apache.org/jira/browse/KAFKA-7245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Omkar Mestry
>Priority: Minor
>  Labels: needs-kip, newbie
>
> We want to remove `WindowStore#put(key, value)` – for this, we first need to 
> deprecate it via a KIP and remove it later.
> Instead of using `WindowStore#put(key, value)` we need to migrate code to 
> specify the timestamp explicitly using `WindowStore#put(key, value, 
> timestamp)`. The current code base already uses the explicit call to set the 
> timestamp in production code. The simplified `put(key, value)` is only used 
> in tests, and thus, we would need to update those tests.
> [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]
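
For context, a minimal sketch of the migration described above (the wrapper 
class and method are illustrative assumptions; only the WindowStore calls are 
from the ticket):

{code:java}
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;

// Illustrative only: migrate writes from the implicit-timestamp put to the
// explicit-timestamp put.
final class WindowStorePutExample {
    static void write(final WindowStore<String, Long> store,
                      final ProcessorContext context,
                      final String key,
                      final long value) {
        // Before (to be deprecated): store.put(key, value) forces the store
        // to make up a timestamp.
        // After: the window start timestamp is passed explicitly.
        store.put(key, value, context.timestamp());
    }
}
{code}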



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)

2019-05-22 Thread Omkar Mestry (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846402#comment-16846402
 ] 

Omkar Mestry commented on KAFKA-7245:
-

Jira: https://issues.apache.org/jira/browse/KAFKA-7928

[~mjsax], is the above Jira related?

> Deprecate WindowStore#put(key, value)
> -
>
> Key: KAFKA-7245
> URL: https://issues.apache.org/jira/browse/KAFKA-7245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Omkar Mestry
>Priority: Minor
>  Labels: needs-kip, newbie
>
> We want to remove `WindowStore#put(key, value)` – for this, we first need to 
> deprecate it via a KIP and remove it later.
> Instead of using `WindowStore#put(key, value)` we need to migrate code to 
> specify the timestamp explicitly using `WindowStore#put(key, value, 
> timestamp)`. The current code base already uses the explicit call to set the 
> timestamp in production code. The simplified `put(key, value)` is only used 
> in tests, and thus, we would need to update those tests.
> [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8311) Better consumer timeout exception handling

2019-05-22 Thread WooYoung (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846356#comment-16846356
 ] 

WooYoung commented on KAFKA-8311:
-

[~bchen225242], I have written a merge request message on 
[https://github.com/apache/kafka/pull/6792]

However, you had suggested that I work on this Jira ticket after 
[https://github.com/apache/kafka/pull/6662] was merged first.

I have waited for that ticket to be merged, but nothing has changed.

So I went ahead, although I think it may not be appropriate to open a merge 
request for the same Jira ticket.

If this is a problem, please tell me and I will cancel 
[https://github.com/apache/kafka/pull/6792].

> Better consumer timeout exception handling 
> ---
>
> Key: KAFKA-8311
> URL: https://issues.apache.org/jira/browse/KAFKA-8311
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: WooYoung
>Priority: Major
>  Labels: newbie
>
> When a stream application crashes due to an underlying consumer commit 
> timeout, we have seen the following gaps:
> 1. The current timeout exception doesn't provide meaningful tuning 
> instructions. We should augment the error message to let the user change 
> `default.api.timeout.ms` in order to tolerate a longer reaction time.
> 2. Currently we have 3 different types of consumers on KStream: 
> thread-consumer, global-consumer and restore-consumer. Although we don't plan 
> to explicitly handle this consumer timeout on the stream level, we could wrap 
> it with a more meaningful message on either the consumer or the stream level 
> to let the user know which consumer is having trouble.
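
For reference, a minimal sketch of the tuning mentioned in point 1 above (the 
broker address and group id are placeholder assumptions):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TimeoutTuningExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Point 1 above: tolerate a longer reaction time by raising default.api.timeout.ms.
        props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 120_000);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}
{code}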



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8402) bin/kafka-preferred-replica-election.sh fails if generated json is bigger than 1MB

2019-05-22 Thread huxihx (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846315#comment-16846315
 ] 

huxihx commented on KAFKA-8402:
---

 A possible solution is to throw an exception when the length of the encoded 
byte array hits the 1MB threshold.
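
A minimal sketch of that check (the class, constant, and exception choice are 
assumptions, not from the ticket):

{code:java}
// Sketch only: fail fast with a descriptive error before writing the election
// JSON to the /admin/preferred_replica_election ZooKeeper node.
final class ElectionJsonSizeCheck {
    static final int ZK_DEFAULT_MAX_DATA_BYTES = 1024 * 1024; // 1MB default znode data limit

    static void validate(final byte[] encodedJson) {
        if (encodedJson.length >= ZK_DEFAULT_MAX_DATA_BYTES) {
            throw new IllegalArgumentException(
                "Election JSON is " + encodedJson.length + " bytes, exceeding ZooKeeper's "
                + ZK_DEFAULT_MAX_DATA_BYTES + " byte node limit; specify a smaller set of "
                + "topics/partitions.");
        }
    }
}
{code}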

> bin/kafka-preferred-replica-election.sh fails if generated json is bigger 
> than 1MB
> --
>
> Key: KAFKA-8402
> URL: https://issues.apache.org/jira/browse/KAFKA-8402
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 1.1.1
>Reporter: Vyacheslav Stepanov
>Priority: Major
>
> If I run the script {{bin/kafka-preferred-replica-election.sh}} without 
> specifying the list of topics/partitions, it will get all topics/partitions 
> from ZooKeeper and transform them to JSON, then it will create a ZooKeeper 
> node at {{/admin/preferred_replica_election}} using this JSON as the data for 
> that node. If the generated JSON is bigger than 1MB (the default max size of 
> a ZooKeeper node's data), the script will fail without giving a good 
> description of the failure. The 1MB size can be reached if the number of 
> topics/partitions is high enough.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8389) Duplicated MockProcessorSupplier / MockProcessor in TopologyTestDriverTest

2019-05-22 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8389.
--
Resolution: Won't Fix

> Duplicated MockProcessorSupplier / MockProcessor in TopologyTestDriverTest
> --
>
> Key: KAFKA-8389
> URL: https://issues.apache.org/jira/browse/KAFKA-8389
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> We have stand-alone MockProcessorSupplier / MockProcessor classes, yet we 
> also have copies in TopologyTestDriverTest. We should consider removing the 
> duplicates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7995) Augment singleton protocol type to list for Kafka Consumer

2019-05-22 Thread SuryaTeja Duggi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846263#comment-16846263
 ] 

SuryaTeja Duggi commented on KAFKA-7995:


[~bchen225242] Can I pick this up? I am new. Could you add me to the mailing 
list?

> Augment singleton protocol type to list for Kafka Consumer  
> 
>
> Key: KAFKA-7995
> URL: https://issues.apache.org/jira/browse/KAFKA-7995
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, core
>Reporter: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> Right now the Kafka consumer protocol uses a singleton marker to distinguish 
> a Kafka Connect worker from a normal consumer. This is not an upgrade-friendly 
> approach, since the protocol type could potentially change over time. A better 
> approach is to support multiple candidates, so that a no-downtime protocol 
> type switch can be achieved.
> For example, if we are trying to upgrade a Kafka Streams application towards 
> a protocol type called "stream", right now there is no way to do this without 
> downtime, since the broker will reject changing the protocol type to a 
> different one unless the group is back to empty. If we allow a new member to 
> provide a list of protocol types instead ("consumer", "stream"), there would 
> be no compatibility issue.
> An alternative approach is to invent an admin API to change a group's 
> protocol type at runtime. However, the burden this puts on the administrator 
> is not trivial, since we need to guarantee that the sequence of operations is 
> correct; otherwise we will see a limping upgrade experience midway, for 
> example if an unexpected rebalance occurs while we are changing the protocol 
> type and causes old members to fail to join.
>  
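
Purely as an illustration of the proposal (these names are hypothetical, not 
the actual join-group wire format):

{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ProtocolTypeIllustration {
    // Today: a single protocol type marker per member.
    static final List<String> LEGACY = Collections.singletonList("consumer");
    // Proposed: ordered candidates, so a group could move from "consumer" to
    // "stream" without first going back to empty.
    static final List<String> UPGRADING = Arrays.asList("consumer", "stream");
}
{code}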



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8410) Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as well

2019-05-22 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846261#comment-16846261
 ] 

Guozhang Wang commented on KAFKA-8410:
--

[~vvcephei] Thanks for creating this ticket. I think it is good to tighten up 
the typing system in the DSL.

For the PAPI though, remember that we need to allow a processor node to 
flexibly send to multiple children that expect different input types, so we 
need to be careful when trying to bound it to specific types.

> Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as 
> well
> --
>
> Key: KAFKA-8410
> URL: https://issues.apache.org/jira/browse/KAFKA-8410
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: tech-debt
>
> Presently, it's very difficult to have confidence when adding to or modifying 
> processors in the DSL. There's a lot of raw types, duck-typing, and casting 
> that contribute to this problem.
> The root, though, is that the generic types on `Processor` refer only to 
> the _input_ key and value types. No information is captured or verified about 
> what the _output_ types of a processor are. For example, this leads to 
> widespread confusion in the code base about whether a processor produces `V`s 
> or `Change`s. The type system actually makes matters worse, since we use 
> casts to make the processors conform to declared types that are in fact 
> wrong, but are never checked due to erasure.
> We can start to make some headway on this tech debt by adding some types to 
> the ProcessorContext that bound the `<K, V>` that may be passed to 
> `context.forward`. Then, we can build on this by fully specifying the input 
> and output types of the Processors, which in turn would let us eliminate the 
> majority of unchecked casts in the DSL operators.
> I'm not sure whether adding these generic types to the existing 
> ProcessorContext and Processor interfaces, which would also affect the PAPI, 
> has any utility, or whether we should make this purely an internal change by 
> introducing GenericProcessorContext and GenericProcessor peer interfaces for 
> the DSL to use.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8396) Clean up Transformer API

2019-05-22 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846206#comment-16846206
 ] 

John Roesler commented on KAFKA-8396:
-

Created: https://issues.apache.org/jira/browse/KAFKA-8410

> Clean up Transformer API
> 
>
> Key: KAFKA-8396
> URL: https://issues.apache.org/jira/browse/KAFKA-8396
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip, user-experience
>
> Currently, KStream operators transformValues and flatTransformValues disable 
> context forwarding, and force operators to just return the new values.
> The reason is that we wanted to prevent the key from changing, since the 
> whole point of a `xValues` transformation is that we _do not_ change the key, 
> and hence don't need to repartition.
> However, the chosen mechanism has some drawbacks: The Transform concept is 
> basically a way to plug in a custom Processor within the Streams DSL, but 
> these restrictions make it more like a MapValues with access to the context. 
> For example, even though you can still schedule punctuations, there's no way 
> to forward values as a result of them. So, as a user, it's hard to build a 
> mental model of how to use a TransformValues (because it's not quite a 
> Transformer and not quite a Mapper).
> Also, logically, a Transformer can call forward as much as it wants, so a 
> Transformer and a FlatTransformer are effectively the same thing. Then, we 
> also have TransformValues and FlatTransformValues that are also two more 
> versions of the same thing, just to implement the key restrictions. 
> Internally, some of these can send downstream by returning OR forwarding, and 
> others can only return. It's a lot for users to keep in mind.
> We can clean up this API significantly by just allowing all transformers to 
> call `forward`. In the `Values` case, we can wrap the ProcessorContext in one 
> that checks the key is `equal` to the one that got passed in (i.e., saves a 
> reference and enforces equality with that reference in any call to 
> `forward`). Then, we can actually deprecate the `*ValueTransformer*` 
> interfaces and remove the restriction about calling forward.
> We can consider a further cleanup (TBD) to deprecate the existing Transformer 
> interface entirely, and replace it with one with a `void` return type. Then, 
> the Transform and FlatTransform cases collapse together, and we just need 
> Transform and TransformValues.
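
A rough sketch of the key-checking wrapper idea from the description (the class 
and constructor names are assumptions, not Kafka code):

{code:java}
import org.apache.kafka.streams.processor.ProcessorContext;

// Sketch only: remember the incoming key and reject any forward() that tries
// to change it, which is the restriction the *Values variants exist to enforce.
final class KeyEnforcingContext<K> {
    private final K fixedKey;
    private final ProcessorContext delegate;

    KeyEnforcingContext(final K fixedKey, final ProcessorContext delegate) {
        this.fixedKey = fixedKey;
        this.delegate = delegate;
    }

    <V> void forward(final K key, final V value) {
        if (!fixedKey.equals(key)) {
            throw new IllegalArgumentException("a *Values transformer must not change the key");
        }
        delegate.forward(key, value);
    }
}
{code}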



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8410) Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as well

2019-05-22 Thread John Roesler (JIRA)
John Roesler created KAFKA-8410:
---

 Summary: Strengthen the types of Processors, at least in the DSL, 
maybe in the PAPI as well
 Key: KAFKA-8410
 URL: https://issues.apache.org/jira/browse/KAFKA-8410
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Presently, it's very difficult to have confidence when adding to or modifying 
processors in the DSL. There's a lot of raw types, duck-typing, and casting 
that contribute to this problem.

The root, though, is that the generic types on `Processor` refer only to 
the _input_ key and value types. No information is captured or verified about 
what the _output_ types of a processor are. For example, this leads to 
widespread confusion in the code base about whether a processor produces `V`s 
or `Change`s. The type system actually makes matters worse, since we use 
casts to make the processors conform to declared types that are in fact wrong, 
but are never checked due to erasure.

We can start to make some headway on this tech debt by adding some types to the 
ProcessorContext that bound the `<K, V>` that may be passed to 
`context.forward`. Then, we can build on this by fully specifying the input and 
output types of the Processors, which in turn would let us eliminate the 
majority of unchecked casts in the DSL operators.

I'm not sure whether adding these generic types to the existing 
ProcessorContext and Processor interfaces, which would also affect the PAPI, 
has any utility, or whether we should make this purely an internal change by 
introducing GenericProcessorContext and GenericProcessor peer interfaces for 
the DSL to use.
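
A minimal sketch of what such peer interfaces might look like (the signatures 
are assumptions extrapolated from the description above, not an actual Kafka 
API):

{code:java}
// Sketch only: output-typed peers of Processor/ProcessorContext so that
// forward() calls are checked at compile time instead of via unchecked casts.
interface GenericProcessorContext<KOut, VOut> {
    void forward(KOut key, VOut value);
}

interface GenericProcessor<KIn, VIn, KOut, VOut> {
    void init(GenericProcessorContext<KOut, VOut> context);
    void process(KIn key, VIn value);
}
{code}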



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8396) Clean up Transformer API

2019-05-22 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846202#comment-16846202
 ] 

John Roesler commented on KAFKA-8396:
-

Thanks [~guozhang]. Agreed. I've recently had several run-ins where a strongly 
typed context-forward would have saved significant time.

I was thinking about proposing that change separately, as a precondition to the 
"further cleanup" idea. I'll create a new ticket for that.

> Clean up Transformer API
> 
>
> Key: KAFKA-8396
> URL: https://issues.apache.org/jira/browse/KAFKA-8396
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip, user-experience
>
> Currently, KStream operators transformValues and flatTransformValues disable 
> context forwarding, and force operators to just return the new values.
> The reason is that we wanted to prevent the key from changing, since the 
> whole point of a `xValues` transformation is that we _do not_ change the key, 
> and hence don't need to repartition.
> However, the chosen mechanism has some drawbacks: The Transform concept is 
> basically a way to plug in a custom Processor within the Streams DSL, but 
> these restrictions make it more like a MapValues with access to the context. 
> For example, even though you can still schedule punctuations, there's no way 
> to forward values as a result of them. So, as a user, it's hard to build a 
> mental model of how to use a TransformValues (because it's not quite a 
> Transformer and not quite a Mapper).
> Also, logically, a Transformer can call forward as much as it wants, so a 
> Transformer and a FlatTransformer are effectively the same thing. Then, we 
> also have TransformValues and FlatTransformValues that are also two more 
> versions of the same thing, just to implement the key restrictions. 
> Internally, some of these can send downstream by returning OR forwarding, and 
> others can only return. It's a lot for users to keep in mind.
> We can clean up this API significantly by just allowing all transformers to 
> call `forward`. In the `Values` case, we can wrap the ProcessorContext in one 
> that checks the key is `equal` to the one that got passed in (i.e., saves a 
> reference and enforces equality with that reference in any call to 
> `forward`). Then, we can actually deprecate the `*ValueTransformer*` 
> interfaces and remove the restriction about calling forward.
> We can consider a further cleanup (TBD) to deprecate the existing Transformer 
> interface entirely, and replace it with one with a `void` return type. Then, 
> the Transform and FlatTransform cases collapse together, and we just need 
> Transform and TransformValues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8219) Add web documentation for static membership

2019-05-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846199#comment-16846199
 ] 

ASF GitHub Bot commented on KAFKA-8219:
---

abbccdda commented on pull request #6790: KAFKA-8219: add doc changes for 
static membership release
URL: https://github.com/apache/kafka/pull/6790
 
 
   This is a doc-change only PR to add upgrade notes for static membership in 
2.3
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add web documentation for static membership
> ---
>
> Key: KAFKA-8219
> URL: https://issues.apache.org/jira/browse/KAFKA-8219
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Need official documentation update.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8409) Separate controller thread/manager events from controller events

2019-05-22 Thread Jose Armando Garcia Sancio (JIRA)
Jose Armando Garcia Sancio created KAFKA-8409:
-

 Summary: Separate controller thread/manager events from controller 
events
 Key: KAFKA-8409
 URL: https://issues.apache.org/jira/browse/KAFKA-8409
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 2.4.0
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


The _ControllerEventManager_ and the _KafkaController_ are driven by processing 
_ControllerEvent_s in the queue. As of right now the _ControllerEventManager_ is 
responsible for handling _ShutdownEventThread_ while the _KafkaController_ 
handles all other events. Since we are using a _sealed trait_, the Scala 
compiler is able to check that all cases are handled in match expressions.

To bypass this compiler check, the _KafkaController_ matches on 
_ShutdownEventThread_ and ignores the result. We should instead create two 
types of events. For example:

{code}
sealed trait ControllerThreadEvent
final case object ShutdownEventThread extends ControllerThreadEvent
// Named differently from the existing ControllerEvent trait so that the
// parameter type refers to that trait rather than to this case class itself.
final case class QueuedControllerEvent(event: ControllerEvent) extends ControllerThreadEvent
{code}

And remove _ShutdownEventThread_ from the _ControllerEvent_ _sealed trait_. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8408) Create one request purgatory for all the controller requests

2019-05-22 Thread Jose Armando Garcia Sancio (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846157#comment-16846157
 ] 

Jose Armando Garcia Sancio commented on KAFKA-8408:
---

What do you think [~cmccabe] [~hachikuji]?

> Create one request purgatory for all the controller requests
> ---
>
> Key: KAFKA-8408
> URL: https://issues.apache.org/jira/browse/KAFKA-8408
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 2.4
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Critical
>
> Some RPCs (e.g., leader election) for the controller are going through the 
> replica manager. It looks like the reason for this is that the replica 
> manager currently owns all of the purgatory queues. Another issue is that we 
> have one purgatory per request type; this makes it harder (more work) to add 
> requests that go directly to the controller, bypassing ZK.
> In the case of the controller it should be enough to have one purgatory queue 
> for all requests. This should make it easier to add more requests for the 
> controller, and to migrate all of the requests that currently go through ZK 
> so that they are routed to the controller's event queue instead.
> To resolve this issue we should:
> 1. create one purgatory that can be used for all of the controller's RPCs,
> 2. move this purgatory out of the replica manager, and
> 3. refactor the current leader election purgatory to use the generic 
> purgatory instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8408) Create one request purgatory for all the controller requests

2019-05-22 Thread Jose Armando Garcia Sancio (JIRA)
Jose Armando Garcia Sancio created KAFKA-8408:
-

 Summary: Create one request purgatory for all the controller 
requests
 Key: KAFKA-8408
 URL: https://issues.apache.org/jira/browse/KAFKA-8408
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 2.4
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


Some RPCs (e.g., leader election) for the controller are going through the 
replica manager. It looks like the reason for this is that the replica manager 
currently owns all of the purgatory queues. Another issue is that we have one 
purgatory per request type; this makes it harder (more work) to add requests 
that go directly to the controller, bypassing ZK.

In the case of the controller it should be enough to have one purgatory queue 
for all requests. This should make it easier to add more requests for the 
controller, and to migrate all of the requests that currently go through ZK so 
that they are routed to the controller's event queue instead.

To resolve this issue we should:
1. create one purgatory that can be used for all of the controller's RPCs,
2. move this purgatory out of the replica manager, and
3. refactor the current leader election purgatory to use the generic purgatory 
instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8407) Connector client overrides broken on client configs with type 'Class' or 'List'

2019-05-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846152#comment-16846152
 ] 

ASF GitHub Bot commented on KAFKA-8407:
---

C0urante commented on pull request #6789: KAFKA-8407: Fix validation of class 
and list configs in connector client overrides
URL: https://github.com/apache/kafka/pull/6789
 
 
   [Jira](https://issues.apache.org/jira/browse/KAFKA-8407)
   
   Because of how config values are converted into strings in the 
`AbstractHerder.validateClientOverrides()` method after being validated by the 
client override policy, an exception is thrown if the value returned by the 
policy isn't already parsed as the type expected by the client `ConfigDef`. A 
more thorough writeup of how this happens is available in the linked Jira 
ticket.
   
   The fix here involves parsing client override properties before passing them 
to the override policy.
   
   A unit test is added to ensure that several different types of configs are 
validated properly by the herder.
   
   This bug fix should be included in the recently-cut 2.3 branch.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Connector client overrides broken on client configs with type 'Class' or 
> 'List'
> ---
>
> Key: KAFKA-8407
> URL: https://issues.apache.org/jira/browse/KAFKA-8407
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
>  Labels: connect
>
> When a connector request is submitted that overrides a client configuration 
> that is meant to contain the name of a class (such as 
> {{sasl.login.callback.handler.class}}), a 500 response is generated and the 
> following stack trace can be found in the logs for Connect:
>  
> {quote}[2019-05-22 14:51:36,123] ERROR Uncaught exception in REST call to 
> /connectors 
> (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Class
> at 
> org.apache.kafka.common.config.ConfigDef.convertToString(ConfigDef.java:774)
> at 
> org.apache.kafka.connect.runtime.AbstractHerder.convertConfigValue(AbstractHerder.java:491)
> at 
> org.apache.kafka.connect.runtime.AbstractHerder.validateClientOverrides(AbstractHerder.java:426)
> at 
> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:342)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:565)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:562)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:292)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:241)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {quote}
> This appears to be limited only to client configs that are meant to be 
> classes or lists, because {{ConfigDef.convertToString(...)}} assumes its 
> first argument is an instance of {{Class}} when its second argument is 
> {{ConfigDef.Type.CLASS}} and then casts accordingly, and acts similarly for 
> lists. If the second argument is anything else, {{toString()}} is invoked on 
> it without any casting, avoiding any problems.
>  
> The cause is that the newly-introduced 
> {{ConnectorClientConfigOverridePolicy}} interface returns a list of 
> {{ConfigValue}} instances for its validation. The {{value()}} for each of 
> these can be any type, although with the default implementations available 
> ({{All}}, {{None}}, {{Principal}}), if one is returned at all it's just the 
> same type as what was passed in for that particular config. In the case of 
> the {{AbstractHerder.validateClientOverrides(...)}} method, the raw strings 
> for the client configs are used. However, the 
> 

[jira] [Assigned] (KAFKA-8400) Do not update follower replica state if the log read failed

2019-05-22 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-8400:
--

Assignee: Jason Gustafson

> Do not update follower replica state if the log read failed
> ---
>
> Key: KAFKA-8400
> URL: https://issues.apache.org/jira/browse/KAFKA-8400
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> In {{ReplicaManager.fetchMessages}}, we have the following logic to read from 
> the log and update follower state:
> {code}
> val result = readFromLocalLog(
> replicaId = replicaId,
> fetchOnlyFromLeader = fetchOnlyFromLeader,
> fetchIsolation = fetchIsolation,
> fetchMaxBytes = fetchMaxBytes,
> hardMaxBytesLimit = hardMaxBytesLimit,
> readPartitionInfo = fetchInfos,
> quota = quota)
>   if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
>   else result
> {code}
> The call to {{readFromLocalLog}} could fail for many reasons, in which case 
> we return a LogReadResult with an error set and all fields set to -1. The 
> problem is that we do not check for the error when updating the follower 
> state. As far as I can tell, this does not cause any correctness issues, but 
> we're just asking for trouble. It would be better to check the error before 
> proceeding to `Partition.updateReplicaLogReadResult`. 
> Perhaps even better would be to have {{readFromLocalLog}} return something 
> like {{Either[LogReadResult, Errors]}} so that we are forced to handle the 
> error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8407) Connector client overrides broken on client configs with type 'Class' or 'List'

2019-05-22 Thread Chris Egerton (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-8407:
-
Description: 
When a connector request is submitted that overrides a client configuration 
that is meant to contain the name of a class (such as 
{{sasl.login.callback.handler.class}}), a 500 response is generated and the 
following stack trace can be found in the logs for Connect:

 
{quote}[2019-05-22 14:51:36,123] ERROR Uncaught exception in REST call to 
/connectors 
(org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)

java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Class

at org.apache.kafka.common.config.ConfigDef.convertToString(ConfigDef.java:774)

at 
org.apache.kafka.connect.runtime.AbstractHerder.convertConfigValue(AbstractHerder.java:491)

at 
org.apache.kafka.connect.runtime.AbstractHerder.validateClientOverrides(AbstractHerder.java:426)

at 
org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:342)

at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:565)

at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:562)

at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:292)

at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:241)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)
{quote}
This appears to be limited only to client configs that are meant to be classes 
or lists, because {{ConfigDef.convertToString(...)}} assumes its first argument 
is an instance of {{Class}} when its second argument is {{ConfigDef.Type.CLASS}} 
and then casts accordingly, and acts similarly for lists. If the second argument 
is anything else, {{toString()}} is invoked on it without any casting, avoiding 
any problems.

The cause is that the newly-introduced 
{{ConnectorClientConfigOverridePolicy}} interface returns a list of 
{{ConfigValue}} instances for its validation. The {{value()}} for each of these 
can be any type, although with the default implementations available ({{All}}, 
{{None}}, {{Principal}}), if one is returned at all it's just the same type as 
what was passed in for that particular config. In the case of the 
{{AbstractHerder.validateClientOverrides(...)}} method, the raw strings for the 
client configs are used. However, {{AbstractHerder.convertConfigValue(...)}} is 
then called for those raw strings but with the {{ConfigDef.Type}} of the config 
based on the relevant client {{ConfigDef}} (i.e., {{ProducerConfig.configDef()}}, 
{{ConsumerConfig.configDef()}}, or {{AdminClientConfig.configDef()}}). This in 
turn can and will result in {{ConfigDef.convertToString(someClassNameAsAString, 
ConfigDef.Type.CLASS)}} being invoked.

Although this isn't technically a comprehensive fix, a quick option would be to 
invoke {{ConfigDef.parse(...)}} using the relevant client {{ConfigDef}} before 
passing overrides to the policy. Technically, this would still lead to problems 
if the policy decided to return just the name of a class for a config of type 
class, so we may want to investigate other options as well.

[jira] [Updated] (KAFKA-8407) Connector client overrides broken on client configs with type 'Class'

2019-05-22 Thread Chris Egerton (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-8407:
-
Labels: connect  (was: )

> Connector client overrides broken on client configs with type 'Class'
> -
>
> Key: KAFKA-8407
> URL: https://issues.apache.org/jira/browse/KAFKA-8407
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
>  Labels: connect
>
> When a connector request is submitted that overrides a client configuration 
> that is meant to contain the name of a class (such as 
> {{sasl.login.callback.handler.class}}), a 500 response is generated and the 
> following stack trace can be found in the logs for Connect:
>  
> {quote}[2019-05-22 14:51:36,123] ERROR Uncaught exception in REST call to 
> /connectors 
> (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Class
>  at 
> org.apache.kafka.common.config.ConfigDef.convertToString(ConfigDef.java:774)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.convertConfigValue(AbstractHerder.java:491)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.validateClientOverrides(AbstractHerder.java:426)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:342)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:565)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:562)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:292)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:241)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> {quote}
> This appears to be limited only to client configs that are meant to be 
> classes, because {{ConfigDef.convertToString(...)}} assumes its 
> first argument is an instance of {{Class}} when its second argument is 
> {{ConfigDef.Type.CLASS}} and then casts accordingly. If the second argument 
> is anything else (besides {{ConfigDef.Type.LIST}}, which is handled 
> separately by the {{AbstractHerder}} during client override validation), then 
> {{toString()}} is invoked on it without any casting, avoiding any problems.
>  
> The cause is that the newly-introduced 
> {{ConnectorClientConfigOverridePolicy}} interface returns a list of 
> {{ConfigValue}} instances for its validation. The {{value()}} for each of 
> these can be any type, although with the default implementations available 
> ({{All}}, {{None}}, {{Principal}}), if one is returned at all it's just the 
> same type as what was passed in for that particular config. In the case of 
> the {{AbstractHerder.validateClientOverrides(...)}} method, the raw strings 
> for the client configs are used. However, 
> {{AbstractHerder.convertConfigValue(...)}} is then called for those raw 
> strings but with the {{ConfigDef.Type}} of the config based on the relevant 
> client {{ConfigDef}} (i.e., {{ProducerConfig.configDef()}}, 
> {{ConsumerConfig.configDef()}}, or {{AdminClientConfig.configDef()}}). This 
> in turn can and will result in 
> {{ConfigDef.convertToString(someClassNameAsAString, ConfigDef.Type.CLASS)}} 
> being invoked.
>  
> Although this isn't technically a comprehensive fix, a quick option would be 
> to invoke {{ConfigDef.parse(...)}} using the relevant client {{ConfigDef}} 
> before passing overrides to the policy. Technically, this would still lead to 
> problems if the policy decided to return just the name of a class for a 
> config of type class, so we may want to investigate other options as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8407) Connector client overrides broken on client configs with type 'Class'

2019-05-22 Thread Chris Egerton (JIRA)
Chris Egerton created KAFKA-8407:


 Summary: Connector client overrides broken on client configs with 
type 'Class'
 Key: KAFKA-8407
 URL: https://issues.apache.org/jira/browse/KAFKA-8407
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.3.0
Reporter: Chris Egerton
Assignee: Chris Egerton


When a connector request is submitted that overrides a client configuration 
that is meant to contain the name of a class (such as 
{{sasl.login.callback.handler.class}}), a 500 response is generated and the 
following stack trace can be found in the logs for Connect:

 
{quote}[2019-05-22 14:51:36,123] ERROR Uncaught exception in REST call to 
/connectors 
(org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)

java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Class

 at org.apache.kafka.common.config.ConfigDef.convertToString(ConfigDef.java:774)

 at 
org.apache.kafka.connect.runtime.AbstractHerder.convertConfigValue(AbstractHerder.java:491)

 at 
org.apache.kafka.connect.runtime.AbstractHerder.validateClientOverrides(AbstractHerder.java:426)

 at 
org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:342)

 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:565)

 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:562)

 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:292)

 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:241)

 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

 at java.util.concurrent.FutureTask.run(FutureTask.java:266)

 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

 at java.lang.Thread.run(Thread.java:748)
{quote}
This appears to be limited only to client configs that are meant to be classes, 
because {{ConfigDef.convertToString(...)}} assumes its first 
argument is an instance of {{Class}} when its second argument is 
{{ConfigDef.Type.CLASS}} and then casts accordingly. If the second argument is 
anything else (besides {{ConfigDef.Type.LIST}}, which is handled separately by 
the {{AbstractHerder}} during client override validation), then {{toString()}} 
is invoked on it without any casting, avoiding any problems.

The cause is that the newly-introduced 
{{ConnectorClientConfigOverridePolicy}} interface returns a list of 
{{ConfigValue}} instances for its validation. The {{value()}} for each of these 
can be any type, although with the default implementations available ({{All}}, 
{{None}}, {{Principal}}), if one is returned at all it's just the same type as 
what was passed in for that particular config. In the case of the 
{{AbstractHerder.validateClientOverrides(...)}} method, the raw strings for the 
client configs are used. However, {{AbstractHerder.convertConfigValue(...)}} is 
then called for those raw strings but with the {{ConfigDef.Type}} of the config 
based on the relevant client {{ConfigDef}} (i.e., {{ProducerConfig.configDef()}}, 
{{ConsumerConfig.configDef()}}, or {{AdminClientConfig.configDef()}}). This in 
turn can and will result in {{ConfigDef.convertToString(someClassNameAsAString, 
ConfigDef.Type.CLASS)}} being invoked.

Although this isn't technically a comprehensive fix, a quick option would be to 
invoke {{ConfigDef.parse(...)}} using the relevant client {{ConfigDef}} before 
passing overrides to the policy. Technically, this would still lead to problems 
if the policy decided to return just the name of a class for a config of type 
class, so we may want to investigate other options as well.
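
A minimal sketch of that quick option (the wrapper class and the 
{{rawOverrides}} parameter are assumptions for illustration; 
{{ProducerConfig.configDef()}} and {{ConfigDef.parse(...)}} are the APIs named 
above):

{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;

final class OverrideParsingSketch {
    // Sketch only: parse the raw string client overrides against the client
    // ConfigDef first, so the policy (and later ConfigDef.convertToString(...))
    // sees properly typed values (Class, List, ...) instead of raw strings.
    static Map<String, Object> parseProducerOverrides(final Map<String, String> rawOverrides) {
        return ProducerConfig.configDef().parse(rawOverrides);
    }
}
{code}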



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8399) Add back `internal.leave.group.on.close` config for KStreams

2019-05-22 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-8399.

Resolution: Fixed

> Add back `internal.leave.group.on.close` config for KStreams
> 
>
> Key: KAFKA-8399
> URL: https://issues.apache.org/jira/browse/KAFKA-8399
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> The default rebalance behavior for KStreams has changed from not sending a 
> leave-group request to sending one. We should add the config back so that 
> system tests pass, and to reduce the risk of this being found broken in other 
> public use cases.
> Reference: [https://github.com/apache/kafka/pull/6673]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8399) Add back `internal.leave.group.on.close` config for KStreams

2019-05-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845963#comment-16845963
 ] 

ASF GitHub Bot commented on KAFKA-8399:
---

bbejeck commented on pull request #6779: KAFKA-8399: bring back 
internal.leave.group.on.close config for KStream
URL: https://github.com/apache/kafka/pull/6779
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add back `internal.leave.group.on.close` config for KStreams
> 
>
> Key: KAFKA-8399
> URL: https://issues.apache.org/jira/browse/KAFKA-8399
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> The default rebalance behavior for KStreams has changed from not sending a 
> leave-group request to sending one. We should add the config back so that 
> system tests pass, and to reduce the risk of this being found broken in other 
> public use cases.
> Reference: [https://github.com/apache/kafka/pull/6673]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)

2019-05-22 Thread Omkar Mestry (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845839#comment-16845839
 ] 

Omkar Mestry commented on KAFKA-7245:
-

[~mjsax], can you please elaborate on the issue, since a KIP needs to be 
written for it? Is this change only required because the core part already uses 
the new put method (with three parameters) and we need to update the tests to 
use the put method (with three parameters)?

> Deprecate WindowStore#put(key, value)
> -
>
> Key: KAFKA-7245
> URL: https://issues.apache.org/jira/browse/KAFKA-7245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Omkar Mestry
>Priority: Minor
>  Labels: needs-kip, newbie
>
> We want to remove `WindowStore#put(key, value)` – for this, we first need to 
> deprecate it via a KIP and remove it later.
> Instead of using `WindowStore#put(key, value)` we need to migrate code to 
> specify the timestamp explicitly using `WindowStore#put(key, value, 
> timestamp)`. The current code base already uses the explicit call to set the 
> timestamp in production code. The simplified `put(key, value)` is only used 
> in tests, and thus, we would need to update those tests.
> [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2019-05-22 Thread Sebastiaan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845824#comment-16845824
 ] 

Sebastiaan commented on KAFKA-7678:
---

Hi guys! I think this fix is not complete yet. In version 2.1.1 we are getting 
a very similar exception, but in the 'flush' method that is called pre-close. 
This is the full stacktrace:


{code:java}
message: stream-thread 
[webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed 
while closing StreamTask 1_26 due to the following error:
logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks

java.lang.NullPointerException: null
    at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
    at 
org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758)
{code}


Followed by:

 
{code:java}
message: task [1_26] Could not close task due to the following error:
logger_name: org.apache.kafka.streams.processor.internals.StreamTask

java.lang.NullPointerException: null
    at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
    at 
org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}

If I look at the source code at this point, I see a nice null check in the 
close method, but not in the flush method that is called just before that:
{code:java}
public void flush() {
this.log.debug("Flushing producer");
this.producer.flush();
this.checkForException();
}

public void close() {
this.log.debug("Closing producer");
if (this.producer != null) {
this.producer.close();
this.producer = null;
}

this.checkForException();
}{code}

Seems to my (ignorant) eye that the flush method should also be wrapped in a 
null check.
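
A sketch of the suggested fix, mirroring the null check already present in 
close():

{code:java}
public void flush() {
    this.log.debug("Flushing producer");
    // Guard against the producer having already been closed and nulled out,
    // matching the existing check in close().
    if (this.producer != null) {
        this.producer.flush();
    }
    this.checkForException();
}
{code}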

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1, 2.0.1, 2.1.0
>Reporter: Jonathan Santilli
>Assignee: Jonathan Santilli
>Priority: Minor
> Fix For: 1.1.2, 2.2.0, 2.1.1, 2.0.2
>
>
> This occurs when the group is rebalancing in a Kafka Stream application and 
> the process (the Kafka Stream application) receives a *SIGTERM* to stop it 
> gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
> Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at 
> 

[jira] [Comment Edited] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2019-05-22 Thread Sebastiaan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845824#comment-16845824
 ] 

Sebastiaan edited comment on KAFKA-7678 at 5/22/19 12:16 PM:
-

Hi guys! I think this fix is not complete yet. In version 2.1.1 we are getting 
a very similar exception, but in the 'flush' method that is called pre-close. 
This is the full stacktrace:
{code:java}
message: stream-thread 
[webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed 
while closing StreamTask 1_26 due to the following error:
logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks

java.lang.NullPointerException: null
    at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
    at 
org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758)
{code}
Followed by:

 
{code:java}
message: task [1_26] Could not close task due to the following error:
logger_name: org.apache.kafka.streams.processor.internals.StreamTask

java.lang.NullPointerException: null
    at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
    at 
org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
If I look at the source code at this point, I see a nice null check in the 
close method, but not in the flush method that is called just before that:
{code:java}
public void flush() {
this.log.debug("Flushing producer");
this.producer.flush();
this.checkForException();
}

public void close() {
this.log.debug("Closing producer");
if (this.producer != null) {
this.producer.close();
this.producer = null;
}

this.checkForException();
}{code}

Seems to my (ignorant) eye that the flush method should also be wrapped in a 
null check.

Should this issue be re-opened? Or should I make a new issue and refer to this 
one?



[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)

2019-05-22 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845782#comment-16845782
 ] 

Matthias J. Sax commented on KAFKA-7245:


Well. First we need a KIP to deprecate the method. Not sure if there are any 
docs that need to be updated.

Rewriting tests (and actually code – in case there is any oversight) would be 
part of this work, too, after the KIP is accepted.

> Deprecate WindowStore#put(key, value)
> -
>
> Key: KAFKA-7245
> URL: https://issues.apache.org/jira/browse/KAFKA-7245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Omkar Mestry
>Priority: Minor
>  Labels: needs-kip, newbie
>
> We want to remove `WindowStore#put(key, value)` – for this, we first need to 
> deprecate it via a KIP and remove it later.
> Instead of using `WindowStore#put(key, value)` we need to migrate code to 
> specify the timestamp explicitly using `WindowStore#put(key, value, 
> timestamp)`. The current code base already uses the explicit call to set the 
> timestamp in production code. The simplified `put(key, value)` is only used 
> in tests, and thus, we would need to update those tests.
> [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8406) kafka-topics throws wrong error on invalid configuration with bootstrap-server and alter config

2019-05-22 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-8406:
--

 Summary: kafka-topics throws wrong error on invalid configuration 
with bootstrap-server and alter config
 Key: KAFKA-8406
 URL: https://issues.apache.org/jira/browse/KAFKA-8406
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski


Running
{code:java}
./kafka-topics --bootstrap-server  --alter --config 
retention.ms=360 --topic topic{code}
Results in
{code:java}
Missing required argument "[partitions]"{code}
Running
{code:java}
./kafka-topics --bootstrap-server  --alter --config 
retention.ms=360 --topic topic --partitions 25{code}
Results in
{code:java}
Option combination "[bootstrap-server],[config]" can't be used with option 
"[alter]"{code}

For better clarity, we should just throw the latter error outright.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)