[jira] [Commented] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0

2019-05-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16850501#comment-16850501
 ] 

ASF GitHub Bot commented on KAFKA-7652:
---

guozhangwang commented on pull request #6349: KAFKA-7652: [WIP] Peel off the 
segmenting layer on session store caching
URL: https://github.com/apache/kafka/pull/6349
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0
> -
>
> Key: KAFKA-7652
> URL: https://issues.apache.org/jira/browse/KAFKA-7652
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 0.11.0.3, 1.1.1, 2.0.0, 
> 2.0.1
>Reporter: Jonathan Gordon
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: kip
> Fix For: 2.3.0
>
> Attachments: 0.10.2.1-NamedCache.txt, 2.2.0-rc0_b-NamedCache.txt, 
> 2.3.0-7652-NamedCache.txt, kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt
>
>
> I'm creating this issue in response to [~guozhang]'s request on the mailing 
> list:
> [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E]
> We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but 
> are experiencing a severe performance degradation. The highest amount of CPU 
> time seems to be spent retrieving from the local cache. Here's an example thread 
> profile with 0.11.0.0:
> [https://i.imgur.com/l5VEsC2.png]
> When things are running smoothly we're gated by retrieving from the state 
> store with acceptable performance. Here's an example thread profile with 
> 0.10.2.1:
> [https://i.imgur.com/IHxC2cZ.png]
> Some investigation reveals that it appears we're performing about 3 orders of 
> magnitude more lookups on the NamedCache over a comparable time period. I've 
> attached the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.
> We're using session windows and have the app configured for 
> commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760
> I'm happy to share more details if they would be helpful. Also happy to run 
> tests on our data.
> I also found this issue, which seems like it may be related:
> https://issues.apache.org/jira/browse/KAFKA-4904
>  
> KIP-420: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-420%3A+Add+Single+Value+Fetch+in+Session+Stores]
>  
>  
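For reference, a minimal runnable sketch of a session-windowed Streams app with the two configs mentioned in the report; the application id, bootstrap servers, topic, serdes and session gap are illustrative assumptions, not details from the reporter's application:
{code}
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.SessionWindows;

public class SessionStoreExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "session-store-example");      // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // illustrative
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);                // from the report
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760);          // from the report

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("events")                                      // illustrative topic
               .groupByKey()
               .windowedBy(SessionWindows.with(Duration.ofMinutes(5)))                // illustrative gap
               .count();

        new KafkaStreams(builder.build(), props).start();
    }
}
{code}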



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8428) Cleanup LogValidator#validateMessagesAndAssignOffsetsCompressed to assume single record batch only

2019-05-28 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8428.
--
   Resolution: Fixed
Fix Version/s: 2.3.0

> Cleanup LogValidator#validateMessagesAndAssignOffsetsCompressed to assume 
> single record batch only
> --
>
> Key: KAFKA-8428
> URL: https://issues.apache.org/jira/browse/KAFKA-8428
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.3.0
>
>
> Today, the client -> server record batching protocol works like this:
> 1. With magic v2, we always require a single batch within a compressed set, and 
> inside LogValidator#validateMessagesAndAssignOffsetsCompressed we already 
> assume so.
> 2. With magic v1, our code actually also assumes one record batch, since 
> whenever inPlaceAssignment is true we assume one batch only; however with 
> magic v1 it is still possible that inPlaceAssignment == true.
> 3. With magic v0, our code does handle the case of multiple record batches, 
> since with v0 inPlaceAssignment is always false.
> This makes the logic of 
> LogValidator#validateMessagesAndAssignOffsetsCompressed quite twisted and 
> complicated.
> Since all standard client implementations we've known so far wrap a single 
> batch when compressing (of course, we cannot guarantee this is the case for 
> all clients in the wild, but the chance of multiple batches with compressed 
> records should really be rare), I think it's better just to make this a 
> universal requirement for all versions.
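For illustration, a hedged sketch of the invariant this cleanup would rely on; this is not the actual LogValidator code, just a standalone helper showing the "exactly one batch in a compressed record set" check:
{code}
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;

// Illustrative sketch only: under the proposed universal requirement, a compressed
// MemoryRecords payload is expected to hold exactly one record batch.
final class SingleBatchCheck {
    static RecordBatch requireSingleBatch(MemoryRecords records) {
        RecordBatch only = null;
        for (RecordBatch batch : records.batches()) {
            if (only != null)
                throw new IllegalArgumentException(
                        "Compressed record set must contain exactly one record batch");
            only = batch;
        }
        if (only == null)
            throw new IllegalArgumentException("Record set contains no batches");
        return only;
    }
}
{code}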



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8428) Cleanup LogValidator#validateMessagesAndAssignOffsetsCompressed to assume single record batch only

2019-05-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16850495#comment-16850495
 ] 

ASF GitHub Bot commented on KAFKA-8428:
---

guozhangwang commented on pull request #6816: KAFKA-8428: Always require a 
single batch with compressed messages
URL: https://github.com/apache/kafka/pull/6816
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cleanup LogValidator#validateMessagesAndAssignOffsetsCompressed to assume 
> single record batch only
> --
>
> Key: KAFKA-8428
> URL: https://issues.apache.org/jira/browse/KAFKA-8428
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> Today, the client -> server record batching protocol works like this:
> 1. With magic v2, we always require a single batch within a compressed set, and 
> inside LogValidator#validateMessagesAndAssignOffsetsCompressed we already 
> assume so.
> 2. With magic v1, our code actually also assumes one record batch, since 
> whenever inPlaceAssignment is true we assume one batch only; however with 
> magic v1 it is still possible that inPlaceAssignment == true.
> 3. With magic v0, our code does handle the case of multiple record batches, 
> since with v0 inPlaceAssignment is always false.
> This makes the logic of 
> LogValidator#validateMessagesAndAssignOffsetsCompressed quite twisted and 
> complicated.
> Since all standard client implementations we've known so far wrap a single 
> batch when compressing (of course, we cannot guarantee this is the case for 
> all clients in the wild, but the chance of multiple batches with compressed 
> records should really be rare), I think it's better just to make this a 
> universal requirement for all versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8442) Inconsistent ISR output in topic command when using --bootstrap-server

2019-05-28 Thread huxihx (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-8442:
-

Assignee: huxihx

> Inconsistent ISR output in topic command when using --bootstrap-server
> --
>
> Key: KAFKA-8442
> URL: https://issues.apache.org/jira/browse/KAFKA-8442
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: huxihx
>Priority: Major
>
> If there is no leader for a partition, the Metadata API returns an empty ISR. 
> When using the `--bootstrap-server` option with `kafka-topics.sh`, this leads 
> to the following output:
> {code}
> Topic:foo   PartitionCount:1ReplicationFactor:2 
> Configs:segment.bytes=1073741824
> Topic: foo  Partition: 0Leader: noneReplicas: 1,3   Isr: 
> {code}
> When using `--zookeeper`, we display the current ISR correctly:
> {code}
> Topic:foo   PartitionCount:1ReplicationFactor:2 Configs:
> Topic: foo  Partition: 0Leader: -1  Replicas: 1,3   Isr: 1
> {code}
> To avoid confusion, we should make this output consistent or at least not 
> misleading. We should either change the Metadata API to print the ISR when we 
> have it or we can change the output of the topic command to `N/A` or 
> something like that.
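For illustration, a hedged sketch (not the kafka-topics.sh implementation) of how a client using the Admin API could render an empty ISR as `N/A` rather than a blank field; the broker address and topic name are assumptions:
{code}
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

public class DescribeTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc =
                    admin.describeTopics(Collections.singleton("foo")).all().get().get("foo");
            for (TopicPartitionInfo p : desc.partitions()) {
                // Render an empty ISR explicitly instead of leaving the field blank.
                String isr = p.isr().isEmpty() ? "N/A"
                        : p.isr().stream().map(n -> String.valueOf(n.id()))
                                 .collect(Collectors.joining(","));
                String leader = p.leader() == null ? "none" : String.valueOf(p.leader().id());
                System.out.printf("Partition: %d Leader: %s Isr: %s%n", p.partition(), leader, isr);
            }
        }
    }
}
{code}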



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6029) Controller should wait for the leader migration to finish before ack a ControlledShutdownRequest

2019-05-28 Thread Jiangjie Qin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-6029:

Fix Version/s: 2.2.0

> Controller should wait for the leader migration to finish before ack a 
> ControlledShutdownRequest
> 
>
> Key: KAFKA-6029
> URL: https://issues.apache.org/jira/browse/KAFKA-6029
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller, core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
> Fix For: 2.2.0
>
>
> In the controlled shutdown process, the controller will return the 
> ControlledShutdownResponse immediately after the state machine is updated. 
> Because the LeaderAndIsrRequests and UpdateMetadataRequests may not have been 
> successfully processed by the brokers, the leader migration and active ISR 
> shrink may not have completed when the shutting-down broker proceeds to shut 
> down. This will cause some of the leaders to take up to replica.lag.time.max.ms 
> to kick the broker out of the ISR. Meanwhile, the produce purgatory size will grow.
> Ideally, the controller should wait until all the LeaderAndIsrRequests and 
> UpdateMetadataRequests have been acked before sending back the 
> ControlledShutdownResponse.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6029) Controller should wait for the leader migration to finish before ack a ControlledShutdownRequest

2019-05-28 Thread Jiangjie Qin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved KAFKA-6029.
-
Resolution: Fixed

> Controller should wait for the leader migration to finish before ack a 
> ControlledShutdownRequest
> 
>
> Key: KAFKA-6029
> URL: https://issues.apache.org/jira/browse/KAFKA-6029
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller, core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
>
> In the controlled shutdown process, the controller will return the 
> ControlledShutdownResponse immediately after the state machine is updated. 
> Because the LeaderAndIsrRequests and UpdateMetadataRequests may not have been 
> successfully processed by the brokers, the leader migration and active ISR 
> shrink may not have completed when the shutting-down broker proceeds to shut 
> down. This will cause some of the leaders to take up to replica.lag.time.max.ms 
> to kick the broker out of the ISR. Meanwhile, the produce purgatory size will grow.
> Ideally, the controller should wait until all the LeaderAndIsrRequests and 
> UpdateMetadataRequests have been acked before sending back the 
> ControlledShutdownResponse.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8430) Unit test to make sure `group.id` and `group.instance.id` won't affect each other

2019-05-28 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-8430.

Resolution: Fixed

> Unit test to make sure `group.id` and `group.instance.id` won't affect each 
> other
> -
>
> Key: KAFKA-8430
> URL: https://issues.apache.org/jira/browse/KAFKA-8430
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2019-05-28 Thread Del Bao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16850241#comment-16850241
 ] 

Del Bao commented on KAFKA-4212:


vote for this. Windowed store makes the logic complex and confusing. At least a 
TTL based wrapping would be great. 

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api
>
> Some jobs need to maintain a large set of key-values as state for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> However, this store can be repurposed as a cache by fetching the items in 
> reverse chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.
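For illustration, a hedged sketch of the "repurpose a window store as a TTL cache" idea described above; it assumes a {{WindowStore}} obtained from the processor context and, since {{fetch()}} returns entries in ascending timestamp order, it keeps the last entry instead of reading in reverse:
{code}
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

// Illustrative sketch only: emulate a TTL cache lookup by scanning the last ttlMs of
// windows for the key and returning the newest value found (or null if expired/absent).
final class TtlLookup {
    static <K, V> V ttlGet(WindowStore<K, V> store, K key, long ttlMs, long nowMs) {
        V latest = null;
        try (WindowStoreIterator<V> iter = store.fetch(key, nowMs - ttlMs, nowMs)) {
            while (iter.hasNext()) {
                latest = iter.next().value;  // results arrive in ascending timestamp order
            }
        }
        return latest;
    }
}
{code}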



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8438) Add API to allow user to define end behavior of consumer failure

2019-05-28 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-8438:
--
Description: 
Recently, in a concerted effort to make Kafka's rebalances less painful, 
various approaches have been used to reduce the number and impact of 
rebalances. Often, the trigger of a rebalance is a failure of some sort or a 
thrown exception during processing, in which case the workload will be 
redistributed among surviving threads. Working to reduce rebalances due to 
random consumer crashes, a recent change to Kafka internals has been made 
(which introduces the concept of static membership) that prevents a rebalance 
from occurring within {{session.timeout.ms}}, in the hope that the consumer 
thread which crashed would recover in that time interval and rejoin the group.

However, in some cases, some consumer threads would permanently go down or 
remain dead for long periods of time. In these scenarios, users of Kafka would 
possibly not be aware of such a crash until hours after it happened, which 
forces Kafka users to manually start a new KafkaConsumer process a considerable 
period of time after the failure occurred. That is where the addition of a 
callback such as {{onConsumerFailure}} would help. There are multiple use cases 
for this callback (which is defined by the user). {{onConsumerFailure}} is 
called when a particular consumer thread goes under for some specified time 
interval (i.e. a config called {{acceptable.consumer.failure.timeout.ms}}). 
When called, this method could be used to log a consumer failure or, should the 
user wish it, create a new thread which would then rejoin the consumer group 
(which could also include the required {{group.instance.id}} so that a 
rebalance wouldn't be re-triggered; we would need to think about that).

Should the old thread recover and attempt to rejoin the consumer group (with 
the substitute thread being part of the group), the old thread would be denied 
access and an exception would be thrown (to indicate that another process has 
already taken its place).

 

 

 

  was:
Recently, in a concerted effort to make Kafka's rebalances less painful, 
various approaches has been used to reduce the number of and impact of 
rebalances. Often, the trigger of a rebalance is a failure of some sort, in 
which case, the workload will be redistributed among surviving threads. Working 
to reduce rebalances due to random consumer crashes, a recent change to Kafka 
internals had been made (which introduces the concept of static membership) 
that prevents a rebalance from occurring within {{session.timeout.ms}} in the 
hope that the consumer thread which crashed would recover in that time interval 
and rejoin the group.

However, in some cases, some consumer threads would permanently go down or 
remain dead for long periods of time. In these scenarios, users of Kafka would 
possibly not be aware of such a crash until hours later after it happened which 
forces Kafka users to manually start a new KafkaConsumer process a considerable 
period of time after the failure had occurred. That is where the addition of a 
callback such as {{onConsumerFailure}} would help. There are multiple use cases 
for this callback (which is defined by the user). {{onConsumerFailure}} is 
called when a particular consumer thread goes under for some specified time 
interval (i.e. a config called {{acceptable.consumer.failure.timeout.ms}}). 
When called, this method could be used to log a consumer failure or should the 
user wish it, create a new thread which would then rejoin the consumer group 
(which could also include the required {{group.instance.id}} so that a 
rebalance wouldn't be re-triggered –- we would need to think about that). 

Should the old thread recover and attempt to rejoin the consumer group (with 
the substitute thread being part of the group), the old thread will be denied 
access and an exception would be thrown (to indicate that another process has 
already taken its place).

 

 

 


> Add API to allow user to define end behavior of consumer failure
> 
>
> Key: KAFKA-8438
> URL: https://issues.apache.org/jira/browse/KAFKA-8438
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Richard Yu
>Priority: Major
>  Labels: needs-dicussion, needs-kip
>
> Recently, in a concerted effort to make Kafka's rebalances less painful, 
> various approaches has been used to reduce the number of and impact of 
> rebalances. Often, the trigger of a rebalance is a failure of some sort or a 
> thrown exception during processing, in which case, the workload will be 
> redistributed among surviving threads. Working to reduce rebalances due to 
> random consumer crashes, a recent change to 

[jira] [Commented] (KAFKA-8438) Add API to allow user to define end behavior of consumer failure

2019-05-28 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16850239#comment-16850239
 ] 

Richard Yu commented on KAFKA-8438:
---

Hi [~bchen225242], thanks for sharing your thoughts. :)  I didn't realize that 
processing exceptions are probably the more likely cause of consumer crashes 
(I will change the issue description).

I have done some thinking about this, and I think it could be done in a 
straightforward manner. The heartbeat thread does exist for a reason, and 
periodically, by design, the Kafka server/broker relies on heartbeat requests to 
check that the consumer is alive (otherwise, how would the broker know when to 
kick a consumer out of the group and cause a rebalance?). We shouldn't need an 
extra daemon thread. Instead, I think it would be possible for the Kafka broker 
to trigger this callback if it has detected that a consumer has fallen out of 
the group.

Also, you mentioned something about the external monitoring service. This issue 
could save the user the programming time of implementing such a service, 
because Kafka is in charge of detecting the failure instead of the client. (I 
mean, it makes sense: we have more metrics and resources available for 
reference than the user, since they can't query anything in the internals.) In 
summary, the user does not have to do the monitoring part; we do. I think that 
would significantly simplify the program they need to write to account for 
consumer failure.

> Add API to allow user to define end behavior of consumer failure
> 
>
> Key: KAFKA-8438
> URL: https://issues.apache.org/jira/browse/KAFKA-8438
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Richard Yu
>Priority: Major
>  Labels: needs-dicussion, needs-kip
>
> Recently, in a concerted effort to make Kafka's rebalances less painful, 
> various approaches has been used to reduce the number of and impact of 
> rebalances. Often, the trigger of a rebalance is a failure of some sort, in 
> which case, the workload will be redistributed among surviving threads. 
> Working to reduce rebalances due to random consumer crashes, a recent change 
> to Kafka internals had been made (which introduces the concept of static 
> membership) that prevents a rebalance from occurring within 
> {{session.timeout.ms}} in the hope that the consumer thread which crashed 
> would recover in that time interval and rejoin the group.
> However, in some cases, some consumer threads would permanently go down or 
> remain dead for long periods of time. In these scenarios, users of Kafka 
> would possibly not be aware of such a crash until hours later after it 
> happened which forces Kafka users to manually start a new KafkaConsumer 
> process a considerable period of time after the failure had occurred. That is 
> where the addition of a callback such as {{onConsumerFailure}} would help. 
> There are multiple use cases for this callback (which is defined by the 
> user). {{onConsumerFailure}} is called when a particular consumer thread goes 
> under for some specified time interval (i.e. a config called 
> {{acceptable.consumer.failure.timeout.ms}}). When called, this method could 
> be used to log a consumer failure or should the user wish it, create a new 
> thread which would then rejoin the consumer group (which could also include 
> the required {{group.instance.id}} so that a rebalance wouldn't be 
> re-triggered –- we would need to think about that). 
> Should the old thread recover and attempt to rejoin the consumer group (with 
> the substitute thread being part of the group), the old thread will be denied 
> access and an exception would be thrown (to indicate that another process has 
> already taken its place).
>  
>  
>  
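For illustration, a purely hypothetical sketch of what the proposed callback surface might look like; no such interface exists in the consumer today, and every name except the config mentioned in the issue is made up:
{code}
// Hypothetical only: an API along the lines proposed in this issue.
public interface ConsumerFailureListener {
    /**
     * Invoked once a member has been out of the group for longer than the
     * proposed acceptable.consumer.failure.timeout.ms interval.
     */
    void onConsumerFailure(String groupId, String groupInstanceId, long downSinceMs);
}
{code}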



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8430) Unit test to make sure `group.id` and `group.instance.id` won't affect each other

2019-05-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16850238#comment-16850238
 ] 

ASF GitHub Bot commented on KAFKA-8430:
---

guozhangwang commented on pull request #6830: KAFKA-8430: unit test to make 
sure null `group.id` and valid `group.instance.id` are valid combo
URL: https://github.com/apache/kafka/pull/6830
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Unit test to make sure `group.id` and `group.instance.id` won't affect each 
> other
> -
>
> Key: KAFKA-8430
> URL: https://issues.apache.org/jira/browse/KAFKA-8430
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8410) Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as well

2019-05-28 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16850204#comment-16850204
 ] 

John Roesler edited comment on KAFKA-8410 at 5/28/19 10:35 PM:
---

Thanks for the consideration, [~guozhang]. I've created a POC so you can see 
more details about what this thing would actually look like.

One selling point is that by adding these restrictions, I surfaced one bug in 
which a ValueGetter tries to look up an incoming-type key, not the 
outgoing-type key. If this method were ever actually used, it would result in a 
class-cast exception.

I also surfaced multiple instances in which the generic type parameters in our 
DSL builder code are simply incorrect. Common mistakes include:
1. confusing the incoming and outgoing types
2. confusing V and Change<V>
3. confusing K and Windowed<K>

These aren't properly "bugs" because these generic type parameters are erased 
before run-time, and we never attempt to use incorrectly typed values, so we 
wouldn't surface any exceptions. But it's a significant impediment to reading 
and maintaining the code if we have incorrect type information on our 
variables. Similar to having an incorrect variable name (like naming a variable 
"average" when it's actually holding the count of something).

Regarding your concern about heterogeneous-output PAPI code: it should be 
pretty straightforward to declare. Currently, the declaration would be 
`implements Processor<K, V>`, which would become `implements Processor<K, V, 
Object, Object>`. This allows the same range of output values that we currently 
allow.


was (Author: vvcephei):
Thanks for the consideration, [~guozhang]. I've created a POC so you can see 
more details about what this thing would actually look like.

One selling point is that by adding these restrictions, I surfaced one bug in 
which a ValueGetter tries to look up an incoming-type key, not the 
outgoing-type key. If this method were ever actually used, it would result in a 
class-cast exception.

I also surfaced multiple instances in which the generic type parameters in our 
DSL builder code is simply incorrect. Common mistakes include:
1. confusing the incoming and outgoing types
2. confusing V and Change
3. confusing K and Windowed

These aren't properly "bugs" because these generic type parameters are erased 
before run-time, and we never attempt to use incorrectly typed values, so we 
wouldn't surface any exceptions. But it's a significant impediment to reading 
and maintaining the code if we have incorrect type information on our 
variables. Similar to having an incorrect variable name (like naming a variable 
"average" when it's actually holding the count of something).

> Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as 
> well
> --
>
> Key: KAFKA-8410
> URL: https://issues.apache.org/jira/browse/KAFKA-8410
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: tech-debt
>
> Presently, it's very difficult to have confidence when adding to or modifying 
> processors in the DSL. There's a lot of raw types, duck-typing, and casting 
> that contribute to this problem.
> The root, though, is that the generic types on `Processor` refer only to 
> the _input_ key and value types. No information is captured or verified about 
> what the _output_ types of a processor are. For example, this leads to 
> widespread confusion in the code base about whether a processor produces `V`s 
> or `Change<V>`s. The type system actually makes matters worse, since we use 
> casts to make the processors conform to declared types that are in fact 
> wrong, but are never checked due to erasure.
> We can start to make some headway on this tech debt by adding some types to 
> the ProcessorContext that bound the key and value types that may be passed to 
> `context.forward`. Then, we can build on this by fully specifying the input 
> and output types of the Processors, which in turn would let us eliminate the 
> majority of unchecked casts in the DSL operators.
> I'm not sure whether adding these generic types to the existing 
> ProcessorContext and Processor interfaces, which would also affect the PAPI 
> has any utility, or whether we should make this purely an internal change by 
> introducing GenericProcessorContext and GenericProcessor peer interfaces for 
> the DSL to use.
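For illustration, a purely hypothetical sketch of the kind of typing argued for above; the actual shape is whatever the POC (https://github.com/apache/kafka/pull/6833) settles on, and the names here (TypedProcessor, TypedProcessorContext) are made up:
{code}
// Hypothetical only: a processor declares both its input and output types, and the
// context will only forward values of the declared output types.
interface TypedProcessorContext<KOut, VOut> {
    void forward(KOut key, VOut value);
}

interface TypedProcessor<KIn, VIn, KOut, VOut> {
    void init(TypedProcessorContext<KOut, VOut> context);
    void process(KIn key, VIn value);
    void close();
}
{code}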



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8410) Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as well

2019-05-28 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16850204#comment-16850204
 ] 

John Roesler commented on KAFKA-8410:
-

Thanks for the consideration, [~guozhang]. I've created a POC so you can see 
more details about what this thing would actually look like.

One selling point is that by adding these restrictions, I surfaced one bug in 
which a ValueGetter tries to look up an incoming-type key, not the 
outgoing-type key. If this method were ever actually used, it would result in a 
class-cast exception.

I also surfaced multiple instances in which the generic type parameters in our 
DSL builder code are simply incorrect. Common mistakes include:
1. confusing the incoming and outgoing types
2. confusing V and Change<V>
3. confusing K and Windowed<K>

These aren't properly "bugs" because these generic type parameters are erased 
before run-time, and we never attempt to use incorrectly typed values, so we 
wouldn't surface any exceptions. But it's a significant impediment to reading 
and maintaining the code if we have incorrect type information on our 
variables. Similar to having an incorrect variable name (like naming a variable 
"average" when it's actually holding the count of something).

> Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as 
> well
> --
>
> Key: KAFKA-8410
> URL: https://issues.apache.org/jira/browse/KAFKA-8410
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: tech-debt
>
> Presently, it's very difficult to have confidence when adding to or modifying 
> processors in the DSL. There's a lot of raw types, duck-typing, and casting 
> that contribute to this problem.
> The root, though, is that the generic types on `Processor` refer only to 
> the _input_ key and value types. No information is captured or verified about 
> what the _output_ types of a processor are. For example, this leads to 
> widespread confusion in the code base about whether a processor produces `V`s 
> or `Change<V>`s. The type system actually makes matters worse, since we use 
> casts to make the processors conform to declared types that are in fact 
> wrong, but are never checked due to erasure.
> We can start to make some headway on this tech debt by adding some types to 
> the ProcessorContext that bound the key and value types that may be passed to 
> `context.forward`. Then, we can build on this by fully specifying the input 
> and output types of the Processors, which in turn would let us eliminate the 
> majority of unchecked casts in the DSL operators.
> I'm not sure whether adding these generic types to the existing 
> ProcessorContext and Processor interfaces, which would also affect the PAPI 
> has any utility, or whether we should make this purely an internal change by 
> introducing GenericProcessorContext and GenericProcessor peer interfaces for 
> the DSL to use.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8410) Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as well

2019-05-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16850199#comment-16850199
 ] 

ASF GitHub Bot commented on KAFKA-8410:
---

vvcephei commented on pull request #6833: KAFKA-8410: [POC] strong typing for 
processors
URL: https://github.com/apache/kafka/pull/6833
 
 
   Demonstrate what stronger typing on the processor/context would look like.
   
   Not fully implemented, so not all tests will compile.
   
   Note that this change breaks PAPI backward compatibility. A 
backward-compatible version of this work is possible, but requires further 
design.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as 
> well
> --
>
> Key: KAFKA-8410
> URL: https://issues.apache.org/jira/browse/KAFKA-8410
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: tech-debt
>
> Presently, it's very difficult to have confidence when adding to or modifying 
> processors in the DSL. There's a lot of raw types, duck-typing, and casting 
> that contribute to this problem.
> The root, though, is that the generic types on `Processor` refer only to 
> the _input_ key and value types. No information is captured or verified about 
> what the _output_ types of a processor are. For example, this leads to 
> widespread confusion in the code base about whether a processor produces `V`s 
> or `Change<V>`s. The type system actually makes matters worse, since we use 
> casts to make the processors conform to declared types that are in fact 
> wrong, but are never checked due to erasure.
> We can start to make some headway on this tech debt by adding some types to 
> the ProcessorContext that bound the key and value types that may be passed to 
> `context.forward`. Then, we can build on this by fully specifying the input 
> and output types of the Processors, which in turn would let us eliminate the 
> majority of unchecked casts in the DSL operators.
> I'm not sure whether adding these generic types to the existing 
> ProcessorContext and Processor interfaces, which would also affect the PAPI 
> has any utility, or whether we should make this purely an internal change by 
> introducing GenericProcessorContext and GenericProcessor peer interfaces for 
> the DSL to use.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8443) Allow broker to select a preferred read replica for consumer

2019-05-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16850159#comment-16850159
 ] 

ASF GitHub Bot commented on KAFKA-8443:
---

mumrah commented on pull request #6832: KAFKA-8443 Broker support for fetch 
from followers
URL: https://github.com/apache/kafka/pull/6832
 
 
   TODO
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow broker to select a preferred read replica for consumer
> 
>
> Key: KAFKA-8443
> URL: https://issues.apache.org/jira/browse/KAFKA-8443
> Project: Kafka
>  Issue Type: Task
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Minor
>
> This tracks the broker-side implementation of 
> [KIP-392|https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]
>  (fetch from follower)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8445) Flaky Test UncleanLeaderElectionTest#testUncleanLeaderElectionDisabledByTopicOverride

2019-05-28 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-8445:
---
Description: 
[https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/8/console]

Trace:

``` 
 *11:30:48* kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionDisabledByTopicOverride STARTED*11:31:30* 
kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride
 failed, log available in 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/core/build/reports/testOutput/kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride.test.stdout*11:31:30*
 *11:31:30* kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionDisabledByTopicOverride FAILED*11:31:30* 
org.scalatest.exceptions.TestFailedException: Timing out after 3 ms since 
expected new leader 1 was not elected for partition topic4252061157831715077-0, 
leader is Some(-1)*11:31:30* at 
org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:530)*11:31:30*
 at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)*11:31:30*
 at org.scalatest.Assertions$class.fail(Assertions.scala:1091)*11:31:30* at 
org.scalatest.Assertions$.fail(Assertions.scala:1389)*11:31:30* at 
kafka.utils.TestUtils$$anonfun$waitUntilLeaderIsElectedOrChanged$8.apply(TestUtils.scala:721)*11:31:30*
 at 
kafka.utils.TestUtils$$anonfun$waitUntilLeaderIsElectedOrChanged$8.apply(TestUtils.scala:711)*11:31:30*
 at scala.Option.getOrElse(Option.scala:121)*11:31:30* at 
kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:711)*11:31:30*
 at 
kafka.integration.UncleanLeaderElectionTest.verifyUncleanLeaderElectionDisabled(UncleanLeaderElectionTest.scala:258)*11:31:30*
 at 
kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride(UncleanLeaderElectionTest.scala:153)

```

  was:
[https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/8/console]

Trace:

 
*11:30:48* kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionDisabledByTopicOverride STARTED*11:31:30* 
kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride
 failed, log available in 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/core/build/reports/testOutput/kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride.test.stdout*11:31:30*
 *11:31:30* kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionDisabledByTopicOverride FAILED*11:31:30* 
org.scalatest.exceptions.TestFailedException: Timing out after 3 ms since 
expected new leader 1 was not elected for partition topic4252061157831715077-0, 
leader is Some(-1)*11:31:30* at 
org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:530)*11:31:30*
 at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)*11:31:30*
 at 
org.scalatest.Assertions$class.fail(Assertions.scala:1091)*11:31:30* at 
org.scalatest.Assertions$.fail(Assertions.scala:1389)*11:31:30* at 
kafka.utils.TestUtils$$anonfun$waitUntilLeaderIsElectedOrChanged$8.apply(TestUtils.scala:721)*11:31:30*
 at 
kafka.utils.TestUtils$$anonfun$waitUntilLeaderIsElectedOrChanged$8.apply(TestUtils.scala:711)*11:31:30*
 at scala.Option.getOrElse(Option.scala:121)*11:31:30* at 
kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:711)*11:31:30*
 at 
kafka.integration.UncleanLeaderElectionTest.verifyUncleanLeaderElectionDisabled(UncleanLeaderElectionTest.scala:258)*11:31:30*
 at 
kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride(UncleanLeaderElectionTest.scala:153)


> Flaky Test  
> UncleanLeaderElectionTest#testUncleanLeaderElectionDisabledByTopicOverride
> --
>
> Key: KAFKA-8445
> URL: https://issues.apache.org/jira/browse/KAFKA-8445
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Priority: Major
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/8/console]
> Trace:
> ``` 
>  *11:30:48* kafka.integration.UncleanLeaderElectionTest > 
> testUncleanLeaderElectionDisabledByTopicOverride STARTED*11:31:30* 
> kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/core/build/reports/testOutput/kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride.test.stdout*11:31:30*
>  *11:31:30* kafka.integration.UncleanLeaderElectionTest > 
> 

[jira] [Updated] (KAFKA-8445) Flaky Test UncleanLeaderElectionTest#testUncleanLeaderElectionDisabledByTopicOverride

2019-05-28 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-8445:
---
Description: 
[https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/8/console]

Trace:

*11:30:48* kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionDisabledByTopicOverride STARTED*11:31:30* 
kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride
 failed, log available in 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/core/build/reports/testOutput/kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride.test.stdout*11:31:30*
 *11:31:30* kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionDisabledByTopicOverride FAILED*11:31:30* 
org.scalatest.exceptions.TestFailedException: Timing out after 3 ms since 
expected new leader 1 was not elected for partition topic4252061157831715077-0, 
leader is Some(-1)*11:31:30* at 
org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:530)*11:31:30*
 at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)*11:31:30*
 at org.scalatest.Assertions$class.fail(Assertions.scala:1091)*11:31:30* at 
org.scalatest.Assertions$.fail(Assertions.scala:1389)*11:31:30* at 
kafka.utils.TestUtils$$anonfun$waitUntilLeaderIsElectedOrChanged$8.apply(TestUtils.scala:721)*11:31:30*
 at 
kafka.utils.TestUtils$$anonfun$waitUntilLeaderIsElectedOrChanged$8.apply(TestUtils.scala:711)*11:31:30*
 at scala.Option.getOrElse(Option.scala:121)*11:31:30* at 
kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:711)*11:31:30*
 at 
kafka.integration.UncleanLeaderElectionTest.verifyUncleanLeaderElectionDisabled(UncleanLeaderElectionTest.scala:258)*11:31:30*
 at 
kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride(UncleanLeaderElectionTest.scala:153)

  was:
[https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/8/console]

Trace:

``` 
 *11:30:48* kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionDisabledByTopicOverride STARTED*11:31:30* 
kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride
 failed, log available in 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/core/build/reports/testOutput/kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride.test.stdout*11:31:30*
 *11:31:30* kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionDisabledByTopicOverride FAILED*11:31:30* 
org.scalatest.exceptions.TestFailedException: Timing out after 3 ms since 
expected new leader 1 was not elected for partition topic4252061157831715077-0, 
leader is Some(-1)*11:31:30* at 
org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:530)*11:31:30*
 at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)*11:31:30*
 at org.scalatest.Assertions$class.fail(Assertions.scala:1091)*11:31:30* at 
org.scalatest.Assertions$.fail(Assertions.scala:1389)*11:31:30* at 
kafka.utils.TestUtils$$anonfun$waitUntilLeaderIsElectedOrChanged$8.apply(TestUtils.scala:721)*11:31:30*
 at 
kafka.utils.TestUtils$$anonfun$waitUntilLeaderIsElectedOrChanged$8.apply(TestUtils.scala:711)*11:31:30*
 at scala.Option.getOrElse(Option.scala:121)*11:31:30* at 
kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:711)*11:31:30*
 at 
kafka.integration.UncleanLeaderElectionTest.verifyUncleanLeaderElectionDisabled(UncleanLeaderElectionTest.scala:258)*11:31:30*
 at 
kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride(UncleanLeaderElectionTest.scala:153)

```


> Flaky Test  
> UncleanLeaderElectionTest#testUncleanLeaderElectionDisabledByTopicOverride
> --
>
> Key: KAFKA-8445
> URL: https://issues.apache.org/jira/browse/KAFKA-8445
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Priority: Major
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/8/console]
> Trace:
> *11:30:48* kafka.integration.UncleanLeaderElectionTest > 
> testUncleanLeaderElectionDisabledByTopicOverride STARTED*11:31:30* 
> kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/core/build/reports/testOutput/kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride.test.stdout*11:31:30*
>  *11:31:30* kafka.integration.UncleanLeaderElectionTest > 
> testUncleanLeaderElectionDisabledByTopicOverride FAILED*11:31:30* 
> org.scalatest.exceptions.TestFailedException: Timing out after 

[jira] [Created] (KAFKA-8445) Flaky Test UncleanLeaderElectionTest#testUncleanLeaderElectionDisabledByTopicOverride

2019-05-28 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8445:
--

 Summary: Flaky Test  
UncleanLeaderElectionTest#testUncleanLeaderElectionDisabledByTopicOverride
 Key: KAFKA-8445
 URL: https://issues.apache.org/jira/browse/KAFKA-8445
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/8/console]

Trace:

 
*11:30:48* kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionDisabledByTopicOverride STARTED*11:31:30* 
kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride
 failed, log available in 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/core/build/reports/testOutput/kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride.test.stdout*11:31:30*
 *11:31:30* kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionDisabledByTopicOverride FAILED*11:31:30* 
org.scalatest.exceptions.TestFailedException: Timing out after 3 ms since 
expected new leader 1 was not elected for partition topic4252061157831715077-0, 
leader is Some(-1)*11:31:30* at 
org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:530)*11:31:30*
 at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)*11:31:30*
 at 
org.scalatest.Assertions$class.fail(Assertions.scala:1091)*11:31:30* at 
org.scalatest.Assertions$.fail(Assertions.scala:1389)*11:31:30* at 
kafka.utils.TestUtils$$anonfun$waitUntilLeaderIsElectedOrChanged$8.apply(TestUtils.scala:721)*11:31:30*
 at 
kafka.utils.TestUtils$$anonfun$waitUntilLeaderIsElectedOrChanged$8.apply(TestUtils.scala:711)*11:31:30*
 at scala.Option.getOrElse(Option.scala:121)*11:31:30* at 
kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:711)*11:31:30*
 at 
kafka.integration.UncleanLeaderElectionTest.verifyUncleanLeaderElectionDisabled(UncleanLeaderElectionTest.scala:258)*11:31:30*
 at 
kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabledByTopicOverride(UncleanLeaderElectionTest.scala:153)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8444) If broker sense that it is unhealthy, broker should remove itself as leader from all partitions that it is hosting as partition leader.

2019-05-28 Thread bijaya (JIRA)
bijaya created KAFKA-8444:
-

 Summary: If broker sense that it is unhealthy, broker should 
remove itself as leader from all partitions that it is hosting as partition 
leader.
 Key: KAFKA-8444
 URL: https://issues.apache.org/jira/browse/KAFKA-8444
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.1.1
Reporter: bijaya


A broker can be the partition leader for more than one partition.

If the broker senses any problem with its host, such as failing disk writes, 
memory pressure (crashing), or network issues, it should send an "eject me" 
message (like ejecting from a fighter jet) to the cluster controller/ZooKeeper, 
meaning it is no longer part of the cluster.

This makes the next leader election quicker and can prevent many issues before 
they happen.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8443) Allow broker to select a preferred read replica for consumer

2019-05-28 Thread David Arthur (JIRA)
David Arthur created KAFKA-8443:
---

 Summary: Allow broker to select a preferred read replica for 
consumer
 Key: KAFKA-8443
 URL: https://issues.apache.org/jira/browse/KAFKA-8443
 Project: Kafka
  Issue Type: Task
Reporter: David Arthur
Assignee: David Arthur


This tracks the broker-side implementation of 
[KIP-392|https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]
 (fetch from follower)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7206) Enable batching in FindCoordinator

2019-05-28 Thread Yishun Guan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16850051#comment-16850051
 ] 

Yishun Guan commented on KAFKA-7206:


[~sagarrao] yes, that's basically the reason.

> Enable batching in FindCoordinator
> --
>
> Key: KAFKA-7206
> URL: https://issues.apache.org/jira/browse/KAFKA-7206
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Yishun Guan
>Assignee: Yishun Guan
>Priority: Critical
>  Labels: needs-discussion, needs-kip, newbie++
>
> To quote [~guozhang] :
> "The proposal is that, we extend FindCoordinatorRequest to have multiple 
> consumer ids: today each FindCoordinatorRequest only contains a single 
> consumer id, so in our scenario we need to send N request for N consumer 
> groups still. If we can request for coordinators in a single request, then 
> the workflow could be simplified to:
>  # send a single FindCoordinatorRequest to a broker asking for coordinators 
> of all consumer groups.
>  1.a) note that the response may still succeed in finding some coordinators 
> while erroring on others, and we need to handle them at that granularity (see 
> below).
>  # and then, for the collected coordinators, group them by coordinator id and 
> send one request per coordinator destination.
> Note that this change would require the version to be bumped up, to 
> FIND_COORDINATOR_REQUEST_V3 for such protocol changes, also the RESPONSE 
> version should be bumped up in order to include multiple coordinators."
> A KIP is needed.
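For illustration, a hedged sketch of step 2 of the quoted workflow: once a (hypothetical) batched FindCoordinator lookup maps each group to its coordinator, the group ids are bucketed by coordinator node so a single follow-up request per coordinator can be sent; the sample groups and brokers are made up:
{code}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.Node;

public class GroupByCoordinator {
    public static void main(String[] args) {
        // Result of the proposed batched lookup (illustrative data).
        Map<String, Node> coordinatorByGroup = new HashMap<>();
        coordinatorByGroup.put("group-a", new Node(1, "broker1", 9092));
        coordinatorByGroup.put("group-b", new Node(1, "broker1", 9092));
        coordinatorByGroup.put("group-c", new Node(2, "broker2", 9092));

        // Bucket group ids by coordinator so one request per coordinator is sent.
        Map<Node, List<String>> groupsByCoordinator = coordinatorByGroup.entrySet().stream()
                .collect(Collectors.groupingBy(Map.Entry::getValue,
                        Collectors.mapping(Map.Entry::getKey, Collectors.toList())));

        groupsByCoordinator.forEach((node, groups) ->
                System.out.println("coordinator " + node.id() + " -> " + groups));
    }
}
{code}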



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8442) Inconsistent ISR output in topic command when using --bootstrap-server

2019-05-28 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-8442:
---
Description: 
If there is no leader for a partition, the Metadata API returns an empty ISR. 
When using the `--bootstrap-server` option with `kafka-topics.sh`, this leads 
to the following output:
{code}
Topic:foo   PartitionCount:1ReplicationFactor:2 
Configs:segment.bytes=1073741824
Topic: foo  Partition: 0Leader: noneReplicas: 1,3   Isr: 
{code}
When using `--zookeeper`, we display the current ISR correctly:
{code}
Topic:foo   PartitionCount:1ReplicationFactor:2 Configs:
Topic: foo  Partition: 0Leader: -1  Replicas: 1,3   Isr: 1
{code}
To avoid confusion, we should make this output consistent or at least not 
misleading. We should either change the Metadata API to print the ISR when we 
have it or we can change the output of the topic command to `N/A` or something 
like that.

  was:
If there is no leader for a partition, the Metadata API returns an empty set of 
replicas and an empty ISR. When using the `--bootstrap-server` option with 
`kafka-topics.sh`, this leads to the following output:
{code}
Topic:foo   PartitionCount:1ReplicationFactor:2 
Configs:segment.bytes=1073741824
Topic: foo  Partition: 0Leader: noneReplicas: 1,3   Isr: 
{code}
When using `--zookeeper`, we display the current ISR correctly:
{code}
Topic:foo   PartitionCount:1ReplicationFactor:2 Configs:
Topic: foo  Partition: 0Leader: -1  Replicas: 1,3   Isr: 1
{code}
To avoid confusion, we should make this output consistent or at least not 
misleading. We should either change the Metadata API to print the ISR when we 
have it or we can change the output of the topic command to `N/A` or something 
like that.


> Inconsistent ISR output in topic command when using --bootstrap-server
> --
>
> Key: KAFKA-8442
> URL: https://issues.apache.org/jira/browse/KAFKA-8442
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Major
>
> If there is no leader for a partition, the Metadata API returns an empty ISR. 
> When using the `--bootstrap-server` option with `kafka-topics.sh`, this leads 
> to the following output:
> {code}
> Topic:foo   PartitionCount:1ReplicationFactor:2 
> Configs:segment.bytes=1073741824
> Topic: foo  Partition: 0Leader: noneReplicas: 1,3   Isr: 
> {code}
> When using `--zookeeper`, we display the current ISR correctly:
> {code}
> Topic:foo   PartitionCount:1ReplicationFactor:2 Configs:
> Topic: foo  Partition: 0Leader: -1  Replicas: 1,3   Isr: 1
> {code}
> To avoid confusion, we should make this output consistent or at least not 
> misleading. We should either change the Metadata API to print the ISR when we 
> have it or we can change the output of the topic command to `N/A` or 
> something like that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8442) Inconsistent ISR output in topic command when using --bootstrap-server

2019-05-28 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-8442:
--

 Summary: Inconsistent ISR output in topic command when using 
--bootstrap-server
 Key: KAFKA-8442
 URL: https://issues.apache.org/jira/browse/KAFKA-8442
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


If there is no leader for a partition, the Metadata API returns an empty set of 
replicas and an empty ISR. When using the `--bootstrap-server` option with 
`kafka-topics.sh`, this leads to the following output:
{code}
Topic:foo   PartitionCount:1ReplicationFactor:2 
Configs:segment.bytes=1073741824
Topic: foo  Partition: 0Leader: noneReplicas: 1,3   Isr: 
{code}
When using `--zookeeper`, we display the current ISR correctly:
{code}
Topic:foo   PartitionCount:1ReplicationFactor:2 Configs:
Topic: foo  Partition: 0Leader: -1  Replicas: 1,3   Isr: 1
{code}
To avoid confusion, we should make this output consistent or at least not 
misleading. We should either change the Metadata API to print the ISR when we 
have it or we can change the output of the topic command to `N/A` or something 
like that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8425) KIP 421 Bug: Modifying Immutable Originals Map results in Java exception

2019-05-28 Thread TEJAL ADSUL (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TEJAL ADSUL updated KAFKA-8425:
---
Priority: Major  (was: Minor)

> KIP 421 Bug: Modifying Immutable Originals Map results in Java exception 
> -
>
> Key: KAFKA-8425
> URL: https://issues.apache.org/jira/browse/KAFKA-8425
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 2.3.0
>Reporter: TEJAL ADSUL
>Priority: Major
> Fix For: 2.3.0
>
>
> The originals map passed to the AbstractConfig class can be immutable. The 
> resolveConfigVariable function was modifying the originals map. If this map 
> is immutable it will result in an exception.
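For illustration, a minimal standalone example of the failure mode described above (this is not the AbstractConfig code): writing through an unmodifiable originals map throws, so the resolution step should operate on a defensive copy:
{code}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ImmutableOriginalsExample {
    public static void main(String[] args) {
        Map<String, Object> originals =
                Collections.unmodifiableMap(Collections.singletonMap("config.providers", "file"));

        // Working on a defensive copy is safe.
        Map<String, Object> copy = new HashMap<>(originals);
        copy.put("resolved.key", "value");
        System.out.println(copy);

        // Mutating the caller-supplied map is not: this throws UnsupportedOperationException.
        originals.put("resolved.key", "value");
    }
}
{code}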



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8286) KIP-460 Admin Leader Election RPC

2019-05-28 Thread Jose Armando Garcia Sancio (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-8286:
--
Description: 
Tracking issue for implementing KIP-460. Tasks:
 # [Done] Design KIP
 # [Done] Review KIP
 # [Done] Approve KIP
 # [Done] Update RPC to support KIP
 # [Done] Update controller to support KIP
 # [Done] Create CLI command (kafka-leader-election) that implements the KIP
 # [Done] Search and replace any usage of “preferred” in the code
 # [Done] Add test for controller functionality
 # [Done] Revisit all of the documentation - generate and audit the new javadocs
 # [Done] Deprecated since... needs to be updated
 # [Done] Review PR
 # Merge PR
 # Update the KIP based on the latest implementation
 # Add test for command

  was:
Tracking issue for implementing KIP-460. Tasks:
 # [Done] Design KIP
 # [Done] Review KIP
 # [Done] Approve KIP
 # [Done] Update RPC to support KIP
 # [Done] Update controller to support KIP
 # [Done] Create CLI command (kafka-leader-election) that implement KIP
 # [Done] Search and replace any usage of “preferred” in the code
 # Add test for command
 # [Done] Add test for controller functionality
 # [Done] Revisit all of the documentation - generate and audit the new javadocs
 # [Done] Deprecated since... needs to be update
 # Review PR
 # Merge PR
 # Update the KIP based on the latest implementation


> KIP-460 Admin Leader Election RPC
> -
>
> Key: KAFKA-8286
> URL: https://issues.apache.org/jira/browse/KAFKA-8286
> Project: Kafka
>  Issue Type: New Feature
>  Components: admin, clients, core
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: admin, client, controller
>
> Tracking issue for implementing KIP-460. Tasks:
>  # [Done] Design KIP
>  # [Done] Review KIP
>  # [Done] Approve KIP
>  # [Done] Update RPC to support KIP
>  # [Done] Update controller to support KIP
>  # [Done] Create CLI command (kafka-leader-election) that implements the KIP
>  # [Done] Search and replace any usage of “preferred” in the code
>  # [Done] Add test for controller functionality
>  # [Done] Revisit all of the documentation - generate and audit the new 
> javadocs
>  # [Done] Deprecated since... needs to be updated
>  # [Done] Review PR
>  # Merge PR
>  # Update the KIP based on the latest implementation
>  # Add test for command



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8433) Give the opportunity to use serializers and deserializers with IntegrationTestUtils

2019-05-28 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849970#comment-16849970
 ] 

Guozhang Wang commented on KAFKA-8433:
--

I'd agree with [~mjsax] about recommending the official test-utils package.

As for `IntegrationTestUtils` itself, which is used for internal unit tests, I'd 
suggest we do not make it more "feature-rich" until it is required by some 
newly added test cases.

> Give the opportunity to use serializers and deserializers with 
> IntegrationTestUtils
> ---
>
> Key: KAFKA-8433
> URL: https://issues.apache.org/jira/browse/KAFKA-8433
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Anthony C
>Priority: Minor
>
> Currently, none of the static methods that use a producer or a consumer allow 
> passing serializers or deserializers as arguments.
> Because of that we are not able to mock the schema registry (for example), or 
> other producer / consumer specific attributes.
> To resolve this we just need to add method variants that take serializers or 
> deserializers as arguments.
> The Kafka producer and consumer constructors already accept null serializers or 
> deserializers.
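For illustration, a hedged sketch of the kind of overload being asked for (the 
method name and shape are assumptions, not the existing IntegrationTestUtils 
API): accept serializer instances directly, since the producer constructor 
already tolerates them, including null.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Serializer;

// Assumed helper shape, not the current IntegrationTestUtils code: callers pass
// pre-built (possibly mocked) serializers, e.g. a mock Schema Registry serde,
// instead of configuring serializer class names in the Properties.
final class TestProducerFactorySketch {
    static <K, V> KafkaProducer<K, V> createProducer(Properties config,
                                                     Serializer<K> keySerializer,
                                                     Serializer<V> valueSerializer) {
        // KafkaProducer(Properties, Serializer, Serializer) accepts null serializers,
        // in which case the classes configured in the Properties are used instead.
        return new KafkaProducer<>(config, keySerializer, valueSerializer);
    }
}
{code}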



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8430) Unit test to make sure `group.id` and `group.instance.id` won't affect each other

2019-05-28 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-8430:
--

Assignee: Boyang Chen

> Unit test to make sure `group.id` and `group.instance.id` won't affect each 
> other
> -
>
> Key: KAFKA-8430
> URL: https://issues.apache.org/jira/browse/KAFKA-8430
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7991) Add StreamsUpgradeTest for 2.2 release

2019-05-28 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-7991.
-
Resolution: Duplicate

Yep. Closing as a duplicate.

> Add StreamsUpgradeTest for 2.2 release
> --
>
> Key: KAFKA-7991
> URL: https://issues.apache.org/jira/browse/KAFKA-7991
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.3.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8078) Flaky Test TableTableJoinIntegrationTest#testInnerInner

2019-05-28 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-8078:

Priority: Major  (was: Critical)

> Flaky Test TableTableJoinIntegrationTest#testInnerInner
> ---
>
> Key: KAFKA-8078
> URL: https://issues.apache.org/jira/browse/KAFKA-8078
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: flaky-test
> Fix For: 2.3.0
>
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/3445/tests]
> {quote}java.lang.AssertionError: Condition not met within timeout 15000. 
> Never received expected final result.
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
> at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:246)
> at 
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testInnerInner(TableTableJoinIntegrationTest.java:196){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8263) Flaky Test MetricsIntegrationTest#testStreamMetricOfWindowStore

2019-05-28 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-8263:

Priority: Major  (was: Critical)

> Flaky Test MetricsIntegrationTest#testStreamMetricOfWindowStore
> ---
>
> Key: KAFKA-8263
> URL: https://issues.apache.org/jira/browse/KAFKA-8263
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
> Fix For: 2.3.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3900/testReport/junit/org.apache.kafka.streams.integration/MetricsIntegrationTest/testStreamMetricOfWindowStore/]
> {quote}java.lang.AssertionError: Condition not met within timeout 1. 
> testStoreMetricWindow -> Size of metrics of type:'put-latency-avg' must be 
> equal to:2 but it's equal to 0 expected:<2> but was:<0> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:361){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8122) Flaky Test EosIntegrationTest#shouldNotViolateEosIfOneTaskFailsWithState

2019-05-28 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-8122:

Priority: Major  (was: Critical)

> Flaky Test EosIntegrationTest#shouldNotViolateEosIfOneTaskFailsWithState
> 
>
> Key: KAFKA-8122
> URL: https://issues.apache.org/jira/browse/KAFKA-8122
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: flaky-test
> Fix For: 2.3.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3285/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/shouldNotViolateEosIfOneTaskFailsWithState/]
> {quote}java.lang.AssertionError: Expected: <[KeyValue(0, 0), KeyValue(0, 1), 
> KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 
> 21), KeyValue(0, 28), KeyValue(0, 36), KeyValue(0, 45)]> but: was 
> <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 
> 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 28), KeyValue(0, 36), 
> KeyValue(0, 45), KeyValue(0, 55), KeyValue(0, 66), KeyValue(0, 78), 
> KeyValue(0, 91), KeyValue(0, 105)]> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at 
> org.apache.kafka.streams.integration.EosIntegrationTest.checkResultPerKey(EosIntegrationTest.java:212)
>  at 
> org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState(EosIntegrationTest.java:414){quote}
> STDOUT
> {quote}[2019-03-17 01:19:51,971] INFO Created server with tickTime 800 
> minSessionTimeout 1600 maxSessionTimeout 16000 datadir 
> /tmp/kafka-10997967593034298484/version-2 snapdir 
> /tmp/kafka-5184295822696533708/version-2 
> (org.apache.zookeeper.server.ZooKeeperServer:174) [2019-03-17 01:19:51,971] 
> INFO binding to port /127.0.0.1:0 
> (org.apache.zookeeper.server.NIOServerCnxnFactory:89) [2019-03-17 
> 01:19:51,973] INFO KafkaConfig values: advertised.host.name = null 
> advertised.listeners = null advertised.port = null 
> alter.config.policy.class.name = null 
> alter.log.dirs.replication.quota.window.num = 11 
> alter.log.dirs.replication.quota.window.size.seconds = 1 
> authorizer.class.name = auto.create.topics.enable = false 
> auto.leader.rebalance.enable = true background.threads = 10 broker.id = 0 
> broker.id.generation.enable = true broker.rack = null 
> client.quota.callback.class = null compression.type = producer 
> connection.failed.authentication.delay.ms = 100 connections.max.idle.ms = 
> 60 connections.max.reauth.ms = 0 control.plane.listener.name = null 
> controlled.shutdown.enable = true controlled.shutdown.max.retries = 3 
> controlled.shutdown.retry.backoff.ms = 5000 controller.socket.timeout.ms = 
> 3 create.topic.policy.class.name = null default.replication.factor = 1 
> delegation.token.expiry.check.interval.ms = 360 
> delegation.token.expiry.time.ms = 8640 delegation.token.master.key = null 
> delegation.token.max.lifetime.ms = 60480 
> delete.records.purgatory.purge.interval.requests = 1 delete.topic.enable = 
> true fetch.purgatory.purge.interval.requests = 1000 
> group.initial.rebalance.delay.ms = 0 group.max.session.timeout.ms = 30 
> group.max.size = 2147483647 group.min.session.timeout.ms = 0 host.name = 
> localhost inter.broker.listener.name = null inter.broker.protocol.version = 
> 2.2-IV1 kafka.metrics.polling.interval.secs = 10 kafka.metrics.reporters = [] 
> leader.imbalance.check.interval.seconds = 300 
> leader.imbalance.per.broker.percentage = 10 listener.security.protocol.map = 
> PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL 
> listeners = null log.cleaner.backoff.ms = 15000 
> log.cleaner.dedupe.buffer.size = 2097152 log.cleaner.delete.retention.ms = 
> 8640 log.cleaner.enable = true log.cleaner.io.buffer.load.factor = 0.9 
> log.cleaner.io.buffer.size = 524288 log.cleaner.io.max.bytes.per.second = 
> 1.7976931348623157E308 log.cleaner.min.cleanable.ratio = 0.5 
> log.cleaner.min.compaction.lag.ms = 0 log.cleaner.threads = 1 
> log.cleanup.policy = [delete] log.dir = 
> /tmp/junit16020146621422955757/junit17406374597406011269 log.dirs = null 
> log.flush.interval.messages = 9223372036854775807 log.flush.interval.ms = 
> null log.flush.offset.checkpoint.interval.ms = 6 
> log.flush.scheduler.interval.ms = 9223372036854775807 
> log.flush.start.offset.checkpoint.interval.ms = 6 
> log.index.interval.bytes = 4096 log.index.size.max.bytes = 10485760 
> log.message.downconversion.enable = true log.message.format.version = 2.2-IV1 
> 

[jira] [Updated] (KAFKA-8262) Flaky Test MetricsIntegrationTest#testStreamMetric

2019-05-28 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-8262:

Priority: Major  (was: Critical)

> Flaky Test MetricsIntegrationTest#testStreamMetric
> --
>
> Key: KAFKA-8262
> URL: https://issues.apache.org/jira/browse/KAFKA-8262
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
> Fix For: 2.3.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3900/testReport/junit/org.apache.kafka.streams.integration/MetricsIntegrationTest/testStreamMetric/]
> {quote}java.lang.AssertionError: Condition not met within timeout 1. 
> testTaskMetric -> Size of metrics of type:'commit-latency-avg' must be equal 
> to:5 but it's equal to 0 expected:<5> but was:<0> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:361) at 
> org.apache.kafka.streams.integration.MetricsIntegrationTest.testStreamMetric(MetricsIntegrationTest.java:228){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8441) Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated

2019-05-28 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-8441:

Priority: Major  (was: Critical)

> Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated
> 
>
> Key: KAFKA-8441
> URL: https://issues.apache.org/jira/browse/KAFKA-8441
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.3.0
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
>
> h1. Stacktrace:
> {noformat}
> java.lang.AssertionError: Condition not met within timeout 3. Topics not 
> deleted after 3 milli seconds.
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:265)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteAndRecreateTopics(EmbeddedKafkaCluster.java:288)
>   at 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.setUp(RegexSourceIntegrationTest.java:118)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.RunBefores.invokeMethod(RunBefores.java:33)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>   

[jira] [Updated] (KAFKA-8193) Flaky Test MetricsIntegrationTest#testStreamMetricOfWindowStore

2019-05-28 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-8193:

Priority: Major  (was: Critical)

> Flaky Test MetricsIntegrationTest#testStreamMetricOfWindowStore
> ---
>
> Key: KAFKA-8193
> URL: https://issues.apache.org/jira/browse/KAFKA-8193
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.3.0
>Reporter: Konstantine Karantasis
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: flaky-test
> Fix For: 2.3.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3576/console]
>  *14:14:48* org.apache.kafka.streams.integration.MetricsIntegrationTest > 
> testStreamMetricOfWindowStore STARTED
> *14:14:59* 
> org.apache.kafka.streams.integration.MetricsIntegrationTest.testStreamMetricOfWindowStore
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/streams/build/reports/testOutput/org.apache.kafka.streams.integration.MetricsIntegrationTest.testStreamMetricOfWindowStore.test.stdout
> *14:14:59* 
> *14:14:59* org.apache.kafka.streams.integration.MetricsIntegrationTest > 
> testStreamMetricOfWindowStore FAILED
> *14:14:59* java.lang.AssertionError: Condition not met within timeout 1. 
> testStoreMetricWindow -> Size of metrics of type:'put-latency-avg' must be 
> equal to:2 but it's equal to 0 expected:<2> but was:<0>
> *14:14:59* at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:361)
> *14:14:59* at 
> org.apache.kafka.streams.integration.MetricsIntegrationTest.testStreamMetricOfWindowStore(MetricsIntegrationTest.java:260)
> *14:15:01*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8440) Flaky Test KStreamAggregationIntegrationTest#shouldReduceSessionWindows

2019-05-28 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-8440:

Priority: Major  (was: Critical)

> Flaky Test KStreamAggregationIntegrationTest#shouldReduceSessionWindows
> ---
>
> Key: KAFKA-8440
> URL: https://issues.apache.org/jira/browse/KAFKA-8440
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.3.0
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
>
> h1. Stacktrace:
> {noformat}
> java.lang.AssertionError: 
> Expected: 
>  but: was null
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldReduceSessionWindows(KStreamAggregationIntegrationTest.java:663)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>   at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> 

[jira] [Commented] (KAFKA-8430) Unit test to make sure `group.id` and `group.instance.id` won't affect each other

2019-05-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849927#comment-16849927
 ] 

ASF GitHub Bot commented on KAFKA-8430:
---

abbccdda commented on pull request #6830: KAFKA-8430: unit test to make sure 
null `group.id` and valid `group.instance.id` are valid combo
URL: https://github.com/apache/kafka/pull/6830
 
 
   as title suggests, this unit test is just a double check. No need to push in 
2.3
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Unit test to make sure `group.id` and `group.instance.id` won't affect each 
> other
> -
>
> Key: KAFKA-8430
> URL: https://issues.apache.org/jira/browse/KAFKA-8430
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8430) Unit test to make sure `group.id` and `group.instance.id` won't affect each other

2019-05-28 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849923#comment-16849923
 ] 

Boyang Chen commented on KAFKA-8430:


After checking the code in `KafkaConsumer`, I found out that the logic for a null 
`group.id` is:
{code:java}
// no coordinator will be constructed for the default (null) group id
this.coordinator = groupId == null ? null :
    new ConsumerCoordinator(logContext, ...
{code}
So if the `ConsumerCoordinator` is not constructed, `group.instance.id` has no 
effect at all.

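For illustration, a hedged sketch of the check described above (not the actual 
unit test from the PR; broker address and instance id are placeholders): a 
consumer configured with `group.instance.id` but no `group.id` should construct 
without error, since no ConsumerCoordinator is built for the default (null) 
group id.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hedged sketch, not the actual test: group.id is intentionally absent, so no
// ConsumerCoordinator should be constructed and group.instance.id should be inert.
public class NullGroupIdWithInstanceIdSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put("group.instance.id", "instance-1"); // static member id without a group.id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Reaching this point without an exception is the whole check.
            System.out.println("Consumer created with a null group.id and a group.instance.id");
        }
    }
}
{code}
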
> Unit test to make sure `group.id` and `group.instance.id` won't affect each 
> other
> -
>
> Key: KAFKA-8430
> URL: https://issues.apache.org/jira/browse/KAFKA-8430
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)

2019-05-28 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-7321.

   Resolution: Fixed
Fix Version/s: 2.3.0

This was merged some time ago and will be in 2.3.0.

> ensure timely processing of deletion requests in Kafka topic (Time-based log 
> compaction)
> 
>
> Key: KAFKA-7321
> URL: https://issues.apache.org/jira/browse/KAFKA-7321
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
> Fix For: 2.3.0
>
>
> _Compaction enables Kafka to remove old messages that are flagged for 
> deletion while other messages can be retained for a relatively longer time.  
> Today, a log segment may remain un-compacted for a long time since the 
> eligibility for log compaction is determined based on compaction ratio 
> (“min.cleanable.dirty.ratio”) and min compaction lag 
> ("min.compaction.lag.ms") settings.  The ability to delete a log message through 
> compaction in a timely manner has become an important requirement in some use 
> cases (e.g., GDPR).  For example, one use case is to delete PII (Personally 
> Identifiable Information) data within 7 days while keeping non-PII 
> indefinitely in compacted format.  The goal of this change is to provide a 
> time-based compaction policy that ensures the cleanable section is compacted 
> after the specified time interval regardless of dirty ratio and “min 
> compaction lag”.  However, dirty ratio and “min compaction lag” are still 
> honored if the time based compaction rule is not violated. In other words, if 
> Kafka receives a deletion request on a key (e.g., a key with null value), the 
> corresponding log segment will be picked up for compaction after the 
> configured time interval to remove the key._
>  
> _This is to track effort in KIP 354:_
> _https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy_
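For illustration, a hedged usage sketch (topic name, bootstrap address, 
partition/replication counts, and the 7-day value are placeholders; 
`max.compaction.lag.ms` is the topic config proposed in KIP-354): create a 
compacted topic whose cleanable section must be compacted within 7 days 
regardless of dirty ratio and min compaction lag.
{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

// Hedged sketch using the KIP-354 topic config; all names and values are placeholders.
public class TimeBasedCompactionSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("pii-events", 3, (short) 3)
                    .configs(Map.of(
                            "cleanup.policy", "compact",
                            // compact the cleanable section within 7 days (KIP-354)
                            "max.compaction.lag.ms", String.valueOf(7L * 24 * 60 * 60 * 1000)));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
{code}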



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks

2019-05-28 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849915#comment-16849915
 ] 

Bill Bejeck edited comment on KAFKA-8187 at 5/28/19 4:42 PM:
-

Hi [~hustclf]

I've assigned the ticket to you.  Just for future reference, you'll want to 
check the "Assignee" field on the upper left-hand side and, if the field is 
populated, leave a comment asking the current user whether they are still 
working on the ticket, as you'd like to pick it up.

 

I'll also add you to the contributors list in Jira so you can self-assign 
tickets in the future as well.
Ignore my previous statement, you are already listed there!

 

Thanks.


was (Author: bbejeck):
Hi [~hustclf]

I've assigned the ticket to you.  Just for future reference, you'll want to 
check the "Assignee" field on the upper left-hand side and if the field is 
populated leave a comment asking the current user if they are still working on 
this ticket as you'd like to pick it up.

 

I'll also add to you to the contributors list in Jira so you can self-assign 
tickets in the future as well.

 

Thanks.

> State store record loss across multiple reassignments when using standby tasks
> --
>
> Key: KAFKA-8187
> URL: https://issues.apache.org/jira/browse/KAFKA-8187
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: William Greer
>Assignee: Lifei Chen
>Priority: Major
>
> Overview:
> There is a race condition that can cause a partitioned state store to be 
> missing records up to an offset when using standby tasks.
> When a reassignment occurs and a task is migrated to a StandbyTask in another 
> StreamThread/TaskManager on the same JVM, there can be lock contention that 
> prevents the StandbyTask on the currently assigned StreamThread from 
> acquiring the lock, and the lock acquisition is not retried because all of the 
> active StreamTasks are running for that StreamThread. If the StandbyTask does 
> not acquire the lock before the StreamThread enters into the RUNNING state, 
> then the StandbyTask will not consume any records. If there is no subsequent 
> reassignment before the second execution of the stateDirCleaner Thread, then 
> the task directory for the StandbyTask will be deleted. When the next 
> reassignment occurs the offset that was read by the StandbyTask at creation 
> time before acquiring the lock will be written back to the state store 
> directory, this re-creates the state store directory.
> An example:
> StreamThread(A) and StreamThread(B) are running on the same JVM in the same 
> streams application.
> StreamThread(A) has StandbyTask 1_0
> StreamThread(B) has no tasks
> A reassignment is triggered by another host in the streams application fleet.
> StreamThread(A) is notified with a PARTITIONS_REVOKED event for the thread's 
> one task
> StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby 
> task for 1_0
> Here begins the race condition.
> StreamThread(B) creates the StandbyTask which reads the current checkpoint 
> from disk.
> StreamThread(B) then attempts to updateNewAndRestoringTasks() for its 
> assigned tasks. [0]
> StreamThread(B) initializes the new tasks for the active and standby tasks. 
> [1] [2]
> StreamThread(B) attempts to lock the state directory for task 1_0 but fails 
> with a LockException [3], since StreamThread(A) still holds the lock.
> StreamThread(B) returns true from updateNewAndRestoringTasks() due to the 
> check at [4] which only checks that the active assigned tasks are running.
> StreamThread(B) state is set to RUNNING
> StreamThread(A) closes the previous StandbyTask specifically calling 
> closeStateManager() [5]
> StreamThread(A) state is set to RUNNING
> Streams application for this host has completed re-balancing and is now in 
> the RUNNING state.
> State at this point is the following: State directory exists for 1_0 and all 
> data is present.
> Then, 1 to 2 intervals of [6] (which defaults to 10 minutes) after the 
> reassignment has completed, the stateDirCleaner thread will execute [7].
> The stateDirCleaner will then do [8], which finds the directory 1_0, finds 
> that there isn't an active lock for that directory, acquires the lock, and 
> deletes the directory.
> State at this point is the following: State directory does not exist for 1_0.
> When the next reassignment occurs, the offset that was read by 
> StreamThread(B) during construction of the StandbyTask for 1_0 will be 
> written back to disk. This write re-creates the state store directory and 
> writes the .checkpoint file with the old offset.
> State at this point is the following: State directory exists for 1_0 with a 
> '.checkpoint' file in it, but 

[jira] [Commented] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks

2019-05-28 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849915#comment-16849915
 ] 

Bill Bejeck commented on KAFKA-8187:


Hi [~hustclf]

I've assigned the ticket to you.  Just for future reference, you'll want to 
check the "Assignee" field on the upper left-hand side and, if the field is 
populated, leave a comment asking the current user whether they are still 
working on the ticket, as you'd like to pick it up.

 

I'll also add you to the contributors list in Jira so you can self-assign 
tickets in the future as well.

 

Thanks.

> State store record loss across multiple reassignments when using standby tasks
> --
>
> Key: KAFKA-8187
> URL: https://issues.apache.org/jira/browse/KAFKA-8187
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: William Greer
>Assignee: Lifei Chen
>Priority: Major
>
> Overview:
> There is a race condition that can cause a partitioned state store to be 
> missing records up to an offset when using standby tasks.
> When a reassignment occurs and a task is migrated to a StandbyTask in another 
> StreamThread/TaskManager on the same JVM, there can be lock contention that 
> prevents the StandbyTask on the currently assigned StreamThread from 
> acquiring the lock, and the lock acquisition is not retried because all of the 
> active StreamTasks are running for that StreamThread. If the StandbyTask does 
> not acquire the lock before the StreamThread enters into the RUNNING state, 
> then the StandbyTask will not consume any records. If there is no subsequent 
> reassignment before the second execution of the stateDirCleaner Thread, then 
> the task directory for the StandbyTask will be deleted. When the next 
> reassignment occurs the offset that was read by the StandbyTask at creation 
> time before acquiring the lock will be written back to the state store 
> directory, this re-creates the state store directory.
> An example:
> StreamThread(A) and StreamThread(B) are running on the same JVM in the same 
> streams application.
> StreamThread(A) has StandbyTask 1_0
> StreamThread(B) has no tasks
> A reassignment is triggered by another host in the streams application fleet.
> StreamThread(A) is notified with a PARTITIONS_REVOKED event for the thread's 
> one task
> StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby 
> task for 1_0
> Here begins the race condition.
> StreamThread(B) creates the StandbyTask which reads the current checkpoint 
> from disk.
> StreamThread(B) then attempts to updateNewAndRestoringTasks() for its 
> assigned tasks. [0]
> StreamThread(B) initializes the new tasks for the active and standby tasks. 
> [1] [2]
> StreamThread(B) attempts to lock the state directory for task 1_0 but fails 
> with a LockException [3], since StreamThread(A) still holds the lock.
> StreamThread(B) returns true from updateNewAndRestoringTasks() due to the 
> check at [4] which only checks that the active assigned tasks are running.
> StreamThread(B) state is set to RUNNING
> StreamThread(A) closes the previous StandbyTask specifically calling 
> closeStateManager() [5]
> StreamThread(A) state is set to RUNNING
> Streams application for this host has completed re-balancing and is now in 
> the RUNNING state.
> State at this point is the following: State directory exists for 1_0 and all 
> data is present.
> Then, 1 to 2 intervals of [6] (which defaults to 10 minutes) after the 
> reassignment has completed, the stateDirCleaner thread will execute [7].
> The stateDirCleaner will then do [8], which finds the directory 1_0, finds 
> that there isn't an active lock for that directory, acquires the lock, and 
> deletes the directory.
> State at this point is the following: State directory does not exist for 1_0.
> When the next reassignment occurs, the offset that was read by 
> StreamThread(B) during construction of the StandbyTask for 1_0 will be 
> written back to disk. This write re-creates the state store directory and 
> writes the .checkpoint file with the old offset.
> State at this point is the following: State directory exists for 1_0 with a 
> '.checkpoint' file in it, but there is no other state store data in the 
> directory.
> If this host is assigned the active task for 1_0 then all the history in the 
> state store will be missing from before the offset that was read at the 
> previous reassignment. 
> If this host is assigned the standby task for 1_0 then the lock will be 
> acquired and the standby will start to consume records, but it will still be 
> missing all records from before the offset that was read at the previous 
> reassignment.
> If this host is not assigned 1_0, then the state directory will 

[jira] [Commented] (KAFKA-8433) Give the opportunity to use serializers and deserializers with IntegrationTestUtils

2019-05-28 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849911#comment-16849911
 ] 

Matthias J. Sax commented on KAFKA-8433:


\cc [~guozhang] [~bbejeck] WDYT?

> Give the opportunity to use serializers and deserializers with 
> IntegrationTestUtils
> ---
>
> Key: KAFKA-8433
> URL: https://issues.apache.org/jira/browse/KAFKA-8433
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Anthony C
>Priority: Minor
>
> Currently, none of the static methods that use a producer or a consumer allow 
> passing serializers or deserializers as arguments.
> Because of that we are not able to mock the schema registry (for example), or 
> other producer / consumer specific attributes.
> To resolve this we just need to add method variants that take serializers or 
> deserializers as arguments.
> The Kafka producer and consumer constructors already accept null serializers or 
> deserializers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks

2019-05-28 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-8187:
--

Assignee: Lifei Chen  (was: Bill Bejeck)

> State store record loss across multiple reassignments when using standby tasks
> --
>
> Key: KAFKA-8187
> URL: https://issues.apache.org/jira/browse/KAFKA-8187
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: William Greer
>Assignee: Lifei Chen
>Priority: Major
>
> Overview:
> There is a race condition that can cause a partitioned state store to be 
> missing records up to an offset when using standby tasks.
> When a reassignment occurs and a task is migrated to a StandbyTask in another 
> StreamThread/TaskManager on the same JVM, there can be lock contention that 
> prevents the StandbyTask on the currently assigned StreamThread from 
> acquiring the lock, and the lock acquisition is not retried because all of the 
> active StreamTasks are running for that StreamThread. If the StandbyTask does 
> not acquire the lock before the StreamThread enters into the RUNNING state, 
> then the StandbyTask will not consume any records. If there is no subsequent 
> reassignment before the second execution of the stateDirCleaner Thread, then 
> the task directory for the StandbyTask will be deleted. When the next 
> reassignment occurs the offset that was read by the StandbyTask at creation 
> time before acquiring the lock will be written back to the state store 
> directory, this re-creates the state store directory.
> An example:
> StreamThread(A) and StreamThread(B) are running on the same JVM in the same 
> streams application.
> StreamThread(A) has StandbyTask 1_0
> StreamThread(B) has no tasks
> A reassignment is triggered by another host in the streams application fleet.
> StreamThread(A) is notified with a PARTITIONS_REVOKED event for the thread's 
> one task
> StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby 
> task for 1_0
> Here begins the race condition.
> StreamThread(B) creates the StandbyTask which reads the current checkpoint 
> from disk.
> StreamThread(B) then attempts to updateNewAndRestoringTasks() for its 
> assigned tasks. [0]
> StreamThread(B) initializes the new tasks for the active and standby tasks. 
> [1] [2]
> StreamThread(B) attempts to lock the state directory for task 1_0 but fails 
> with a LockException [3], since StreamThread(A) still holds the lock.
> StreamThread(B) returns true from updateNewAndRestoringTasks() due to the 
> check at [4] which only checks that the active assigned tasks are running.
> StreamThread(B) state is set to RUNNING
> StreamThread(A) closes the previous StandbyTask specifically calling 
> closeStateManager() [5]
> StreamThread(A) state is set to RUNNING
> Streams application for this host has completed re-balancing and is now in 
> the RUNNING state.
> State at this point is the following: State directory exists for 1_0 and all 
> data is present.
> Then, 1 to 2 intervals of [6] (which defaults to 10 minutes) after the 
> reassignment has completed, the stateDirCleaner thread will execute [7].
> The stateDirCleaner will then do [8], which finds the directory 1_0, finds 
> that there isn't an active lock for that directory, acquires the lock, and 
> deletes the directory.
> State at this point is the following: State directory does not exist for 1_0.
> When the next reassignment occurs, the offset that was read by 
> StreamThread(B) during construction of the StandbyTask for 1_0 will be 
> written back to disk. This write re-creates the state store directory and 
> writes the .checkpoint file with the old offset.
> State at this point is the following: State directory exists for 1_0 with a 
> '.checkpoint' file in it, but there is no other state store data in the 
> directory.
> If this host is assigned the active task for 1_0 then all the history in the 
> state store will be missing from before the offset that was read at the 
> previous reassignment. 
> If this host is assigned the standby task for 1_0 then the lock will be 
> acquired and the standby will start to consume records, but it will still be 
> missing all records from before the offset that was read at the previous 
> reassignment.
> If this host is not assigned 1_0, then the state directory will get cleaned 
> up by the stateDirCleaner thread 10 to 20 minutes later and the record loss 
> issue will be hidden.
> [0] 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L865-L869
> [1] 
> 

[jira] [Commented] (KAFKA-8423) Update ducktape to not use deprecated APIs

2019-05-28 Thread Ewen Cheslack-Postava (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849908#comment-16849908
 ] 

Ewen Cheslack-Postava commented on KAFKA-8423:
--

[~htewari] The warning is actually coming from the paramiko library, so in a 
sense we're two levels removed from the actual source of the problem. This is, 
indeed, an issue in ducktape in that the dependency on paramiko would need to be 
updated there, but checking the most recent released version of paramiko at the 
moment (2.4.2), 
[https://github.com/paramiko/paramiko/blob/2.4.2/paramiko/kex_ecdh_nist.py#L39] 
indicates paramiko is still using the same methods and hasn't updated. To fix 
this we might need to contribute a patch upstream to paramiko, wait for them to 
release, then update the paramiko dependency in ducktape, release ducktape, and 
finally update the ducktape dependency here in Kafka.

> Update ducktape to not use deprecated APIs
> --
>
> Key: KAFKA-8423
> URL: https://issues.apache.org/jira/browse/KAFKA-8423
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Hardik Tewari
>Priority: Major
>
> Running system tests locally, I see the following warnings:
> {code:java}
> /usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:39: 
> CryptographyDeprecationWarning: encode_point has been deprecated on 
> EllipticCurvePublicNumbers and will be removed in a future version. Please 
> use EllipticCurvePublicKey.public_bytes to obtain both compressed and 
> uncompressed point encoding.
> m.add_string(self.Q_C.public_numbers().encode_point())
> /usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:94: 
> CryptographyDeprecationWarning: Support for unsafe construction of public 
> numbers from encoded data will be removed in a future version. Please use 
> EllipticCurvePublicKey.from_encoded_point
> self.curve, Q_S_bytes
> /usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:109: 
> CryptographyDeprecationWarning: encode_point has been deprecated on 
> EllipticCurvePublicNumbers and will be removed in a future version. Please 
> use EllipticCurvePublicKey.public_bytes to obtain both compressed and 
> uncompressed point encoding.
> hm.add_string(self.Q_C.public_numbers().encode_point())
> {code}
> We should update the code to not use deprecated APIs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks

2019-05-28 Thread William Greer (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849752#comment-16849752
 ] 

William Greer commented on KAFKA-8187:
--

Apologies for not getting back on this sooner. I'd be willing to file a PR if it 
is still needed. However, there appears to be a pull request already.

[~hustclf] Thank you for posting a PR addressing this issue (KAFKA-8187).

> State store record loss across multiple reassignments when using standby tasks
> --
>
> Key: KAFKA-8187
> URL: https://issues.apache.org/jira/browse/KAFKA-8187
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: William Greer
>Assignee: Bill Bejeck
>Priority: Major
>
> Overview:
> There is a race condition that can cause a partitioned state store to be 
> missing records up to an offset when using standby tasks.
> When a reassignment occurs and a task is migrated to a StandbyTask in another 
> StreamThread/TaskManager on the same JVM, there can be lock contention that 
> prevents the StandbyTask on the currently assigned StreamThread from 
> acquiring the lock, and the lock acquisition is not retried because all of the 
> active StreamTasks are running for that StreamThread. If the StandbyTask does 
> not acquire the lock before the StreamThread enters into the RUNNING state, 
> then the StandbyTask will not consume any records. If there is no subsequent 
> reassignment before the second execution of the stateDirCleaner Thread, then 
> the task directory for the StandbyTask will be deleted. When the next 
> reassignment occurs the offset that was read by the StandbyTask at creation 
> time before acquiring the lock will be written back to the state store 
> directory, this re-creates the state store directory.
> An example:
> StreamThread(A) and StreamThread(B) are running on the same JVM in the same 
> streams application.
> StreamThread(A) has StandbyTask 1_0
> StreamThread(B) has no tasks
> A reassignment is triggered by another host in the streams application fleet.
> StreamThread(A) is notified with a PARTITIONS_REVOKED event for the thread's 
> one task
> StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby 
> task for 1_0
> Here begins the race condition.
> StreamThread(B) creates the StandbyTask which reads the current checkpoint 
> from disk.
> StreamThread(B) then attempts to updateNewAndRestoringTasks() for its 
> assigned tasks. [0]
> StreamThread(B) initializes the new tasks for the active and standby tasks. 
> [1] [2]
> StreamThread(B) attempts to lock the state directory for task 1_0 but fails 
> with a LockException [3], since StreamThread(A) still holds the lock.
> StreamThread(B) returns true from updateNewAndRestoringTasks() due to the 
> check at [4] which only checks that the active assigned tasks are running.
> StreamThread(B) state is set to RUNNING
> StreamThread(A) closes the previous StandbyTask specifically calling 
> closeStateManager() [5]
> StreamThread(A) state is set to RUNNING
> Streams application for this host has completed re-balancing and is now in 
> the RUNNING state.
> State at this point is the following: State directory exists for 1_0 and all 
> data is present.
> Then, 1 to 2 intervals of [6] (which defaults to 10 minutes) after the 
> reassignment has completed, the stateDirCleaner thread will execute [7].
> The stateDirCleaner will then do [8], which finds the directory 1_0, finds 
> that there isn't an active lock for that directory, acquires the lock, and 
> deletes the directory.
> State at this point is the following: State directory does not exist for 1_0.
> When the next reassignment occurs, the offset that was read by 
> StreamThread(B) during construction of the StandbyTask for 1_0 will be 
> written back to disk. This write re-creates the state store directory and 
> writes the .checkpoint file with the old offset.
> State at this point is the following: State directory exists for 1_0 with a 
> '.checkpoint' file in it, but there is no other state store data in the 
> directory.
> If this host is assigned the active task for 1_0 then all the history in the 
> state store will be missing from before the offset that was read at the 
> previous reassignment. 
> If this host is assigned the standby task for 1_0 then the lock will be 
> acquired and the standby will start to consume records, but it will still be 
> missing all records from before the offset that was read at the previous 
> reassignment.
> If this host is not assigned 1_0, then the state directory will get cleaned 
> up by the stateDirCleaner thread 10 to 20 minutes later and the record loss 
> issue will be hidden.
> [0] 
> 

[jira] [Commented] (KAFKA-8390) Replace CreateDelegationToken request/response with automated protocol

2019-05-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849625#comment-16849625
 ] 

ASF GitHub Bot commented on KAFKA-8390:
---

mimaison commented on pull request #6828: KAFKA-8390: Use automatic RPC 
generation in CreateDelegationToken
URL: https://github.com/apache/kafka/pull/6828
 
 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Replace CreateDelegationToken request/response with automated protocol
> --
>
> Key: KAFKA-8390
> URL: https://issues.apache.org/jira/browse/KAFKA-8390
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8441) Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated

2019-05-28 Thread Bruno Cadonna (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-8441:
-
Summary: Flaky Test 
RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated  (was: Flaky Test 
KStreamAggregationIntegrationTest#shouldReduceSessionWindows)

> Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated
> 
>
> Key: KAFKA-8441
> URL: https://issues.apache.org/jira/browse/KAFKA-8441
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.3.0
>Reporter: Bruno Cadonna
>Priority: Critical
>  Labels: flaky-test
>
> h1. Stacktrace:
> {noformat}
> java.lang.AssertionError: Condition not met within timeout 3. Topics not 
> deleted after 3 milli seconds.
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:265)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteAndRecreateTopics(EmbeddedKafkaCluster.java:288)
>   at 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.setUp(RegexSourceIntegrationTest.java:118)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.RunBefores.invokeMethod(RunBefores.java:33)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at 
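For context, the timeout above is raised by a poll-until-true helper in the test utilities while it waits for the topics to disappear (the timeout value in the log message appears truncated). The following is a minimal, hypothetical sketch of that polling pattern only, not Kafka's actual TestUtils code; the class name, poll interval, and timeout value are assumptions.

{noformat}
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a poll-until-true helper in the spirit of
// TestUtils.waitForCondition; names and values are assumptions, not Kafka's code.
public final class WaitUtil {

    // Polls the condition until it returns true or the timeout elapses.
    public static void waitForCondition(BooleanSupplier condition,
                                        long maxWaitMs,
                                        String details) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                // This is the path that produces the "Condition not met within timeout" failure.
                throw new AssertionError("Condition not met within timeout " + maxWaitMs + ". " + details);
            }
            Thread.sleep(100L); // fixed poll interval, chosen arbitrarily for the sketch
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical usage: wait until a set of topics no longer exists.
        java.util.Set<String> remainingTopics = new java.util.HashSet<>();
        waitForCondition(remainingTopics::isEmpty, 30_000L, "Topics not deleted in time.");
        System.out.println("All topics deleted.");
    }
}
{noformat}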

[jira] [Updated] (KAFKA-8441) Flaky Test KStreamAggregationIntegrationTest#shouldReduceSessionWindows

2019-05-28 Thread Bruno Cadonna (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-8441:
-
Summary: Flaky Test 
KStreamAggregationIntegrationTest#shouldReduceSessionWindows  (was: CLONE - 
Flaky Test KStreamAggregationIntegrationTest#shouldReduceSessionWindows)

> Flaky Test KStreamAggregationIntegrationTest#shouldReduceSessionWindows
> ---
>
> Key: KAFKA-8441
> URL: https://issues.apache.org/jira/browse/KAFKA-8441
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.3.0
>Reporter: Bruno Cadonna
>Priority: Critical
>  Labels: flaky-test
>
> h1. Stacktrace:
> {noformat}
> java.lang.AssertionError: 
> Expected: 
>  but: was null
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldReduceSessionWindows(KStreamAggregationIntegrationTest.java:663)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>   at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> 
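For context on what this flaky test exercises: shouldReduceSessionWindows reduces values per key over session windows. The sketch below is a minimal, hypothetical session-window reduce in the Kafka Streams DSL, not the test's own code; the topic name, serdes, and the 5-minute inactivity gap are assumptions.

{noformat}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.SessionWindows;

public final class SessionReduceExample {

    // Builds a topology that concatenates all values a key receives within one session.
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               // A session closes after 5 minutes of inactivity for a key (assumed gap).
               .windowedBy(SessionWindows.with(Duration.ofMinutes(5)))
               // Combine values that fall into the same session.
               .reduce((v1, v2) -> v1 + "," + v2)
               .toStream()
               .foreach((windowedKey, value) -> System.out.println(windowedKey + " -> " + value));
        return builder.build();
    }
}
{noformat}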

[jira] [Created] (KAFKA-8441) CLONE - Flaky Test KStreamAggregationIntegrationTest#shouldReduceSessionWindows

2019-05-28 Thread Bruno Cadonna (JIRA)
Bruno Cadonna created KAFKA-8441:


 Summary: CLONE - Flaky Test 
KStreamAggregationIntegrationTest#shouldReduceSessionWindows
 Key: KAFKA-8441
 URL: https://issues.apache.org/jira/browse/KAFKA-8441
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Affects Versions: 2.3.0
Reporter: Bruno Cadonna


h1. Stacktrace:
{noformat}
java.lang.AssertionError: 
Expected: 
 but: was null
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
at 
org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldReduceSessionWindows(KStreamAggregationIntegrationTest.java:663)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
at 
org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)

[jira] [Created] (KAFKA-8440) Flaky Test KStreamAggregationIntegrationTest#shouldReduceSessionWindows

2019-05-28 Thread Bruno Cadonna (JIRA)
Bruno Cadonna created KAFKA-8440:


 Summary: Flaky Test 
KStreamAggregationIntegrationTest#shouldReduceSessionWindows
 Key: KAFKA-8440
 URL: https://issues.apache.org/jira/browse/KAFKA-8440
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Affects Versions: 2.3.0
Reporter: Bruno Cadonna






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6983) Error while deleting segments - The process cannot access the file because it is being used by another process

2019-05-28 Thread Gérald Quintana (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849545#comment-16849545
 ] 

Gérald Quintana commented on KAFKA-6983:


Still occurs on Kafka 2.2.0.

It makes developing Kafka Streams applications on Windows really complicated, 
as the Kafka brokers are killed by this error.
{noformat}
[2019-05-28 10:57:03,991] ERROR Error while deleting segments for 
groupby-stream-word_table-repartition-0 in dir 
C:\Java\kafka_2.12-2.2.0\data\k-0 (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: 
C:\Java\kafka_2.12-2.2.0\data\k-0\groupby-stream-word_table-repartition-0\.timeindex
 -> 
C:\Java\kafka_2.12-2.2.0\data\k-0\groupby-stream-word_table-repartition-0\.timeindex.deleted:
 The process cannot access the file because it is being used by another process.

    at 
sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
    at 
sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
    at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
    at 
sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
    at java.nio.file.Files.move(Files.java:1395)
    at 
org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:805)
    at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:205)
    at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:490)
    at kafka.log.Log.asyncDeleteSegment(Log.scala:1924)
    at kafka.log.Log.deleteSegment(Log.scala:1909)
    at kafka.log.Log.$anonfun$deleteSegments$3(Log.scala:1455)
    at kafka.log.Log.$anonfun$deleteSegments$3$adapted(Log.scala:1455)
    at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at kafka.log.Log.$anonfun$deleteSegments$2(Log.scala:1455)
    at 
scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23)
    at kafka.log.Log.maybeHandleIOException(Log.scala:2013)
    at kafka.log.Log.deleteSegments(Log.scala:1446)
    at kafka.log.Log.deleteOldSegments(Log.scala:1441)
    at kafka.log.Log.deleteLogStartOffsetBreachedSegments(Log.scala:1541)
    at kafka.log.Log.deleteOldSegments(Log.scala:1509)
    at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:913)
    at 
kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:910)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at kafka.log.LogManager.cleanupLogs(LogManager.scala:910)
    at kafka.log.LogManager.$anonfun$startup$2(LogManager.scala:395)
    at 
kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
    at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: java.nio.file.FileSystemException: 
C:\Java\kafka_2.12-2.2.0\data\k-0\groupby-stream-word_table-repartition-0\.timeindex
 -> 
C:\Java\kafka_2.12-2.2.0\data\k-0\groupby-stream-word_table-repartition-0\.timeindex.deleted:
 The process cannot access the file because it is being used by another process.

    at 
sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
    at 
sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
    at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
    at 
sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
    at java.nio.file.Files.move(Files.java:1395)
    at 
org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:802)
    ... 30 more{noformat}
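The failing call renames a segment index that is still open (memory-mapped), which Windows refuses. As an illustration only, the sketch below mimics the atomic-move-then-fallback pattern visible in the stack trace above; the class and method shape are assumptions, not Kafka's Utils.atomicMoveWithFallback implementation. On Windows both attempts fail with a FileSystemException while another handle still holds the source file, which matches the suppressed exception in the log.

{noformat}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of a rename-with-fallback pattern; not Kafka's actual code.
public final class MoveWithFallback {

    // Try an atomic rename first; if the filesystem refuses it, fall back to a plain move.
    // The plain move's failure is rethrown with the atomic failure attached as suppressed,
    // which is the shape of the exception shown in the log above.
    static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException atomicFailure) {
            try {
                Files.move(source, target);
            } catch (IOException fallbackFailure) {
                fallbackFailure.addSuppressed(atomicFailure);
                throw fallbackFailure;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Succeeds on a file nothing else holds open; in the report above it fails because
        // the source index is still memory-mapped by another component on Windows.
        Path dir = Files.createTempDirectory("segments");
        Path index = Files.createFile(dir.resolve("00000000000000000000.timeindex"));
        atomicMoveWithFallback(index, dir.resolve("00000000000000000000.timeindex.deleted"));
        System.out.println("Renamed index under " + dir);
    }
}
{noformat}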

> Error while deleting segments - The process cannot access the file because it 
> is being used by another process
> --
>
> Key: KAFKA-6983
> URL: https://issues.apache.org/jira/browse/KAFKA-6983
> 

[jira] [Created] (KAFKA-8439) Readability issues on mobile phone

2019-05-28 Thread Daniel Noga (JIRA)
Daniel Noga created KAFKA-8439:
--

 Summary: Readability issues on mobile phone
 Key: KAFKA-8439
 URL: https://issues.apache.org/jira/browse/KAFKA-8439
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Daniel Noga
 Attachments: Screenshot_20190528_083509.png, 
Screenshot_20190528_083603.png

I tried to read the [https://kafka.apache.org/22/documentation/streams/quickstart] 
page on my Samsung Galaxy S7 mobile phone and ran into two difficulties.

First, zooming is disabled, so when I want to see the image at the bottom of the 
site, I cannot zoom in on it or open it and therefore cannot see its content -> zoom 
should be enabled.

Second, when I switch to portrait view, roughly 1/3 of the page is hidden 
by the menu -> the bottom menu should not always be visible.

 

The images in the attachment are screenshots taken in desktop Chrome with developer tools.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)