[jira] [Created] (KAFKA-8436) Replace AddOffsetsToTxn request/response with automated protocol

2019-05-26 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8436:
--

 Summary: Replace AddOffsetsToTxn request/response with automated 
protocol
 Key: KAFKA-8436
 URL: https://issues.apache.org/jira/browse/KAFKA-8436
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8435) Replace DeleteGroups request/response with automated protocol

2019-05-26 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8435:
--

 Summary: Replace DeleteGroups request/response with automated 
protocol
 Key: KAFKA-8435
 URL: https://issues.apache.org/jira/browse/KAFKA-8435
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7788) Support null defaults in KAFKA-7609 RPC specifications

2019-05-26 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848646#comment-16848646
 ] 

Boyang Chen commented on KAFKA-7788:


[~hachikuji] Is this being solved by 
[https://github.com/apache/kafka/pull/6780]?
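For reference, a nullable field with a null default in the KAFKA-7609 JSON 
spec format might look like the sketch below (a JSON excerpt; the field name, 
versions, and description are illustrative, not copied from an actual spec 
file):

{code:java}
{ "name": "GroupInstanceId", "type": "string", "versions": "2+",
  "nullableVersions": "2+", "default": "null",
  "about": "The group instance id, or null if not using static membership." }
{code}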

> Support null defaults in KAFKA-7609 RPC specifications
> --
>
> Key: KAFKA-7788
> URL: https://issues.apache.org/jira/browse/KAFKA-7788
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Colin P. McCabe
>Priority: Minor
>
> It would be nice if we could support null values as defaults in the 
> KAFKA-7609 RPC specification files. Null defaults should be allowed only if 
> the field is nullable in all supported versions of the field.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-26 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848610#comment-16848610
 ] 

Richard Yu commented on KAFKA-7994:
---

cc [~mjsax] Made the change.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as the maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or dropping them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In effect, we forget the 
> current stream-time in this case, which may lead to non-deterministic 
> behavior if we stop processing right before a late record that would be 
> dropped if we continued processing, but is not dropped after the 
> rebalance/restart. Let's look at an example with a grace period of 5ms for a 
> tumbling window of 5ms, and the following records (timestamps in 
> parentheses):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11 and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> the rebalance. The problem is that stream-time advances differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we stopped processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would advance correctly from a global point of view.
> A potential fix would be to store the latest observed partition-time in the 
> metadata of committed offsets. This way, on restart/rebalance we can 
> re-initialize time correctly.
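To make the drop decision above concrete, here is a minimal sketch (not actual 
Streams code; the names are illustrative) of how stream-time advances and how 
the grace-period check classifies `r4`:

{code:java}
// Minimal sketch: stream-time is the max observed timestamp; a newly
// created task starts from UNKNOWN (-1), forgetting earlier progress.
final class StreamTimeSketch {
    static final long UNKNOWN = -1L;
    private long streamTime = UNKNOWN;

    boolean isLate(long recordTimestamp, long gracePeriodMs) {
        return recordTimestamp < streamTime - gracePeriodMs;  // r4: 2 < 11 - 5
    }

    void observe(long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);   // advances 0, 5, 11, 11
    }
}
{code}

After a restart, streamTime is back at UNKNOWN, so isLate(2, 5) returns false 
and `r4` is processed, which is exactly the non-determinism described above.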



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-26 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848607#comment-16848607
 ] 

Matthias J. Sax commented on KAFKA-7994:


Yes. Please rebase the PR to only include the "preserve partition time" fix. 
This helps to make progress. Thanks a lot.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as the maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or dropping them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In effect, we forget the 
> current stream-time in this case, which may lead to non-deterministic 
> behavior if we stop processing right before a late record that would be 
> dropped if we continued processing, but is not dropped after the 
> rebalance/restart. Let's look at an example with a grace period of 5ms for a 
> tumbling window of 5ms, and the following records (timestamps in 
> parentheses):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11 and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> the rebalance. The problem is that stream-time advances differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we stopped processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would advance correctly from a global point of view.
> A potential fix would be to store the latest observed partition-time in the 
> metadata of committed offsets. This way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8433) Give the opportunity to use serializers and deserializers with IntegrationTestUtils

2019-05-26 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848605#comment-16848605
 ] 

Matthias J. Sax commented on KAFKA-8433:


It seems you are using this class for your own testing purposes? That's not 
recommended, because it's not part of the public API.

Do you know about the official test-utils package? 
[https://kafka.apache.org/22/documentation/streams/developer-guide/testing.html]

I would urge you to use the public test-utils package instead of relying on 
internal testing classes. We can still merge the PR, but we also tend to remove 
unused methods from time to time, and if we don't need them internally, this 
might break for you again in the future.

> Give the opportunity to use serializers and deserializers with 
> IntegrationTestUtils
> ---
>
> Key: KAFKA-8433
> URL: https://issues.apache.org/jira/browse/KAFKA-8433
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Anthony Callaert
>Priority: Minor
>
> Currently, the static methods that use a producer or a consumer don't allow 
> passing serializers or deserializers as arguments.
> Because of that, we are not able to mock the schema registry (for example) or 
> other producer / consumer specific attributes.
> To resolve that, we just need to add methods that take serializers or 
> deserializers as arguments.
> Kafka producer and consumer constructors already accept null serializers or 
> deserializers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-26 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-7994:
--
Description: 
We compute a per-partition partition-time as the maximum timestamp over all 
records processed so far. Furthermore, we use partition-time to compute 
stream-time for each task as the maximum over all partition-times (for all 
corresponding task partitions). This stream-time is used to make decisions 
about processing out-of-order records or dropping them if they are late (ie, 
timestamp < stream-time - grace-period).

During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, -1) 
for tasks that are newly created (or migrated). In effect, we forget the 
current stream-time in this case, which may lead to non-deterministic behavior 
if we stop processing right before a late record that would be dropped if we 
continued processing, but is not dropped after the rebalance/restart. Let's 
look at an example with a grace period of 5ms for a tumbling window of 5ms, 
and the following records (timestamps in parentheses):

 
{code:java}
r1(0) r2(5) r3(11) r4(2){code}
In the example, stream-time advances as 0, 5, 11, 11 and thus record `r4` is 
dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
rebalance after processing `r3` but before processing `r4`, we would 
reinitialize stream-time as -1, and thus would process `r4` on restart/after 
the rebalance. The problem is that stream-time advances differently from a 
global point of view: 0, 5, 11, 2.

Note, this is a corner case, because if we stopped processing one record 
earlier, ie, after processing `r2` but before processing `r3`, stream-time 
would advance correctly from a global point of view.

A potential fix would be to store the latest observed partition-time in the 
metadata of committed offsets. This way, on restart/rebalance we can 
re-initialize time correctly.

  was:
We compute a per-partition partition-time as the maximum timestamp over all 
records processed so far. Furthermore, we use partition-time to compute 
stream-time for each task as the maximum over all partition-times (for all 
corresponding task partitions). This stream-time is used to make decisions 
about processing out-of-order records or dropping them if they are late (ie, 
timestamp < stream-time - grace-period).

During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, -1) 
for tasks that are newly created (or migrated). In effect, we forget the 
current stream-time in this case, which may lead to non-deterministic behavior 
if we stop processing right before a late record that would be dropped if we 
continued processing, but is not dropped after the rebalance/restart. Let's 
look at an example with a grace period of 5ms for a tumbling window of 5ms, 
and the following records (timestamps in parentheses):

 
{code:java}
r1(0) r2(5) r3(11) r4(2){code}
In the example, stream-time advances as 0, 5, 11, 11 and thus record `r4` is 
dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
rebalance after processing `r3` but before processing `r4`, we would 
reinitialize stream-time as -1, and thus would process `r4` on restart/after 
the rebalance. The problem is that stream-time advances differently from a 
global point of view: 0, 5, 11, 2.

Note, this is a corner case, because if we stopped processing one record 
earlier, ie, after processing `r2` but before processing `r3`, stream-time 
would advance correctly from a global point of view.

A potential fix would be to store the latest observed partition-time in the 
metadata of committed offsets. This way, on restart/rebalance we can 
re-initialize time correctly.

Notice that this particular issue applies to all stream tasks in the topology. 
The further down the DAG records flow, the more likely it is that the 
StreamTask will have an incorrect stream time. For instance, if r3 was filtered 
out, the tasks receiving the processed records would compute the stream time as 
5 instead of the correct value of 11. This requires us to also propagate the 
latest observed partition time downstream. That means the sources located at 
the head of the topology must forward the partition time to their subtopologies 
whenever records are sent.


> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maxi

[jira] [Comment Edited] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-26 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848534#comment-16848534
 ] 

Richard Yu edited comment on KAFKA-7994 at 5/26/19 9:32 PM:


Sure [~mjsax] I'm all for it, since it looks like the global stream time will 
be hard to define. So should I rebase my PR to only cover this issue?


was (Author: yohan123):
Sure [~mjsax] I'm all for it, since it looks like the global stream time will 
be hard to define. I will create a new issue.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as the maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or dropping them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In effect, we forget the 
> current stream-time in this case, which may lead to non-deterministic 
> behavior if we stop processing right before a late record that would be 
> dropped if we continued processing, but is not dropped after the 
> rebalance/restart. Let's look at an example with a grace period of 5ms for a 
> tumbling window of 5ms, and the following records (timestamps in 
> parentheses):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11 and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> the rebalance. The problem is that stream-time advances differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we stopped processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would advance correctly from a global point of view.
> A potential fix would be to store the latest observed partition-time in the 
> metadata of committed offsets. This way, on restart/rebalance we can 
> re-initialize time correctly.
> Notice that this particular issue applies to all stream tasks in the 
> topology. The further down the DAG records flow, the more likely it is that 
> the StreamTask will have an incorrect stream time. For instance, if r3 was 
> filtered out, the tasks receiving the processed records would compute the 
> stream time as 5 instead of the correct value of 11. This requires us to 
> also propagate the latest observed partition time downstream. That means the 
> sources located at the head of the topology must forward the partition time 
> to their subtopologies whenever records are sent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8434) Make global stream time consistent over all stream tasks

2019-05-26 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu resolved KAFKA-8434.
---
Resolution: Fixed

> Make global stream time consistent over all stream tasks
> 
>
> Key: KAFKA-8434
> URL: https://issues.apache.org/jira/browse/KAFKA-8434
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Richard Yu
>Priority: Major
>  Labels: kip, needs-discussion
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8434) Make global stream time consistent over all stream tasks

2019-05-26 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-8434:
-

 Summary: Make global stream time consistent over all stream tasks
 Key: KAFKA-8434
 URL: https://issues.apache.org/jira/browse/KAFKA-8434
 Project: Kafka
  Issue Type: Improvement
Reporter: Richard Yu






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8434) Make global stream time consistent over all stream tasks

2019-05-26 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-8434:
--
Labels: kip needs-discussion  (was: )

> Make global stream time consistent over all stream tasks
> 
>
> Key: KAFKA-8434
> URL: https://issues.apache.org/jira/browse/KAFKA-8434
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Richard Yu
>Priority: Major
>  Labels: kip, needs-discussion
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8434) Make global stream time consistent over all stream tasks

2019-05-26 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-8434:
--
Component/s: streams

> Make global stream time consistent over all stream tasks
> 
>
> Key: KAFKA-8434
> URL: https://issues.apache.org/jira/browse/KAFKA-8434
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Richard Yu
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-26 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848534#comment-16848534
 ] 

Richard Yu commented on KAFKA-7994:
---

Sure [~mjsax] I'm all for it, since it looks like the global stream time will 
be hard to define. I will create a new issue.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as the maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or dropping them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In effect, we forget the 
> current stream-time in this case, which may lead to non-deterministic 
> behavior if we stop processing right before a late record that would be 
> dropped if we continued processing, but is not dropped after the 
> rebalance/restart. Let's look at an example with a grace period of 5ms for a 
> tumbling window of 5ms, and the following records (timestamps in 
> parentheses):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11 and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> the rebalance. The problem is that stream-time advances differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we stopped processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would advance correctly from a global point of view.
> A potential fix would be to store the latest observed partition-time in the 
> metadata of committed offsets. This way, on restart/rebalance we can 
> re-initialize time correctly.
> Notice that this particular issue applies to all stream tasks in the 
> topology. The further down the DAG records flow, the more likely it is that 
> the StreamTask will have an incorrect stream time. For instance, if r3 was 
> filtered out, the tasks receiving the processed records would compute the 
> stream time as 5 instead of the correct value of 11. This requires us to 
> also propagate the latest observed partition time downstream. That means the 
> sources located at the head of the topology must forward the partition time 
> to their subtopologies whenever records are sent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8433) Give the opportunity to use serializers and deserializers with IntegrationTestUtils

2019-05-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848535#comment-16848535
 ] 

ASF GitHub Bot commented on KAFKA-8433:
---

callaertanthony commented on pull request #6822: KAFKA-8433 : Use serializers 
and deserializers with IntegrationTestUtils
URL: https://github.com/apache/kafka/pull/6822
 
 
   Add serializers and deserializers to methods
   
   Only IntegrationTestUtils is modified.
   No regression detected in stream tests.
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Give the opportunity to use serializers and deserializers with 
> IntegrationTestUtils
> ---
>
> Key: KAFKA-8433
> URL: https://issues.apache.org/jira/browse/KAFKA-8433
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Anthony Callaert
>Priority: Minor
>
> Currently, the static methods that use a producer or a consumer don't allow 
> passing serializers or deserializers as arguments.
> Because of that, we are not able to mock the schema registry (for example) or 
> other producer / consumer specific attributes.
> To resolve that, we just need to add methods that take serializers or 
> deserializers as arguments.
> Kafka producer and consumer constructors already accept null serializers or 
> deserializers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8433) Give the opportunity to use serializers and deserializers with IntegrationTestUtils

2019-05-26 Thread Anthony Callaert (JIRA)
Anthony Callaert created KAFKA-8433:
---

 Summary: Give the opportunity to use serializers and deserializers 
with IntegrationTestUtils
 Key: KAFKA-8433
 URL: https://issues.apache.org/jira/browse/KAFKA-8433
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.3.0
Reporter: Anthony Callaert


Currently, the static methods that use a producer or a consumer don't allow 
passing serializers or deserializers as arguments.

Because of that, we are not able to mock the schema registry (for example) or 
other producer / consumer specific attributes.

To resolve that, we just need to add methods that take serializers or 
deserializers as arguments.
Kafka producer and consumer constructors already accept null serializers or 
deserializers.
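
A sketch of the kind of overload proposed here, modeled on the existing 
produceKeyValuesSynchronously helper (the exact signature is illustrative, not 
the final API):

{code:java}
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;

public final class IntegrationTestUtilsSketch {
    public static <K, V> void produceKeyValuesSynchronously(
            String topic,
            Collection<KeyValue<K, V>> records,
            Properties producerConfig,
            Serializer<K> keySerializer,    // new: forwarded to the producer
            Serializer<V> valueSerializer)  // new: may be a mock serializer
            throws ExecutionException, InterruptedException {
        // KafkaProducer already accepts (possibly null) serializers directly.
        try (Producer<K, V> producer =
                 new KafkaProducer<>(producerConfig, keySerializer, valueSerializer)) {
            for (KeyValue<K, V> record : records) {
                producer.send(new ProducerRecord<>(topic, record.key, record.value)).get();
            }
        }
    }
}
{code}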



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8246) refactor topic/group instance id validation condition

2019-05-26 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-8246.

Resolution: Not A Problem

Since it's only a one-time validation, we don't need to refactor.

> refactor topic/group instance id validation condition
> -
>
> Key: KAFKA-8246
> URL: https://issues.apache.org/jira/browse/KAFKA-8246
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8432) Add static membership to Sticky assignor

2019-05-26 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-8432:
--

Assignee: Boyang Chen

> Add static membership to Sticky assignor
> 
>
> Key: KAFKA-8432
> URL: https://issues.apache.org/jira/browse/KAFKA-8432
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8355) Add static membership to Range assignor

2019-05-26 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-8355:
--

Assignee: Boyang Chen

> Add static membership to Range assignor
> ---
>
> Key: KAFKA-8355
> URL: https://issues.apache.org/jira/browse/KAFKA-8355
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8432) Add static membership to Sticky assignor

2019-05-26 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8432:
--

 Summary: Add static membership to Sticky assignor
 Key: KAFKA-8432
 URL: https://issues.apache.org/jira/browse/KAFKA-8432
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8297) Kafka Streams ConsumerRecordFactory yields difficult compiler error about generics

2019-05-26 Thread Rens Groothuijsen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848480#comment-16848480
 ] 

Rens Groothuijsen edited comment on KAFKA-8297 at 5/26/19 4:42 PM:
---

Would it break the existing interface to only expose create(K, V)? Technically 
speaking, this method could then still handle the case where K happens to be a 
String. Or would that be too unintuitive for the user?

 

Edit: upon closer inspection, it does seem like it would be confusing because 
the meaning of the parameters changes depending on the input. They could be 
distinguished by a flag that indicates whether the first parameter should be 
interpreted as a topic name, but that isn't optimal either.


was (Author: rensgroothuijsen):
Would it break the existing interface to only expose create(K, V)? Technically 
speaking, this method could then still handle the case where K happens to be a 
String. Or would that be too unintuitive for the user?

> Kafka Streams ConsumerRecordFactory yields difficult compiler error about 
> generics
> --
>
> Key: KAFKA-8297
> URL: https://issues.apache.org/jira/browse/KAFKA-8297
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Michael Drogalis
>Priority: Minor
>
> When using the ConsumerRecordFactory, it's convenient to specify a default 
> topic to create records with:
> {code:java}
> ConsumerRecordFactory<String, User> inputFactory = new 
> ConsumerRecordFactory<>(inputTopic, keySerializer, valueSerializer);
> {code}
> However, when the factory is used to create a record with a String key:
> {code:java}
> inputFactory.create("any string", user)
> {code}
> Compilation fails with the following warning:
> {code:java}
> Ambiguous method call. Both:
> create(String, User) in ConsumerRecordFactory and
> create(String, User) in ConsumerRecordFactory match
> {code}
> At first glance, this is a really confusing error to see during compilation. 
> What's happening is that there are two clashing signatures for `create`: 
> create(K, V) and create(String, V). The latter signature represents a topic 
> name.
> It seems like fixing this would require breaking the existing interface. This 
> is a really opaque problem to hit though, and it would be great if we could 
> avoid having users encounter this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8297) Kafka Streams ConsumerRecordFactory yields difficult compiler error about generics

2019-05-26 Thread Rens Groothuijsen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848480#comment-16848480
 ] 

Rens Groothuijsen edited comment on KAFKA-8297 at 5/26/19 4:42 PM:
---

Would it break the existing interface to only expose create(K, V)? Technically 
speaking, this method could then still handle the case where K happens to be a 
String. Or would that be too unintuitive for the user?

Edit: upon closer inspection, it does seem like it would be confusing because 
the meaning of the parameters changes depending on the input. They could be 
distinguished by a flag that indicates whether the first parameter should be 
interpreted as a topic name, but that isn't optimal either.


was (Author: rensgroothuijsen):
Would it break the existing interface to only expose create(K, V)? Technically 
speaking, this method could then still handle the case where K happens to be a 
String. Or would that be too unintuitive for the user?

 

Edit: upon closer inspection, it does seem like it would be confusing because 
the meaning of the parameters changes depending on the input. They could be 
distinguished by a flag that indicates whether the first parameter should be 
interpreted as a topic name, but that isn't optimal either.

> Kafka Streams ConsumerRecordFactory yields difficult compiler error about 
> generics
> --
>
> Key: KAFKA-8297
> URL: https://issues.apache.org/jira/browse/KAFKA-8297
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Michael Drogalis
>Priority: Minor
>
> When using the ConsumerRecordFactory, it's convenient to specify a default 
> topic to create records with:
> {code:java}
> ConsumerRecordFactory<String, User> inputFactory = new 
> ConsumerRecordFactory<>(inputTopic, keySerializer, valueSerializer);
> {code}
> However, when the factory is used to create a record with a String key:
> {code:java}
> inputFactory.create("any string", user)
> {code}
> Compilation fails with the following warning:
> {code:java}
> Ambiguous method call. Both:
> create(String, User) in ConsumerRecordFactory and
> create(String, User) in ConsumerRecordFactory match
> {code}
> At first glance, this is a really confusing error to see during compilation. 
> What's happening is that there are two clashing signatures for `create`: 
> create(K, V) and create(String, V). The latter signature represents a topic 
> name.
> It seems like fixing this would require breaking the existing interface. This 
> is a really opaque problem to hit though, and it would be great if we could 
> avoid having users encounter this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8297) Kafka Streams ConsumerRecordFactory yields difficult compiler error about generics

2019-05-26 Thread Rens Groothuijsen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848480#comment-16848480
 ] 

Rens Groothuijsen commented on KAFKA-8297:
--

Would it break the existing interface to only expose create(K, V)? Technically 
speaking, this method could then still handle the case where K happens to be a 
String. Or would that be too unintuitive for the user?
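
For illustration, a sketch of the clash with K = String (User and 
userSerializer are hypothetical stand-ins from the description); the 
three-argument overload avoids the ambiguity by naming the topic explicitly:

{code:java}
ConsumerRecordFactory<String, User> factory =
    new ConsumerRecordFactory<>("input-topic", new StringSerializer(), userSerializer);

// Does not compile: create(K key, V value) and create(String topicName, V value)
// both match when K is String.
// factory.create("alice", user);

// Unambiguous: the three-argument overload names the topic explicitly.
factory.create("input-topic", "alice", user);
{code}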

> Kafka Streams ConsumerRecordFactory yields difficult compiler error about 
> generics
> --
>
> Key: KAFKA-8297
> URL: https://issues.apache.org/jira/browse/KAFKA-8297
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Michael Drogalis
>Priority: Minor
>
> When using the ConsumerRecordFactory, it's convenient to specify a default 
> topic to create records with:
> {code:java}
> ConsumerRecordFactory<String, User> inputFactory = new 
> ConsumerRecordFactory<>(inputTopic, keySerializer, valueSerializer);
> {code}
> However, when the factory is used to create a record with a String key:
> {code:java}
> inputFactory.create("any string", user)
> {code}
> Compilation fails with the following warning:
> {code:java}
> Ambiguous method call. Both:
> create(String, User) in ConsumerRecordFactory and
> create(String, User) in ConsumerRecordFactory match
> {code}
> At first glance, this is a really confusing error to see during compilation. 
> What's happening is that there are two clashing signatures for `create`: 
> create(K, V) and create(String, V). The latter signature represents a topic 
> name.
> It seems like fixing this would require breaking the existing interface. This 
> is a really opaque problem to hit though, and it would be great if we could 
> avoid having users encounter this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8431) Add a onTimeoutExpired callback to Kafka Consumer

2019-05-26 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-8431:
-

 Summary: Add a onTimeoutExpired callback to Kafka Consumer
 Key: KAFKA-8431
 URL: https://issues.apache.org/jira/browse/KAFKA-8431
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Richard Yu


Currently, after the changes introduced in KIP-266, many methods in Kafka 
Consumer have a bounded execution time given by a user-specified {{Duration}} 
parameter. However, in some cases, a method cannot complete its operation 
within the allocated timeout. In this case, the user might wish to have an 
{{onTimeoutExpired}} callback which would be called should a blocking method 
time out before any results could be returned.

The user can implement something like this themselves, but Kafka could spare 
the user the need to code such a feature by supporting one itself.

One possible use of this callback is to retry the method (e.g. the 
{{onTimeoutExpired}} callback triggers another call to the same method after 
some allocated time).
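
A minimal sketch of the idea from the user's side, using 
{{commitSync(Duration)}} as the bounded method; the {{onTimeoutExpired}} hook 
here is the hypothetical callback, not an existing API:

{code:java}
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.errors.TimeoutException;

public final class TimeoutCallbackSketch {
    // Runs a bounded commit and invokes the hypothetical callback on expiry.
    public static void commitWithCallback(Consumer<?, ?> consumer,
                                          Duration timeout,
                                          Runnable onTimeoutExpired) {
        try {
            consumer.commitSync(timeout);
        } catch (TimeoutException e) {
            onTimeoutExpired.run();  // e.g. schedule a retry of the same call
        }
    }
}
{code}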



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8431) Add a onTimeoutExpired callback to Kafka Consumer

2019-05-26 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu updated KAFKA-8431:
--
Labels: needs-kip  (was: )

> Add a onTimeoutExpired callback to Kafka Consumer
> -
>
> Key: KAFKA-8431
> URL: https://issues.apache.org/jira/browse/KAFKA-8431
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Richard Yu
>Priority: Minor
>  Labels: needs-kip
>
> Currently, after the changes introduced in KIP-266, many methods in Kafka 
> Consumer have a bounded execution time given by a user-specified 
> {{Duration}} parameter. However, in some cases, a method cannot complete its 
> operation within the allocated timeout. In this case, the user might wish to 
> have an {{onTimeoutExpired}} callback which would be called should a blocking 
> method time out before any results could be returned.
> The user can implement something like this themselves, but Kafka could spare 
> the user the need to code such a feature by supporting one itself.
> One possible use of this callback is to retry the method (e.g. the 
> {{onTimeoutExpired}} callback triggers another call to the same method after 
> some allocated time).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks

2019-05-26 Thread Lifei Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848458#comment-16848458
 ] 

Lifei Chen commented on KAFKA-8187:
---

Hi, I'm also interested in this issue, and I filed a PR for it based on 
[~wgreerx]'s detailed description of the root cause and [~guozhang]'s advice 
about proposed option 1.

My solution is as follows:
 * Add a wait time for other threads in the same JVM to free the state store 
locks
 * Do not let the thread transition to RUNNING until all tasks (including 
standby tasks) are ready.

Please review the PR and give me your feedback, thanks a lot.
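
A hedged sketch of the first bullet, not the actual PR code; DirectoryLock is 
a stand-in for whatever guards the task's state directory:

{code:java}
// Sketch only: retry acquiring the state-directory lock for a bounded time,
// instead of giving up while another StreamThread in the same JVM holds it.
interface DirectoryLock {
    boolean tryLock();
}

final class LockRetrySketch {
    static boolean lockWithWait(DirectoryLock lock, long maxWaitMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (System.currentTimeMillis() < deadline) {
            if (lock.tryLock()) {
                return true;      // safe to restore; the task can be reported ready
            }
            Thread.sleep(100L);   // the other thread may not have released it yet
        }
        return false;             // do not transition to RUNNING with this task pending
    }
}
{code}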

> State store record loss across multiple reassignments when using standby tasks
> --
>
> Key: KAFKA-8187
> URL: https://issues.apache.org/jira/browse/KAFKA-8187
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: William Greer
>Assignee: Bill Bejeck
>Priority: Major
>
> Overview:
> There is a race condition that can cause a partitioned state store to be 
> missing records up to an offset when using standby tasks.
> When a reassignment occurs and a task is migrated to a StandbyTask in another 
> StreamThread/TaskManager on the same JVM, there can be lock contention that 
> prevents the StandbyTask on the currently assigned StreamThread from 
> acquiring the lock, and the lock acquisition is not retried because all of 
> the active StreamTasks are running for that StreamThread. If the StandbyTask 
> does not acquire the lock before the StreamThread enters the RUNNING state, 
> then the StandbyTask will not consume any records. If there is no subsequent 
> reassignment before the second execution of the stateDirCleaner thread, then 
> the task directory for the StandbyTask will be deleted. When the next 
> reassignment occurs, the offset that was read by the StandbyTask at creation 
> time, before acquiring the lock, will be written back to the state store 
> directory; this re-creates the state store directory.
> An example:
> StreamThread(A) and StreamThread(B) are running on the same JVM in the same 
> streams application.
> StreamThread(A) has StandbyTask 1_0
> StreamThread(B) has no tasks
> A reassignment is triggered by another host in the streams application fleet.
> StreamThread(A) is notified with a PARTITIONS_REVOKED event for the thread's 
> one task
> StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby 
> task for 1_0
> Here begins the race condition.
> StreamThread(B) creates the StandbyTask which reads the current checkpoint 
> from disk.
> StreamThread(B) then attempts to updateNewAndRestoringTasks() for its 
> assigned tasks. [0]
> StreamThread(B) initializes the new tasks for the active and standby tasks. 
> [1] [2]
> StreamThread(B) attempts to lock the state directory for task 1_0 but fails 
> with a LockException [3], since StreamThread(A) still holds the lock.
> StreamThread(B) returns true from updateNewAndRestoringTasks() due to the 
> check at [4] which only checks that the active assigned tasks are running.
> StreamThread(B) state is set to RUNNING
> StreamThread(A) closes the previous StandbyTask specifically calling 
> closeStateManager() [5]
> StreamThread(A) state is set to RUNNING
> Streams application for this host has completed re-balancing and is now in 
> the RUNNING state.
> State at this point is the following: State directory exists for 1_0 and all 
> data is present.
> Then, at a period that is 1 to 2 intervals of [6] (which defaults to 10 
> minutes) after the reassignment has completed, the stateDirCleaner thread 
> will execute [7].
> The stateDirCleaner will then do [8], which finds the directory 1_0, finds 
> that there isn't an active lock for that directory, acquires the lock, and 
> deletes the directory.
> State at this point is the following: State directory does not exist for 1_0.
> When the next reassignment occurs. The offset that was read by 
> StreamThread(B) during construction of the StandbyTask for 1_0 will be 
> written back to disk. This write re-creates the state store directory and 
> writes the .checkpoint file with the old offset.
> State at this point is the following: State directory exists for 1_0 with a 
> '.checkpoint' file in it, but there is no other state store data in the 
> directory.
> If this host is assigned the active task for 1_0, then all state store 
> history from before the offset that was read at the previous reassignment 
> will be missing. 
> If this host is assigned the standby task for 1_0 then the lock will be 
> acquired and the standby will start to consume records, but it will still be 
> missing all records from before the offset that was read at the previous 
> reassignment.
> 

[jira] [Commented] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks

2019-05-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848456#comment-16848456
 ] 

ASF GitHub Bot commented on KAFKA-8187:
---

hustclf commented on pull request #6818: KAFKA-8187: Add wait time for other 
thread in the same jvm to free the locks
URL: https://github.com/apache/kafka/pull/6818
 
 
   
   Fix KAFKA-8187: State store record loss across multiple reassignments when 
using standby tasks.
   
   - Add a wait time for other threads in the same JVM to free the state 
store locks
   
   - Do not let the thread transition to RUNNING until all tasks (including 
standby tasks) are ready.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> State store record loss across multiple reassignments when using standby tasks
> --
>
> Key: KAFKA-8187
> URL: https://issues.apache.org/jira/browse/KAFKA-8187
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: William Greer
>Assignee: Bill Bejeck
>Priority: Major
>
> Overview:
> There is a race condition that can cause a partitioned state store to be 
> missing records up to an offset when using standby tasks.
> When a reassignment occurs and a task is migrated to a StandbyTask in another 
> StreamThread/TaskManager on the same JVM, there can be lock contention that 
> prevents the StandbyTask on the currently assigned StreamThread from 
> acquiring the lock, and the lock acquisition is not retried because all of 
> the active StreamTasks are running for that StreamThread. If the StandbyTask 
> does not acquire the lock before the StreamThread enters the RUNNING state, 
> then the StandbyTask will not consume any records. If there is no subsequent 
> reassignment before the second execution of the stateDirCleaner thread, then 
> the task directory for the StandbyTask will be deleted. When the next 
> reassignment occurs, the offset that was read by the StandbyTask at creation 
> time, before acquiring the lock, will be written back to the state store 
> directory; this re-creates the state store directory.
> An example:
> StreamThread(A) and StreamThread(B) are running on the same JVM in the same 
> streams application.
> StreamThread(A) has StandbyTask 1_0
> StreamThread(B) has no tasks
> A reassignment is triggered by another host in the streams application fleet.
> StreamThread(A) is notified with a PARTITIONS_REVOKED event for the thread's 
> one task
> StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby 
> task for 1_0
> Here begins the race condition.
> StreamThread(B) creates the StandbyTask which reads the current checkpoint 
> from disk.
> StreamThread(B) then attempts to updateNewAndRestoringTasks() for its 
> assigned tasks. [0]
> StreamThread(B) initializes the new tasks for the active and standby tasks. 
> [1] [2]
> StreamThread(B) attempts to lock the state directory for task 1_0 but fails 
> with a LockException [3], since StreamThread(A) still holds the lock.
> StreamThread(B) returns true from updateNewAndRestoringTasks() due to the 
> check at [4] which only checks that the active assigned tasks are running.
> StreamThread(B) state is set to RUNNING
> StreamThread(A) closes the previous StandbyTask specifically calling 
> closeStateManager() [5]
> StreamThread(A) state is set to RUNNING
> Streams application for this host has completed re-balancing and is now in 
> the RUNNING state.
> State at this point is the following: State directory exists for 1_0 and all 
> data is present.
> Then, at a period that is 1 to 2 intervals of [6] (which defaults to 10 
> minutes) after the reassignment has completed, the stateDirCleaner thread 
> will execute [7].
> The stateDirCleaner will then do [8], which finds the directory 1_0, finds 
> that there isn't an active lock for that directory, acquires the lock, and 
> deletes the 

[jira] [Commented] (KAFKA-2939) Make AbstractConfig.logUnused() tunable for clients

2019-05-26 Thread Rens Groothuijsen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848414#comment-16848414
 ] 

Rens Groothuijsen commented on KAFKA-2939:
--

Took a shot at implementing the boolean idea, though this would mean the 3 
places where logUnused() is actually called (KafkaConsumer, KafkaProducer and 
KafkaAdminClient) would also need to have an extra boolean tacked on to their 
constructors. It looks very hacky, to be honest.
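
For concreteness, a sketch of the boolean-gated variant being discussed; the 
parameter and its plumbing are hypothetical, and only the warning text mirrors 
what logUnused() prints today:

{code:java}
// Sketch of the idea inside AbstractConfig: callers opt out of the warnings.
public void logUnused(boolean warnOnUnused) {
    if (!warnOnUnused) {
        return;  // e.g. Kafka Streams could pass false for its embedded clients
    }
    for (String key : unused()) {
        log.warn("The configuration '{}' was supplied but isn't a known config.", key);
    }
}
{code}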

> Make AbstractConfig.logUnused() tunable for clients
> ---
>
> Key: KAFKA-2939
> URL: https://issues.apache.org/jira/browse/KAFKA-2939
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Reporter: Guozhang Wang
>Assignee: Rens Groothuijsen
>Priority: Major
>  Labels: newbie
>
> Today we always log unused configs in the KafkaProducer / KafkaConsumer 
> constructors; however, in some cases, such as Kafka Streams, which makes use 
> of these clients, other configs may be passed in to configure Partitioner / 
> Serializer classes, etc. So it would be better to make this function call 
> optional to avoid printing unnecessary and confusing WARN entries.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)

2019-05-26 Thread Omkar Mestry (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848387#comment-16848387
 ] 

Omkar Mestry commented on KAFKA-7245:
-

Thread: [https://www.mail-archive.com/dev@kafka.apache.org/msg98201.html]

Added to the "Under Discussion" section on the specified pages.

> Deprecate WindowStore#put(key, value)
> -
>
> Key: KAFKA-7245
> URL: https://issues.apache.org/jira/browse/KAFKA-7245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Omkar Mestry
>Priority: Minor
>  Labels: needs-kip, newbie
>
> We want to remove `WindowStore#put(key, value)` – for this, we first need to 
> deprecate it via a KIP and remove it later.
> Instead of using `WindowStore#put(key, value)` we need to migrate code to 
> specify the timestamp explicitly using `WindowStore#put(key, value, 
> timestamp)`. The current code base already uses the explicit call to set the 
> timestamp in production code. The simplified `put(key, value)` is only used 
> in tests, and thus, we would need to update those tests.
> KIP-474: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545]
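
The test migration would then look roughly like the sketch below, assuming a 
WindowStore named `store` and a ProcessorContext named `context` in scope:

{code:java}
// Before (implicit timestamp; the overload slated for deprecation):
store.put("key", 1L);

// After (timestamp passed explicitly, e.g. taken from the processor context):
store.put("key", 1L, context.timestamp());
{code}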



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)