[jira] [Updated] (KAFKA-12676) Improve sticky general assignor underlying algorithm for the imbalanced case

2021-04-15 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-12676:
--
Description: 
As discussed in KAFKA-12675, we think the general assignor algorithm can be 
improved to make some edge cases more balanced and to improve scalability and 
performance.

Ref: the algorithm is here: 
[https://github.com/twmb/franz-go/blob/master/pkg/kgo/internal/sticky/graph.go]

Related discussion: 
[https://issues.apache.org/jira/browse/KAFKA-12675?focusedCommentId=17322603&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17322603]

 

  was:
As discussed in KAFKA-12675, we think the general assignor algorithm can be 
improved to make some edge cases more balanced and to improve scalability and 
performance.

Ref: the algorithm is here: 
[https://github.com/twmb/franz-go/blob/master/pkg/kgo/internal/sticky/graph.go]

 


> Improve sticky general assignor underlying algorithm for the imbalanced case
> 
>
> Key: KAFKA-12676
> URL: https://issues.apache.org/jira/browse/KAFKA-12676
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Priority: Major
>
> As discussed in KAFKA-12675, we think the general assignor algorithm can be 
> improved to make some edge cases more balanced and to improve scalability and 
> performance.
> Ref: the algorithm is here: 
> [https://github.com/twmb/franz-go/blob/master/pkg/kgo/internal/sticky/graph.go]
>  
> Related discussion: 
> [https://issues.apache.org/jira/browse/KAFKA-12675?focusedCommentId=17322603&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17322603]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12675) Improve sticky general assignor scalability and performance

2021-04-15 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322608#comment-17322608
 ] 

Luke Chen commented on KAFKA-12675:
---

[~twmb], I see. I've created another ticket, KAFKA-12676, to address your 
suggestion. We can make incremental improvements that way. Thanks for the suggestion!

> Improve sticky general assignor scalability and performance
> ---
>
> Key: KAFKA-12675
> URL: https://issues.apache.org/jira/browse/KAFKA-12675
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Currently, we have a "general assignor" for the non-equal subscription case and 
> a "constrained assignor" for the all-equal subscription case. There's a 
> performance test for the constrained assignor with:
> topicCount = 500;
>  partitionCount = 2000; 
>  consumerCount = 2000;
> in _testLargeAssignmentAndGroupWithUniformSubscription_ (1 million partitions in 
> total), and the assignment completes within 2 seconds on my machine.
> However, if we let 1 of the consumers subscribe to only 1 topic, the "general 
> assignor" is used, and the result with the same settings as above is 
> *OutOfMemory*. 
>  Even if we lower the counts to:
> topicCount = 50;
>  partitionCount = 1000; 
>  consumerCount = 1000;
> we still get *OutOfMemory*.
> With this setting:
> topicCount = 50;
>  partitionCount = 800; 
>  consumerCount = 800;
> we can complete the assignment in 10 seconds on my machine, which is still slow.
>  
> Since we are going to set the default assignment strategy to 
> "CooperativeStickyAssignor" soon, we should improve the scalability and 
> performance of the sticky general assignor.
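
For context, here is a rough sketch of the subscription shape described above, i.e. the setup that breaks the "all equal subscription" precondition and pushes the assignor onto the general path. It is illustrative only; the names and structure are not the actual testLargeAssignmentAndGroupWithUniformSubscription code:
{noformat}
import java.util.*;
import java.util.stream.*;

// 2000 consumers over 500 topics x 2000 partitions (1,000,000 partitions in
// total), where a single consumer subscribes to only one topic.
public class GeneralAssignorScenarioSketch {
    public static void main(String[] args) {
        int topicCount = 500;
        int partitionCount = 2000;   // partitions per topic
        int consumerCount = 2000;

        List<String> allTopics = IntStream.range(0, topicCount)
            .mapToObj(i -> "topic-" + i)
            .collect(Collectors.toList());

        // consumer -> subscribed topics
        Map<String, List<String>> subscriptions = new HashMap<>();
        for (int c = 0; c < consumerCount; c++) {
            subscriptions.put("consumer-" + c, allTopics);
        }
        // One narrower subscription breaks the "all equal subscription"
        // precondition of the constrained assignor.
        subscriptions.put("consumer-0", Collections.singletonList("topic-0"));

        boolean uniform = subscriptions.values().stream().distinct().count() == 1;
        System.out.println("total partitions = " + (long) topicCount * partitionCount);
        System.out.println("uniform subscription (constrained assignor applies) = " + uniform);
    }
}
{noformat}
With uniform subscriptions the constrained assignor handles the 1-million-partition case quickly; the single narrower subscription is what routes the same workload to the general assignor that this ticket targets.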



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12676) Improve sticky general assignor underlying algorithm for the imbalanced case

2021-04-15 Thread Luke Chen (Jira)
Luke Chen created KAFKA-12676:
-

 Summary: Improve sticky general assignor underlying algorithm for 
the imbalanced case
 Key: KAFKA-12676
 URL: https://issues.apache.org/jira/browse/KAFKA-12676
 Project: Kafka
  Issue Type: Improvement
Reporter: Luke Chen


As discussed in KAFKA-12675, we think the general assignor algorithm can be 
improved to make some edge cases more balanced and to improve scalability and 
performance.

Ref: the algorithm is here: 
[https://github.com/twmb/franz-go/blob/master/pkg/kgo/internal/sticky/graph.go]

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12675) Improve sticky general assignor scalability and performance

2021-04-15 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322603#comment-17322603
 ] 

Travis Bischel edited comment on KAFKA-12675 at 4/16/21, 4:25 AM:
--

What I mean to say is that the logic powering the existing sticky algorithm is 
heuristic and not truly balanced, and that the logic itself can be changed to 
be more exact to the sticky goals while being much more efficient. That is, 
changes can be made for the imbalanced case similar to how [~ableegoldman] made 
changes to the balanced case (for KAFKA-9987), and these changes will more 
exactly fulfill the goal of sticky balancing while being more efficient. This 
does not change how things are balanced / it does not change the actual sticky 
aspect. Basically, improving the underlying algorithm for the imbalanced case 
directly fulfills the goals of this ticket to improve the scalability and 
performance.

For some numbers:
{noformat}
$ go test -run nothing -bench Java -benchmem -v -benchtime 60s
goos: darwin
goarch: amd64
pkg: github.com/twmb/franz-go/pkg/kgo/internal/sticky
cpu: Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
BenchmarkJava
BenchmarkJava/large
sticky_test.go:1419: avg 2.696451608s per 1 balances of 2000 members and 
100 total partitions
sticky_test.go:1419: avg 2.573368814s per 26 balances of 2000 members and 
100 total partitions
BenchmarkJava/large-8   26   2573370133 ns/op   531478500 B/op   1038983 allocs/op
BenchmarkJava/large_imbalance
sticky_test.go:1419: avg 13.798672936s per 1 balances of 2001 members and 
100 total partitions
sticky_test.go:1419: avg 9.581320518s per 4 balances of 2001 members and 
100 total partitions
sticky_test.go:1419: avg 9.626729812s per 7 balances of 2001 members and 
100 total partitions
BenchmarkJava/large_imbalance-8   7   9626739151 ns/op   8535692965 B/op   1039100 allocs/op
BenchmarkJava/medium
sticky_test.go:1419: avg 77.798053ms per 1 balances of 1000 members and 
5 total partitions
sticky_test.go:1419: avg 72.271454ms per 100 balances of 1000 members and 
5 total partitions
sticky_test.go:1419: avg 72.044377ms per 996 balances of 1000 members and 
5 total partitions
BenchmarkJava/medium-8   996   72044411 ns/op   22502623 B/op   56085 allocs/op
BenchmarkJava/medium_imbalance
sticky_test.go:1419: avg 216.340842ms per 1 balances of 1001 members and 
5 total partitions
sticky_test.go:1419: avg 217.385765ms per 100 balances of 1001 members and 
5 total partitions
sticky_test.go:1419: avg 218.218478ms per 331 balances of 1001 members and 
5 total partitions
BenchmarkJava/medium_imbalance-8   331   218218666 ns/op   222795358 B/op   56097 allocs/op
BenchmarkJava/small
sticky_test.go:1419: avg 52.22238ms per 1 balances of 800 members and 4 
total partitions
sticky_test.go:1419: avg 50.190192ms per 100 balances of 800 members and 
4 total partitions
sticky_test.go:1419: avg 50.252975ms per 1434 balances of 800 members and 
4 total partitions
BenchmarkJava/small-8   1434   50253022 ns/op   18823337 B/op   44906 allocs/op
BenchmarkJava/small_imbalance
sticky_test.go:1419: avg 149.416236ms per 1 balances of 801 members and 
4 total partitions
sticky_test.go:1419: avg 149.050743ms per 100 balances of 801 members and 
4 total partitions
sticky_test.go:1419: avg 149.224721ms per 482 balances of 801 members and 
4 total partitions
BenchmarkJava/small_imbalance-8   482   149224854 ns/op   147060761 B/op   44914 allocs/op
{noformat}

I've just pushed the code for this benchmark in [this 
commit|https://github.com/twmb/franz-go/commit/e0c960e094e8f100924411f6c5fb514b79fc761a].

This is still clearly using a decent amount of memory (up to 8G in the 
imbalanced case), but I spent a good amount of time already optimizing how much 
memory this can consume. I'm sure I can lose some speed to drop some memory 
usage. But, as it stands, 1mil partitions and 2000 members just has to take up 
some memory.

What I mean by heuristic is that the general assignor does not really have much 
reasoning for what it is doing; it just tries a bunch of things and then says 
"good enough" at a certain point. It tries a change, checks whether it got a 
better balance score, and if not, discards what it tried. This can be seen on 
lines 642 and 643 of 
[AbstractStickyAssignor.java|https://github.com/apache/kafka/blob/637c44c976c115b7e770a6fd9e62e8822051b45b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java#L642-L643]:
{noformat}
   // if we are not preserving existing assignments 
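{noformat}

The try-then-check pattern described above (attempt a reassignment, recompute a balance score, and keep the change only if the score improved) can be sketched roughly as follows. This is an illustration of the pattern only, with made-up helper names; it is not the actual AbstractStickyAssignor code:
{noformat}
import java.util.*;

// Toy illustration of a "try, score, revert if not better" heuristic step.
// balanceScore: lower is better (here, the sum of squared member loads).
public class TryScoreRevertSketch {

    static int balanceScore(Map<String, List<Integer>> assignment) {
        int score = 0;
        for (List<Integer> owned : assignment.values()) {
            score += owned.size() * owned.size();
        }
        return score;
    }

    // Move one partition from the most-loaded to the least-loaded member,
    // then keep the move only if the balance score improved.
    static void tryOneMove(Map<String, List<Integer>> assignment) {
        int before = balanceScore(assignment);

        String most = null;
        String least = null;
        for (String member : assignment.keySet()) {
            if (most == null || assignment.get(member).size() > assignment.get(most).size()) {
                most = member;
            }
            if (least == null || assignment.get(member).size() < assignment.get(least).size()) {
                least = member;
            }
        }
        if (most == null || most.equals(least) || assignment.get(most).isEmpty()) {
            return;
        }

        Integer moved = assignment.get(most).remove(assignment.get(most).size() - 1);
        assignment.get(least).add(moved);

        if (balanceScore(assignment) >= before) {
            // Not an improvement: revert, i.e. "discard what it tried".
            assignment.get(least).remove(moved);
            assignment.get(most).add(moved);
        }
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> assignment = new HashMap<>();
        assignment.put("memberA", new ArrayList<>(Arrays.asList(0, 1, 2, 3)));
        assignment.put("memberB", new ArrayList<>(Arrays.asList(4)));
        tryOneMove(assignment);
        System.out.println(assignment); // e.g. {memberA=[0, 1, 2], memberB=[4, 3]}
    }
}
{noformat}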

[jira] [Comment Edited] (KAFKA-12675) Improve sticky general assignor scalability and performance

2021-04-15 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322603#comment-17322603
 ] 

Travis Bischel edited comment on KAFKA-12675 at 4/16/21, 4:24 AM:
--

What I mean to say is that the logic powering the existing cooperative-sticky 
algorithm is heuristic and not truly balanced, and that the logic itself can be 
changed to be more exact to the cooperative-sticky goals while being much more 
efficient. That is, changes can be made for the imbalanced case similar to how 
[~ableegoldman] made changes to the balanced case (for KAFKA-9987), and these 
changes will more exactly fulfill the goal of cooperative sticky while being 
more efficient. This does not change how things are balanced / it does not 
change the actual sticky aspect. Basically, improving the underlying algorithm 
for the imbalanced case directly fulfills the goals of this ticket to improve 
the scalability and performance.

For some numbers:
{noformat}
$ go test -run nothing -bench Java -benchmem -v -benchtime 60s
goos: darwin
goarch: amd64
pkg: github.com/twmb/franz-go/pkg/kgo/internal/sticky
cpu: Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
BenchmarkJava
BenchmarkJava/large
sticky_test.go:1419: avg 2.696451608s per 1 balances of 2000 members and 
100 total partitions
sticky_test.go:1419: avg 2.573368814s per 26 balances of 2000 members and 
100 total partitions
BenchmarkJava/large-8   26   2573370133 ns/op   531478500 B/op   1038983 allocs/op
BenchmarkJava/large_imbalance
sticky_test.go:1419: avg 13.798672936s per 1 balances of 2001 members and 
100 total partitions
sticky_test.go:1419: avg 9.581320518s per 4 balances of 2001 members and 
100 total partitions
sticky_test.go:1419: avg 9.626729812s per 7 balances of 2001 members and 
100 total partitions
BenchmarkJava/large_imbalance-8   7   9626739151 ns/op   8535692965 B/op   1039100 allocs/op
BenchmarkJava/medium
sticky_test.go:1419: avg 77.798053ms per 1 balances of 1000 members and 
5 total partitions
sticky_test.go:1419: avg 72.271454ms per 100 balances of 1000 members and 
5 total partitions
sticky_test.go:1419: avg 72.044377ms per 996 balances of 1000 members and 
5 total partitions
BenchmarkJava/medium-8   996   72044411 ns/op   22502623 B/op   56085 allocs/op
BenchmarkJava/medium_imbalance
sticky_test.go:1419: avg 216.340842ms per 1 balances of 1001 members and 
5 total partitions
sticky_test.go:1419: avg 217.385765ms per 100 balances of 1001 members and 
5 total partitions
sticky_test.go:1419: avg 218.218478ms per 331 balances of 1001 members and 
5 total partitions
BenchmarkJava/medium_imbalance-8   331   218218666 ns/op   222795358 B/op   56097 allocs/op
BenchmarkJava/small
sticky_test.go:1419: avg 52.22238ms per 1 balances of 800 members and 4 
total partitions
sticky_test.go:1419: avg 50.190192ms per 100 balances of 800 members and 
4 total partitions
sticky_test.go:1419: avg 50.252975ms per 1434 balances of 800 members and 
4 total partitions
BenchmarkJava/small-8   1434   50253022 ns/op   18823337 B/op   44906 allocs/op
BenchmarkJava/small_imbalance
sticky_test.go:1419: avg 149.416236ms per 1 balances of 801 members and 
4 total partitions
sticky_test.go:1419: avg 149.050743ms per 100 balances of 801 members and 
4 total partitions
sticky_test.go:1419: avg 149.224721ms per 482 balances of 801 members and 
4 total partitions
BenchmarkJava/small_imbalance-8   482   149224854 ns/op   147060761 B/op   44914 allocs/op
{noformat}

I've just pushed the code for this benchmark in [this 
commit|https://github.com/twmb/franz-go/commit/e0c960e094e8f100924411f6c5fb514b79fc761a].

This is still clearly using a decent amount of memory (up to 8G in the 
imbalanced case), but I spent a good amount of time already optimizing how much 
memory this can consume. I'm sure I can lose some speed to drop some memory 
usage. But, as it stands, 1mil partitions and 2000 members just has to take up 
some memory.

What I mean by heuristic is that the general assignor does not really have much 
reasoning for what it is doing; it just tries a bunch of things and then says 
"good enough" at a certain point. It tries a change, checks whether it got a 
better balance score, and if not, discards what it tried. This can be seen on 
lines 642 and 643 of 
[AbstractStickyAssignor.java|https://github.com/apache/kafka/blob/637c44c976c115b7e770a6fd9e62e8822051b45b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java#L642-L643]:
{noformat}
   // if we are not 

[jira] [Comment Edited] (KAFKA-12675) Improve sticky general assignor scalability and performance

2021-04-15 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322603#comment-17322603
 ] 

Travis Bischel edited comment on KAFKA-12675 at 4/16/21, 4:10 AM:
--

What I mean to say is that the logic powering the existing cooperative-sticky 
algorithm is heuristic and not truly balanced, and that the logic itself can be 
changed to be more exact to the cooperative-sticky goals while being much more 
efficient. That is, changes can be made for the imbalanced case similar to how 
[~ableegoldman] made changes to the balanced case (for KAFKA-9987), and these 
changes will more exactly fulfill the goal of cooperative sticky while being 
more efficient. This does not change how things are balanced / it does not 
change the actual sticky aspect. Basically, improving the underlying algorithm 
for the imbalanced case directly fulfills the goals of this ticket to improve 
the scalability and performance. I'll edit this comment shortly with some 
benchmarking numbers.


was (Author: twmb):
What I mean to say is that the logic powering the existing cooperative-sticky 
algorithm is heuristic and not truly balanced, and that the logic itself can be 
changed to be more exact to the cooperative-sticky goals while being much more 
efficient. That is, changes can be made for the imbalanced case similar to how 
[~ableegoldman] made changes to the balanced case, and these changes will more 
exactly fulfill the goal of cooperative sticky while being more efficient. This 
does not change how things are balanced / it does not change the actual sticky 
aspect. Basically, improving the underlying algorithm for the imbalanced case 
directly fulfills the goals of this ticket to improve the scalability and 
performance. I'll edit this comment shortly with some benchmarking numbers.

> Improve sticky general assignor scalability and performance
> ---
>
> Key: KAFKA-12675
> URL: https://issues.apache.org/jira/browse/KAFKA-12675
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Currently, we have a "general assignor" for the non-equal subscription case and 
> a "constrained assignor" for the all-equal subscription case. There's a 
> performance test for the constrained assignor with:
> topicCount = 500;
>  partitionCount = 2000; 
>  consumerCount = 2000;
> in _testLargeAssignmentAndGroupWithUniformSubscription_ (1 million partitions in 
> total), and the assignment completes within 2 seconds on my machine.
> However, if we let 1 of the consumers subscribe to only 1 topic, the "general 
> assignor" is used, and the result with the same settings as above is 
> *OutOfMemory*. 
>  Even if we lower the counts to:
> topicCount = 50;
>  partitionCount = 1000; 
>  consumerCount = 1000;
> we still get *OutOfMemory*.
> With this setting:
> topicCount = 50;
>  partitionCount = 800; 
>  consumerCount = 800;
> we can complete the assignment in 10 seconds on my machine, which is still slow.
>  
> Since we are going to set the default assignment strategy to 
> "CooperativeStickyAssignor" soon, we should improve the scalability and 
> performance of the sticky general assignor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12675) Improve sticky general assignor scalability and performance

2021-04-15 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322603#comment-17322603
 ] 

Travis Bischel commented on KAFKA-12675:


What I mean to say is that the logic powering the existing cooperative-sticky 
algorithm is heuristic and not truly balanced, and that the logic itself can be 
changed to be more exact to the cooperative-sticky goals while being much more 
efficient. That is, changes can be made for the imbalanced case similar to how 
[~ableegoldman] made changes to the balanced case, and these changes will more 
exactly fulfill the goal of cooperative sticky while being more efficient. This 
does not change how things are balanced / it does not change the actual sticky 
aspect. Basically, improving the underlying algorithm for the imbalanced case 
directly fulfills the goals of this ticket to improve the scalability and 
performance. I'll edit this comment shortly with some benchmarking numbers.

> Improve sticky general assignor scalability and performance
> ---
>
> Key: KAFKA-12675
> URL: https://issues.apache.org/jira/browse/KAFKA-12675
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Currently, we have a "general assignor" for the non-equal subscription case and 
> a "constrained assignor" for the all-equal subscription case. There's a 
> performance test for the constrained assignor with:
> topicCount = 500;
>  partitionCount = 2000; 
>  consumerCount = 2000;
> in _testLargeAssignmentAndGroupWithUniformSubscription_ (1 million partitions in 
> total), and the assignment completes within 2 seconds on my machine.
> However, if we let 1 of the consumers subscribe to only 1 topic, the "general 
> assignor" is used, and the result with the same settings as above is 
> *OutOfMemory*. 
>  Even if we lower the counts to:
> topicCount = 50;
>  partitionCount = 1000; 
>  consumerCount = 1000;
> we still get *OutOfMemory*.
> With this setting:
> topicCount = 50;
>  partitionCount = 800; 
>  consumerCount = 800;
> we can complete the assignment in 10 seconds on my machine, which is still slow.
>  
> Since we are going to set the default assignment strategy to 
> "CooperativeStickyAssignor" soon, we should improve the scalability and 
> performance of the sticky general assignor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12675) Improve sticky general assignor scalability and performance

2021-04-15 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322599#comment-17322599
 ] 

Luke Chen edited comment on KAFKA-12675 at 4/16/21, 3:59 AM:
-

[~twmb], thanks for the suggestion. But in this ticket, I'd like to focus on 
improving the scalability and performance first. I agree the whole algorithm 
can be improved, but that needs to go through a KIP and more discussion before 
it can move forward. Thanks anyway, I'll think about it. :)


was (Author: showuon):
[~twmb], thanks for the suggestion. But in this ticket, I'd like to improve the 
scalability and performance first. I agree the whole algorithm can be improved, 
too, but that needs to go through a KIP and more discussion before it can move 
forward. Thanks anyway, I'll think about it. :)

> Improve sticky general assignor scalability and performance
> ---
>
> Key: KAFKA-12675
> URL: https://issues.apache.org/jira/browse/KAFKA-12675
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Currently, we have a "general assignor" for the non-equal subscription case and 
> a "constrained assignor" for the all-equal subscription case. There's a 
> performance test for the constrained assignor with:
> topicCount = 500;
>  partitionCount = 2000; 
>  consumerCount = 2000;
> in _testLargeAssignmentAndGroupWithUniformSubscription_ (1 million partitions in 
> total), and the assignment completes within 2 seconds on my machine.
> However, if we let 1 of the consumers subscribe to only 1 topic, the "general 
> assignor" is used, and the result with the same settings as above is 
> *OutOfMemory*. 
>  Even if we lower the counts to:
> topicCount = 50;
>  partitionCount = 1000; 
>  consumerCount = 1000;
> we still get *OutOfMemory*.
> With this setting:
> topicCount = 50;
>  partitionCount = 800; 
>  consumerCount = 800;
> we can complete the assignment in 10 seconds on my machine, which is still slow.
>  
> Since we are going to set the default assignment strategy to 
> "CooperativeStickyAssignor" soon, we should improve the scalability and 
> performance of the sticky general assignor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12675) Improve sticky general assignor scalability and performance

2021-04-15 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322599#comment-17322599
 ] 

Luke Chen commented on KAFKA-12675:
---

[~twmb], thanks for the suggestion. But in this ticket, I'd like to improve the 
scalability and performance first. I agree the whole algorithm can be improved, 
too, but that needs to go through a KIP and more discussion before it can move 
forward. Thanks anyway, I'll think about it. :)

> Improve sticky general assignor scalability and performance
> ---
>
> Key: KAFKA-12675
> URL: https://issues.apache.org/jira/browse/KAFKA-12675
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Currently, we have a "general assignor" for the non-equal subscription case and 
> a "constrained assignor" for the all-equal subscription case. There's a 
> performance test for the constrained assignor with:
> topicCount = 500;
>  partitionCount = 2000; 
>  consumerCount = 2000;
> in _testLargeAssignmentAndGroupWithUniformSubscription_ (1 million partitions in 
> total), and the assignment completes within 2 seconds on my machine.
> However, if we let 1 of the consumers subscribe to only 1 topic, the "general 
> assignor" is used, and the result with the same settings as above is 
> *OutOfMemory*. 
>  Even if we lower the counts to:
> topicCount = 50;
>  partitionCount = 1000; 
>  consumerCount = 1000;
> we still get *OutOfMemory*.
> With this setting:
> topicCount = 50;
>  partitionCount = 800; 
>  consumerCount = 800;
> we can complete the assignment in 10 seconds on my machine, which is still slow.
>  
> Since we are going to set the default assignment strategy to 
> "CooperativeStickyAssignor" soon, we should improve the scalability and 
> performance of the sticky general assignor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #10545: KAFKA-12672: Added config for raft testing server

2021-04-15 Thread GitBox


ijuma commented on pull request #10545:
URL: https://github.com/apache/kafka/pull/10545#issuecomment-820886172


   Merged to trunk and cherry-picked to 2.8.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #10545: KAFKA-12672: Added config for raft testing server

2021-04-15 Thread GitBox


ijuma merged pull request #10545:
URL: https://github.com/apache/kafka/pull/10545


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614538992



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##
@@ -486,6 +486,327 @@ public void testJoin() {
 }
 }
 
+@Test
+public void testImprovedLeftJoin() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final KStream<Integer, String> stream1;
+final KStream<Integer, String> stream2;
+final KStream<Integer, String> joined;
+final MockProcessorSupplier<Integer, String> supplier = new 
MockProcessorSupplier<>();
+
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+joined = stream1.leftJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
+StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String()));
+joined.process(supplier);
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+final TestInputTopic<Integer, String> inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic<Integer, String> inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockProcessor<Integer, String> processor = 
supplier.theCapturedProcessor();
+
+/*
+ * Verifies left non-joined records are emitted by a record 
processed in the left topic
+ */
+
+long windowStart = 0;
+
+// No joins detected; No null-joins emitted
+inputTopic1.pipeInput(0, "A0", windowStart + 1);
+inputTopic1.pipeInput(1, "A1", windowStart + 2);
+inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+processor.checkAndClearProcessResult();
+
+// Join detected; No null-joins emitted
+inputTopic2.pipeInput(1, "a1", windowStart + 3);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+// Dummy record in left topic will emit expired non-joined records 
from the left topic
+inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+// Flush internal non-joined state store by joining the dummy 
record
+inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401));
+
+/*
+ * Verifies left non-joined records are emitted by a record 
processed in the right topic
+ */
+
+windowStart = windowStart + 301;
+
+// No joins detected; No null-joins emitted
+inputTopic1.pipeInput(0, "A0", windowStart + 1);
+inputTopic1.pipeInput(1, "A1", windowStart + 2);
+inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+processor.checkAndClearProcessResult();
+
+// Join detected; No null-joins emitted
+inputTopic2.pipeInput(1, "a1", windowStart + 3);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+// Dummy record in right topic will emit expired non-joined 
records from the left topic
+inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+// Flush internal non-joined state store by joining the dummy 
record
+inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+/*
+ * Verifies right non-joined records are not emitted by record 
processed in the left topic
+ */
+
+windowStart = windowStart + 401;
+
+// No joins detected; No null-joins emitted
+inputTopic2.pipeInput(0, "A0", windowStart + 1);
+inputTopic2.pipeInput(1, "A1", windowStart + 2);
+inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+processor.checkAndClearProcessResult();
+
+// Join detected; No null-joins emitted
+inputTopic1.pipeInput(1, "a1", windowStart + 3);
+processor.checkAndClearProcessResult(
+new 

[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614538557



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##
@@ -486,6 +486,327 @@ public void testJoin() {
 }
 }
 
+@Test
+public void testImprovedLeftJoin() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final KStream<Integer, String> stream1;
+final KStream<Integer, String> stream2;
+final KStream<Integer, String> joined;
+final MockProcessorSupplier<Integer, String> supplier = new 
MockProcessorSupplier<>();
+
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+joined = stream1.leftJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
+StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String()));
+joined.process(supplier);
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+final TestInputTopic<Integer, String> inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic<Integer, String> inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockProcessor<Integer, String> processor = 
supplier.theCapturedProcessor();
+
+/*
+ * Verifies left non-joined records are emitted by a record 
processed in the left topic
+ */
+
+long windowStart = 0;
+
+// No joins detected; No null-joins emitted
+inputTopic1.pipeInput(0, "A0", windowStart + 1);
+inputTopic1.pipeInput(1, "A1", windowStart + 2);
+inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+processor.checkAndClearProcessResult();
+
+// Join detected; No null-joins emitted
+inputTopic2.pipeInput(1, "a1", windowStart + 3);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+// Dummy record in left topic will emit expired non-joined records 
from the left topic
+inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+// Flush internal non-joined state store by joining the dummy 
record
+inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401));
+
+/*
+ * Verifies left non-joined records are emitted by a record 
processed in the right topic
+ */
+
+windowStart = windowStart + 301;
+
+// No joins detected; No null-joins emitted
+inputTopic1.pipeInput(0, "A0", windowStart + 1);
+inputTopic1.pipeInput(1, "A1", windowStart + 2);
+inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+processor.checkAndClearProcessResult();
+
+// Join detected; No null-joins emitted
+inputTopic2.pipeInput(1, "a1", windowStart + 3);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+// Dummy record in right topic will emit expired non-joined 
records from the left topic
+inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+// Flush internal non-joined state store by joining the dummy 
record
+inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+/*
+ * Verifies right non-joined records are not emitted by record 
processed in the left topic
+ */
+
+windowStart = windowStart + 401;
+
+// No joins detected; No null-joins emitted
+inputTopic2.pipeInput(0, "A0", windowStart + 1);
+inputTopic2.pipeInput(1, "A1", windowStart + 2);
+inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+processor.checkAndClearProcessResult();
+
+// Join detected; No null-joins emitted
+inputTopic1.pipeInput(1, "a1", windowStart + 3);
+processor.checkAndClearProcessResult(
+new 

[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614538486



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##
@@ -486,6 +486,327 @@ public void testJoin() {
 }
 }
 
+@Test
+public void testImprovedLeftJoin() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final KStream<Integer, String> stream1;
+final KStream<Integer, String> stream2;
+final KStream<Integer, String> joined;
+final MockProcessorSupplier<Integer, String> supplier = new 
MockProcessorSupplier<>();
+
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+joined = stream1.leftJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
+StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String()));
+joined.process(supplier);
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+final TestInputTopic<Integer, String> inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic<Integer, String> inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockProcessor<Integer, String> processor = 
supplier.theCapturedProcessor();
+
+/*
+ * Verifies left non-joined records are emitted by a record 
processed in the left topic
+ */
+
+long windowStart = 0;
+
+// No joins detected; No null-joins emitted
+inputTopic1.pipeInput(0, "A0", windowStart + 1);
+inputTopic1.pipeInput(1, "A1", windowStart + 2);
+inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+processor.checkAndClearProcessResult();
+
+// Join detected; No null-joins emitted
+inputTopic2.pipeInput(1, "a1", windowStart + 3);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+// Dummy record in left topic will emit expired non-joined records 
from the left topic
+inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+// Flush internal non-joined state store by joining the dummy 
record
+inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401));
+
+/*
+ * Verifies left non-joined records are emitted by a record 
processed in the right topic
+ */
+
+windowStart = windowStart + 301;
+
+// No joins detected; No null-joins emitted
+inputTopic1.pipeInput(0, "A0", windowStart + 1);
+inputTopic1.pipeInput(1, "A1", windowStart + 2);
+inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+processor.checkAndClearProcessResult();
+
+// Join detected; No null-joins emitted
+inputTopic2.pipeInput(1, "a1", windowStart + 3);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+// Dummy record in right topic will emit expired non-joined 
records from the left topic
+inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+// Flush internal non-joined state store by joining the dummy 
record
+inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+/*
+ * Verifies right non-joined records are not emitted by record 
processed in the left topic
+ */
+
+windowStart = windowStart + 401;
+
+// No joins detected; No null-joins emitted
+inputTopic2.pipeInput(0, "A0", windowStart + 1);
+inputTopic2.pipeInput(1, "A1", windowStart + 2);
+inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+processor.checkAndClearProcessResult();
+
+// Join detected; No null-joins emitted
+inputTopic1.pipeInput(1, "a1", windowStart + 3);
+processor.checkAndClearProcessResult(
+new 

[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614538200



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##
@@ -486,6 +486,327 @@ public void testJoin() {
 }
 }
 
+@Test
+public void testImprovedLeftJoin() {

Review comment:
   Should this test not be added to `KStreamKStreamLeftJoinTest.java` ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12675) Improve sticky general assignor scalability and performance

2021-04-15 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322593#comment-17322593
 ] 

Travis Bischel commented on KAFKA-12675:


An option to evaluate is the algorithm I devised in my franz-go client, which 
translates the balancing problem into a graph and uses A* search to perform an 
exact balance much more efficiently. I noticed that the existing Java algorithm 
is heuristic-based, and I have a few tests in my repo showing edge cases that the 
existing heuristic algorithm cannot really handle.

The algorithm is here: 
https://github.com/twmb/franz-go/blob/master/pkg/kgo/internal/sticky/graph.go
with the option to switch to that algorithm in the sticky.go file.
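
To make the idea concrete, here is a toy sketch of searching assignment states with a priority queue ordered by "moves made so far" plus an optimistic estimate of the moves still needed (the A* idea). It is deliberately tiny, all names and the heuristic are illustrative, and it is not the franz-go graph implementation:
{noformat}
import java.util.*;

// Toy A*-style search over "partition index -> member index" assignments.
// g = number of partitions moved away from their current (sticky) owner,
// h = admissible lower bound on the moves still needed to even out the load.
public class AStarBalanceSketch {

    static int[] balance(int[] stickyOwner, int memberCount) {
        PriorityQueue<int[]> open = new PriorityQueue<>(
            Comparator.comparingInt((int[] s) -> moves(s, stickyOwner) + remaining(s, memberCount)));
        open.add(stickyOwner.clone());
        Set<String> seen = new HashSet<>();

        while (!open.isEmpty()) {
            int[] state = open.poll();
            if (remaining(state, memberCount) == 0) {
                return state;                       // balanced: every load is within [floor, ceil]
            }
            if (!seen.add(Arrays.toString(state))) {
                continue;                           // already expanded an identical assignment
            }
            // Expand: move any one partition off a most-loaded member onto a least-loaded one.
            int[] load = loads(state, memberCount);
            int from = argMax(load);
            int to = argMin(load);
            for (int p = 0; p < state.length; p++) {
                if (state[p] == from) {
                    int[] next = state.clone();
                    next[p] = to;
                    open.add(next);
                }
            }
        }
        return stickyOwner;                         // not reached for these toy inputs
    }

    // Stickiness cost: how many partitions no longer sit on their original owner.
    static int moves(int[] state, int[] stickyOwner) {
        int n = 0;
        for (int p = 0; p < state.length; p++) {
            if (state[p] != stickyOwner[p]) n++;
        }
        return n;
    }

    // Optimistic estimate of the moves left: load above the ceiling vs. below the floor.
    static int remaining(int[] state, int memberCount) {
        int[] load = loads(state, memberCount);
        int floor = state.length / memberCount;
        int ceil = (state.length + memberCount - 1) / memberCount;
        int excess = 0, deficit = 0;
        for (int l : load) {
            excess += Math.max(0, l - ceil);
            deficit += Math.max(0, floor - l);
        }
        return Math.max(excess, deficit);
    }

    static int[] loads(int[] state, int memberCount) {
        int[] load = new int[memberCount];
        for (int owner : state) {
            load[owner]++;
        }
        return load;
    }

    static int argMax(int[] a) {
        int best = 0;
        for (int i = 1; i < a.length; i++) {
            if (a[i] > a[best]) best = i;
        }
        return best;
    }

    static int argMin(int[] a) {
        int best = 0;
        for (int i = 1; i < a.length; i++) {
            if (a[i] < a[best]) best = i;
        }
        return best;
    }

    public static void main(String[] args) {
        int[] sticky = {0, 0, 0, 0, 0, 0};          // six partitions, all on member 0
        System.out.println(Arrays.toString(balance(sticky, 3)));
        // -> a balanced assignment such as [0, 0, 1, 1, 2, 2]: four partitions moved
    }
}
{noformat}
The point of the sketch is only that an exact search can respect stickiness (the g term, moves away from current owners) while still reaching an even balance (the h term); the real graph-based implementation is far more efficient.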

> Improve sticky general assignor scalability and performance
> ---
>
> Key: KAFKA-12675
> URL: https://issues.apache.org/jira/browse/KAFKA-12675
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Currently, we have a "general assignor" for the non-equal subscription case and 
> a "constrained assignor" for the all-equal subscription case. There's a 
> performance test for the constrained assignor with:
> topicCount = 500;
>  partitionCount = 2000; 
>  consumerCount = 2000;
> in _testLargeAssignmentAndGroupWithUniformSubscription_ (1 million partitions in 
> total), and the assignment completes within 2 seconds on my machine.
> However, if we let 1 of the consumers subscribe to only 1 topic, the "general 
> assignor" is used, and the result with the same settings as above is 
> *OutOfMemory*. 
>  Even if we lower the counts to:
> topicCount = 50;
>  partitionCount = 1000; 
>  consumerCount = 1000;
> we still get *OutOfMemory*.
> With this setting:
> topicCount = 50;
>  partitionCount = 800; 
>  consumerCount = 800;
> we can complete the assignment in 10 seconds on my machine, which is still slow.
>  
> Since we are going to set the default assignment strategy to 
> "CooperativeStickyAssignor" soon, we should improve the scalability and 
> performance of the sticky general assignor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614537097



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
 }
 
 boolean needOuterJoin = outer;
+boolean joinFound = false;
 
 final long inputRecordTimestamp = context().timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+maxObservedStreamTime.advance(inputRecordTimestamp);
+
try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
 while (iter.hasNext()) {
 needOuterJoin = false;
+joinFound = true;
final KeyValue<Long, V2> otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;

Review comment:
Yes, it's a bug in the current implementation...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614536735



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final 
WindowBytesStoreSupplier supplier,
 return builder;
 }
 
+@SuppressWarnings("unchecked")
+private static  StoreBuilder, 
LeftOrRightValue>> outerJoinWindowStoreBuilder(final String storeName,
+   
   final JoinWindows windows,
+   
   final 
StreamJoinedInternal streamJoinedInternal) {
+final StoreBuilder, LeftOrRightValue>> builder = new TimeOrderedWindowStoreBuilder, 
LeftOrRightValue>(
+persistentTimeOrderedWindowStore(
+storeName + "-store",
+Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+Duration.ofMillis(windows.size())
+),
+new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), 
streamJoinedInternal.otherValueSerde()),
+Time.SYSTEM

Review comment:
Ok. Maybe good enough as-is. (Fair point that we don't mock it in other 
stores either -- maybe there was never any demand to be able to mock it. As you 
said, we could change it as a follow-up if needed.)
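
For what it's worth, the testability concern here usually comes down to injecting a clock instead of reading the system time directly, so a test can drive time deterministically. A minimal sketch of that idea, using a stand-in interface rather than the actual Kafka Time/MockTime types:
```java
// Stand-in clock abstraction; only meant to illustrate the injection pattern
// discussed above, not Kafka's own Time interface.
interface Clock {
    long milliseconds();
}

// The component takes the clock in its constructor instead of calling the
// system clock, so tests can substitute a controllable implementation.
class ExpiryTracker {
    private final Clock clock;
    private final long retentionMs;

    ExpiryTracker(Clock clock, long retentionMs) {
        this.clock = clock;
        this.retentionMs = retentionMs;
    }

    boolean isExpired(long recordTimestampMs) {
        return clock.milliseconds() - recordTimestampMs > retentionMs;
    }
}

// A fake clock a test can advance deterministically.
class ManualClock implements Clock {
    private long nowMs;

    ManualClock(long startMs) {
        this.nowMs = startMs;
    }

    @Override
    public long milliseconds() {
        return nowMs;
    }

    void advance(long deltaMs) {
        nowMs += deltaMs;
    }
}
```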




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614536011



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -118,20 +142,47 @@
 final ProcessorGraphNode otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
 builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+Optional, 
LeftOrRightValue>>> outerJoinWindowStore = Optional.empty();
+if (leftOuter || rightOuter) {
+final String outerJoinSuffix = "-shared-outer-join-store";
+
+// Get the suffix index of the joinThisGeneratedName to build the 
outer join store name.
+final String outerJoinStoreGeneratedName = 
KStreamImpl.OUTERSHARED_NAME
++ joinThisGeneratedName.substring(
+rightOuter
+? KStreamImpl.OUTERTHIS_NAME.length()
+: KStreamImpl.JOINTHIS_NAME.length());

Review comment:
I agree that we should not use one more index, to avoid compatibility 
issues... Maybe the question is really (just for my better understanding): what 
would the name be? I.e., could we give a concrete example (with and without the 
`Named` parameter)? That is also why I asked for a test using 
`TopologyDescription` -- it makes it easier to wrap my head around.
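
As a concrete illustration of the substring trick in the quoted diff, here is a tiny hypothetical example; the prefix constants, the generated index, and the final concatenation are assumptions made for illustration, not values verified against the generated topology:
```java
public class OuterJoinStoreNameExample {
    public static void main(String[] args) {
        // Assumed constants and generated processor name, for illustration only.
        final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
        final String OUTERSHARED_NAME = "KSTREAM-OUTERSHARED-";
        final String outerJoinSuffix = "-shared-outer-join-store";
        final String joinThisGeneratedName = "KSTREAM-JOINTHIS-0000000004";

        // Reuse the numeric suffix of the join processor name so the shared
        // outer-join store name lines up with it.
        final String outerJoinStoreGeneratedName =
            OUTERSHARED_NAME + joinThisGeneratedName.substring(JOINTHIS_NAME.length());

        System.out.println(outerJoinStoreGeneratedName + outerJoinSuffix);
        // -> KSTREAM-OUTERSHARED-0000000004-shared-outer-join-store (assumed shape)
    }
}
```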




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12675) Improve sticky general assignor scalability and performance

2021-04-15 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-12675:
--
Description: 
Currently, we have a "general assignor" for the non-equal subscription case and 
a "constrained assignor" for the all-equal subscription case. There's a 
performance test for the constrained assignor with:

topicCount = 500;
 partitionCount = 2000; 
 consumerCount = 2000;

in _testLargeAssignmentAndGroupWithUniformSubscription_ (1 million partitions in 
total), and the assignment completes within 2 seconds on my machine.

However, if we let 1 of the consumers subscribe to only 1 topic, the "general 
assignor" is used, and the result with the same settings as above is 
*OutOfMemory*. 
 Even if we lower the counts to:

topicCount = 50;
 partitionCount = 1000; 
 consumerCount = 1000;

we still get *OutOfMemory*.

With this setting:

topicCount = 50;
 partitionCount = 800; 
 consumerCount = 800;

we can complete the assignment in 10 seconds on my machine, which is still slow.

Since we are going to set the default assignment strategy to 
"CooperativeStickyAssignor" soon, we should improve the scalability and 
performance of the sticky general assignor.

  was:
Currently, we have a "general assignor" for the non-equal subscription case and 
a "constrained assignor" for the all-equal subscription case. There's a 
performance test for the constrained assignor with:

topicCount = 500;
partitionCount = 2000; 
consumerCount = 2000;

in _testLargeAssignmentAndGroupWithUniformSubscription_ (1 million partitions in 
total), and the assignment completes within 2 seconds on my machine.

However, if we let 1 of the consumers subscribe to only 1 topic, the "general 
assignor" is used, and the result with the same settings as above is 
*OutOfMemory*. 
Even if we lower the counts to:

topicCount = 50;
partitionCount = 1000; 
consumerCount = 1000;

we still get *OutOfMemory*.

With this setting:

topicCount = 50;
partitionCount = 800; 
consumerCount = 800;

we can complete the assignment with 10 seconds on my machine, which is still slow.

Since we are going to set the default assignment strategy to 
"CooperativeStickyAssignor" soon, we should improve the scalability and 
performance of the sticky general assignor.


> Improve sticky general assignor scalability and performance
> ---
>
> Key: KAFKA-12675
> URL: https://issues.apache.org/jira/browse/KAFKA-12675
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Currently, we have a "general assignor" for the non-equal subscription case and 
> a "constrained assignor" for the all-equal subscription case. There's a 
> performance test for the constrained assignor with:
> topicCount = 500;
>  partitionCount = 2000; 
>  consumerCount = 2000;
> in _testLargeAssignmentAndGroupWithUniformSubscription_ (1 million partitions in 
> total), and the assignment completes within 2 seconds on my machine.
> However, if we let 1 of the consumers subscribe to only 1 topic, the "general 
> assignor" is used, and the result with the same settings as above is 
> *OutOfMemory*. 
>  Even if we lower the counts to:
> topicCount = 50;
>  partitionCount = 1000; 
>  consumerCount = 1000;
> we still get *OutOfMemory*.
> With this setting:
> topicCount = 50;
>  partitionCount = 800; 
>  consumerCount = 800;
> we can complete the assignment in 10 seconds on my machine, which is still slow.
>  
> Since we are going to set the default assignment strategy to 
> "CooperativeStickyAssignor" soon, we should improve the scalability and 
> performance of the sticky general assignor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12675) Improve sticky general assignor scalability and performance

2021-04-15 Thread Luke Chen (Jira)
Luke Chen created KAFKA-12675:
-

 Summary: Improve sticky general assignor scalability and 
performance
 Key: KAFKA-12675
 URL: https://issues.apache.org/jira/browse/KAFKA-12675
 Project: Kafka
  Issue Type: Improvement
Reporter: Luke Chen
Assignee: Luke Chen


Currently, we have a "general assignor" for the non-equal subscription case and 
a "constrained assignor" for the all-equal subscription case. There's a 
performance test for the constrained assignor with:

topicCount = 500;
partitionCount = 2000; 
consumerCount = 2000;

in _testLargeAssignmentAndGroupWithUniformSubscription_ (1 million partitions in 
total), and the assignment completes within 2 seconds on my machine.

However, if we let 1 of the consumers subscribe to only 1 topic, the "general 
assignor" is used, and the result with the same settings as above is 
*OutOfMemory*. 
Even if we lower the counts to:

topicCount = 50;
partitionCount = 1000; 
consumerCount = 1000;

we still get *OutOfMemory*.

With this setting:

topicCount = 50;
partitionCount = 800; 
consumerCount = 800;

we can complete the assignment in 10 seconds on my machine, which is still slow.

Since we are going to set the default assignment strategy to 
"CooperativeStickyAssignor" soon, we should improve the scalability and 
performance of the sticky general assignor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614534849



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -118,20 +142,47 @@
 final ProcessorGraphNode otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
 builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+Optional, 
LeftOrRightValue>>> outerJoinWindowStore = Optional.empty();
+if (leftOuter || rightOuter) {
+final String outerJoinSuffix = "-shared-outer-join-store";

Review comment:
Sorry for the confusion. What I meant was: if it's a `leftJoin()` we 
could use `-shared-left-join-store`, and if it's an `outerJoin()` we could use 
`-shared-outer-join-store`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12674) Client failover takes 2-4 seconds on clean broker shutdown

2021-04-15 Thread Gwen Shapira (Jira)
Gwen Shapira created KAFKA-12674:


 Summary: Client failover takes 2-4 seconds on clean broker shutdown
 Key: KAFKA-12674
 URL: https://issues.apache.org/jira/browse/KAFKA-12674
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.7.0
Reporter: Gwen Shapira


I ran two perf-producer clients against a 4-broker cluster running in AWS behind 
an ELB, and then did a rolling restart, taking down one broker at a time using 
controlled shutdown.

I got the following errors on every broker shutdown:

{{[2021-04-16 01:31:39,846] WARN [Producer clientId=producer-1] Received 
invalid metadata error in produce request on partition perf-test-3 due to 
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
intended only for the leader, this error indicates that the broker is not the 
current leader. For requests intended for any replica, this error indicates 
that the broker is not a replica of the topic partition.. Going to request 
metadata update now (org.apache.kafka.clients.producer.internals.Sender)}}
 {{[2021-04-16 01:44:22,691] WARN [Producer clientId=producer-1] Connection to 
node 0 (b0-pkc-7yrmj.us-east-2.aws.confluent.cloud/3.140.123.43:9092) 
terminated during authentication. This may happen due to any of the following 
reasons: (1) Authentication failed due to invalid credentials with brokers 
older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow 
HTTPS traffic), (3) Transient network issue. 
(org.apache.kafka.clients.NetworkClient)}}

 The "Connection to node... terminated" error continued for 2-4 seconds. 

It looks like the metadata request was repeatedly sent to the node that had just 
gone down. I'd expect it to go over an existing connection to one of the live 
nodes.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-15 Thread GitBox


junrao commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r614448708



##
File path: storage/src/main/resources/message/RemotePartitionDeleteMetadata.json
##
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 2,
+  "type": "data",
+  "name": "RemotePartitionDeleteMetadataRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{
+  "name": "TopicIdPartition",
+  "type": "TopicIdPartitionEntry",
+  "versions": "0+",
+  "about": "Represents unique topic partition.",
+  "fields": [
+{
+  "name": "Name",
+  "type": "string",
+  "versions": "0+",
+  "about": "Topic name."
+},
+{
+  "name": "Id",
+  "type": "uuid",
+  "versions": "0+",
+  "about": "Unique identifier of the topic."
+},
+{
+  "name": "Partition",
+  "type": "int32",
+  "versions": "0+",
+  "about": "Partition number."
+}
+  ]
+},
+{
+  "name": "BrokerId",
+  "type": "int32",
+  "versions": "0+",
+  "about": "Broker (controller or leader) id from which this event is 
created. DELETE_PARTITION_MARKED is sent by the controller. 
DELETE_PARTITION_STARTED and DELETE_PARTITION_FINISHED are sent by remote log 
metadata topic partition leader."
+},
+{
+  "name": "EventTimestampMs",

Review comment:
   The KIP has an epoch field. Should we add it here?

##
File path: 
storage/src/main/resources/META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider
##
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.common.config.provider.FileConfigProvider

Review comment:
   Hmm, what is this file for?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-12468) Initial offsets are copied from source to target cluster

2021-04-15 Thread Jitesh Motwani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322537#comment-17322537
 ] 

Jitesh Motwani edited comment on KAFKA-12468 at 4/15/21, 11:52 PM:
---

[~askldjd]

 What do you mean by "run 10 consecutive full mirrors"? 10 separate processes on 
the same box?

https://issues.apache.org/jira/browse/KAFKA-12468?focusedCommentId=17312826=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17312826


was (Author: jitesh88):
[~askldjd]

 what do you mean by run 10 consecutive full mirror ?

https://issues.apache.org/jira/browse/KAFKA-12468?focusedCommentId=17312826=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17312826

> Initial offsets are copied from source to target cluster
> 
>
> Key: KAFKA-12468
> URL: https://issues.apache.org/jira/browse/KAFKA-12468
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Bart De Neuter
>Priority: Major
>
> We have an active-passive setup where  the 3 connectors from mirror maker 2 
> (heartbeat, checkpoint and source) are running on a dedicated Kafka connect 
> cluster on the target cluster.
> Offset syncing is enabled as specified by KIP-545. But when activated, it 
> seems the offsets from the source cluster are initially copied to the target 
> cluster without translation. This causes a negative lag for all synced 
> consumer groups. Only when we reset the offsets for each topic/partition on 
> the target cluster and produce a record on the topic/partition in the source, 
> the sync starts working correctly. 
> I would expect that the consumer groups are synced but that the current 
> offsets of the source cluster are not copied to the target cluster.
> This is the configuration we are currently using:
> Heartbeat connector
>  
> {code:xml}
> {
>   "name": "mm2-mirror-heartbeat",
>   "config": {
> "name": "mm2-mirror-heartbeat",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "1",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
> Checkpoint connector:
> {code:xml}
> {
>   "name": "mm2-mirror-checkpoint",
>   "config": {
> "name": "mm2-mirror-checkpoint",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  Source connector:
> {code:xml}
> {
>   "name": "mm2-mirror-source",
>   "config": {
> "name": "mm2-mirror-source",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorSourceConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12468) Initial offsets are copied from source to target cluster

2021-04-15 Thread Jitesh Motwani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322537#comment-17322537
 ] 

Jitesh Motwani commented on KAFKA-12468:


[~askldjd]

 What do you mean by "run 10 consecutive full mirrors"?

https://issues.apache.org/jira/browse/KAFKA-12468?focusedCommentId=17312826=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17312826

> Initial offsets are copied from source to target cluster
> 
>
> Key: KAFKA-12468
> URL: https://issues.apache.org/jira/browse/KAFKA-12468
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Bart De Neuter
>Priority: Major
>
> We have an active-passive setup where  the 3 connectors from mirror maker 2 
> (heartbeat, checkpoint and source) are running on a dedicated Kafka connect 
> cluster on the target cluster.
> Offset syncing is enabled as specified by KIP-545. But when activated, it 
> seems the offsets from the source cluster are initially copied to the target 
> cluster without translation. This causes a negative lag for all synced 
> consumer groups. Only when we reset the offsets for each topic/partition on 
> the target cluster and produce a record on the topic/partition in the source, 
> the sync starts working correctly. 
> I would expect that the consumer groups are synced but that the current 
> offsets of the source cluster are not copied to the target cluster.
> This is the configuration we are currently using:
> Heartbeat connector
>  
> {code:xml}
> {
>   "name": "mm2-mirror-heartbeat",
>   "config": {
> "name": "mm2-mirror-heartbeat",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "1",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
> Checkpoint connector:
> {code:xml}
> {
>   "name": "mm2-mirror-checkpoint",
>   "config": {
> "name": "mm2-mirror-checkpoint",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  Source connector:
> {code:xml}
> {
>   "name": "mm2-mirror-source",
>   "config": {
> "name": "mm2-mirror-source",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorSourceConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #10294: KAFKA-12450: Remove deprecated methods from ReadOnlyWindowStore

2021-04-15 Thread GitBox


guozhangwang commented on pull request #10294:
URL: https://github.com/apache/kafka/pull/10294#issuecomment-820756041


   @jeqo could you rebase the PR while I review it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #10544: KAFKA-12648: minimum changes for error handling namedTopologies

2021-04-15 Thread GitBox


wcarlson5 commented on a change in pull request #10544:
URL: https://github.com/apache/kafka/pull/10544#discussion_r614398250



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -471,6 +472,10 @@ private void replaceStreamThread(final Throwable 
throwable) {
 
 private void handleStreamsUncaughtException(final Throwable throwable,
 final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+if (throwable instanceof NamedTopologyStreamsException) {
+String name = ((NamedTopologyStreamsException) 
throwable).getTopologyName();
+((StreamThread) 
Thread.currentThread()).deprioritizeNamedTopology(name);

Review comment:
   The exception is guaranteed to be handled on the dying thread, though we 
might want to call this on the new thread.
   
   EDIT: we definitely will need to track the failed topologies in `KafkaStreams`, 
though tracking successes across restarts will be an issue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #10544: KAFKA-12648: minimum changes for error handling namedTopologies

2021-04-15 Thread GitBox


wcarlson5 commented on a change in pull request #10544:
URL: https://github.com/apache/kafka/pull/10544#discussion_r614398250



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -471,6 +472,10 @@ private void replaceStreamThread(final Throwable 
throwable) {
 
 private void handleStreamsUncaughtException(final Throwable throwable,
 final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+if (throwable instanceof NamedTopologyStreamsException) {
+String name = ((NamedTopologyStreamsException) 
throwable).getTopologyName();
+((StreamThread) 
Thread.currentThread()).deprioritizeNamedTopology(name);

Review comment:
   The exception is guaranteed to be handled on the dying thread, though we 
might want to call this on the new thread.
   
   EDIT: we definitely will need to track the failed topologies in `KafkaStreams`, 
though tracking successes will be an issue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10463: KAFKA-12670: support unclean.leader.election.enable in KRaft mode

2021-04-15 Thread GitBox


cmccabe commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r614404680



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1119,6 +1148,113 @@ void validateManualPartitionAssignment(List 
assignment,
 }
 }
 
+/**
+ * Handle legacy configuration alterations.
+ */
+ControllerResult> legacyAlterConfigs(
+Map> newConfigs) {
+ControllerResult> result =
+configurationControl.legacyAlterConfigs(newConfigs);
+return alterConfigs(result);
+}
+
+/**
+ * Handle incremental configuration alterations.
+ */
+ControllerResult> incrementalAlterConfigs(
+Map>> 
configChanges) {
+ControllerResult> result =
+configurationControl.incrementalAlterConfigs(configChanges);
+return alterConfigs(result);
+}
+
+/**
+ * If important controller configurations were changed, generate records 
which will
+ * apply the changes.
+ */
+ControllerResult> alterConfigs(
+ControllerResult> result) {
+ElectionStrategizer strategizer = 
examineConfigAlterations(result.records());
+boolean isAtomic = true;
+List records = result.records();
+if (strategizer != null) {
+records.addAll(handleLeaderElectionConfigChanges(strategizer));
+isAtomic = false;

Review comment:
   I changed the approach here in the latest PR.  Now, rather than 
combining the records into a single list, I schedule another task to scan 
through the offline partitions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #10544: KAFKA-12648: minimum changes for error handling namedTopologies

2021-04-15 Thread GitBox


wcarlson5 commented on a change in pull request #10544:
URL: https://github.com/apache/kafka/pull/10544#discussion_r614398757



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -1198,7 +1210,7 @@ int process(final int maxNumRecords, final Time time) {
 task.recordProcessBatchTime(now - then);
 }
 }
-
+reprioritizeTasks();

Review comment:
   if processing succeeds 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -43,6 +43,8 @@
 private final Map allTasksPerId = new TreeMap<>();
 private final Map readOnlyTasksPerId = 
Collections.unmodifiableMap(allTasksPerId);
 private final Collection readOnlyTasks = 
Collections.unmodifiableCollection(allTasksPerId.values());
+private final Collection misbehavingTasks = new HashSet<>();
+private final Collection taskJail = new HashSet<>();

Review comment:
   working title

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -471,6 +472,10 @@ private void replaceStreamThread(final Throwable 
throwable) {
 
 private void handleStreamsUncaughtException(final Throwable throwable,
 final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+if (throwable instanceof NamedTopologyStreamsException) {
+String name = ((NamedTopologyStreamsException) 
throwable).getTopologyName();
+((StreamThread) 
Thread.currentThread()).deprioritizeNamedTopology(name);

Review comment:
   The exception is guaranteed to be handled on the dying thread, though we 
might want to call this on the new thread.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -471,6 +472,10 @@ private void replaceStreamThread(final Throwable 
throwable) {
 
 private void handleStreamsUncaughtException(final Throwable throwable,
 final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+if (throwable instanceof NamedTopologyStreamsException) {

Review comment:
   we should consider exceptions that were not StreamExceptions as well I 
think 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -304,4 +310,26 @@ void addTask(final Task task) {
 }
 allTasksPerId.put(task.id(), task);
 }
+
+public void deprioritizeNamedTopology(String name) {
+for (Task task: readOnlyTasks) {
+if (task.id().namedTopology().equals(name)){
+misbehavingTasks.add(task);
+}
+}
+for (Task task: misbehavingTasks) {

Review comment:
   I think suspending will work. However, I am not sure it will survive 
thread restarts. I may need to store the topology names that are in each state 
to repopulate each of these lists in new threads.
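   
   A hedged sketch of one way to do that (names and placement here are hypothetical, 
not part of this PR):
   
   ```java
   // Hedged sketch: remember deprioritized topology names at the KafkaStreams level so
   // that replacement threads can be seeded after a restart. Names are hypothetical.
   private final java.util.Set<String> deprioritizedTopologies =
       java.util.concurrent.ConcurrentHashMap.newKeySet();
   
   void onTopologyDeprioritized(final String name) {
       deprioritizedTopologies.add(name);
   }
   
   void seedReplacementThread(final StreamThread thread) {
       deprioritizedTopologies.forEach(thread::deprioritizeNamedTopology);
   }
   ```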




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #10545: KAFKA-12672: Added config for raft testing server

2021-04-15 Thread GitBox


bbejeck commented on pull request #10545:
URL: https://github.com/apache/kafka/pull/10545#issuecomment-820665466


   @ijuma updated README


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12574) Deprecate eos-alpha

2021-04-15 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322417#comment-17322417
 ] 

Guozhang Wang commented on KAFKA-12574:
---

Hi [~feyman], I'm aware of KAFKA-9689 as well, and have also discussed it with 
[~boyang]. I also feel that doing the swap online is quite hard and would add a 
lot of complexity to the code, hence I suggested we move towards enforcing the 
config change.

> Deprecate eos-alpha
> ---
>
> Key: KAFKA-12574
> URL: https://issues.apache.org/jira/browse/KAFKA-12574
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> In KIP-447 we introduced a new thread-producer which is capable of 
> exactly-once semantics across multiple tasks. The new mode of EOS, called 
> eos-beta, is intended to eventually be the preferred processing mode for EOS 
> as it improves the performance and scaling of partitions/tasks. The only 
> downside is that it requires brokers to be on version 2.5+ in order to 
> understand the latest APIs that are necessary for this thread-producer.
> We should consider deprecating the eos-alpha config, ie 
> StreamsConfig.EXACTLY_ONCE, to encourage new & existing EOS users to migrate 
> to the new-and-improved processing mode, and upgrade their brokers if 
> necessary.
> Eventually we would like to be able to remove the eos-alpha code paths from 
> Streams as this will help to simplify the logic and reduce the processing 
> mode branching. But since this will break client-broker compatibility, and 
> 2.5 is still a relatively recent version, we probably can't actually remove 
> eos-alpha in the near future



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on pull request #10505: MINOR: fix some bugs in ControllerApis.scala

2021-04-15 Thread GitBox


cmccabe commented on pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#issuecomment-820659355


   @chia7712: I re-ran the test failures locally and couldn't get them to occur.
   
   The failures I saw were mostly streams tests... those have been a bit 
flaky lately... I hope we can fix this soon :(
   
   Also, closer to home, 
`RaftClusterTest#testCreateClusterAndCreateAndManyTopicsWithManyPartitions` 
seems to flake periodically on Jenkins (even before this PR), but I can't seem 
to reproduce it locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10545: KAFKA-12672: Added config for raft testing server

2021-04-15 Thread GitBox


ijuma commented on pull request #10545:
URL: https://github.com/apache/kafka/pull/10545#issuecomment-820659248


   Thanks for the PR. Don't you need to update the multi-node-quorum section? 
https://github.com/apache/kafka/tree/trunk/raft#run-multi-node-quorum


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck opened a new pull request #10545: KAFKA-12672: Added config for raft testing server

2021-04-15 Thread GitBox


bbejeck opened a new pull request #10545:
URL: https://github.com/apache/kafka/pull/10545


   Adding a property to the `raft/config/kraft.properties` for running the raft 
test server in development.
   
   For testing I ran `./bin/test-kraft-server-start.sh --config 
config/kraft.properties` and validated the test server started running with a 
throughput test.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614313362



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
 }
 
 boolean needOuterJoin = outer;
+boolean joinFound = false;
 
 final long inputRecordTimestamp = context().timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+maxObservedStreamTime.advance(inputRecordTimestamp);
+
 try (final WindowStoreIterator iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
 while (iter.hasNext()) {
 needOuterJoin = false;
+joinFound = true;
 final KeyValue otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
+
+// Emit expired records before the joined record to keep 
time ordering
+emitExpiredNonJoinedOuterRecordsExcept(key, 
otherRecordTimestamp);
+
 context().forward(
 key,
 joiner.apply(key, value, otherRecord.value),
-To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+}
+
+// Emit all expired records before adding a new non-joined 
record to the store. Otherwise,
+// the put() call will advance the stream time, which causes 
records out of the retention
+// period to be deleted, thus not being emitted later.
+if (!joinFound && inputRecordTimestamp == 
maxObservedStreamTime.get()) {
+emitExpiredNonJoinedOuterRecords();
 }
 
 if (needOuterJoin) {
-context().forward(key, joiner.apply(key, value, null));
+// The maxStreamTime contains the max time observed in 
both sides of the join.
+// Having access to the time observed in the other join 
side fixes the following
+// problem:
+//
+// Say we have a window size of 5 seconds
+//  1. A non-joined record wth time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+// The record is not processed yet, and is added to 
the outer-join store
+//  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+// The record is not processed yet, and is added to 
the outer-join store
+//  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+// It is time to look at the expired records. T10 and 
T2 should be emitted, but
+// because T2 was late, then it is not fetched by the 
window store, so it is not processed
+//
+// See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+//
+// the condition below allows us to process the late 
record without the need
+// to hold it in the temporary outer store
+if (!outerJoinWindowStore.isPresent() || timeTo < 
maxObservedStreamTime.get()) {
+context().forward(key, joiner.apply(key, value, null));
+} else {
+outerJoinWindowStore.ifPresent(store -> store.put(
+KeyAndJoinSide.make(isLeftJoin, key),
+LeftOrRightValue.make(isLeftJoin, value),
+inputRecordTimestamp));
+}
+}
+}
+}
+
+private void emitExpiredNonJoinedOuterRecords() {
+outerJoinWindowStore.ifPresent(store ->
+emitExpiredNonJoinedOuterRecords(store, 
recordWindowHasClosed));
+}
+
+private void emitExpiredNonJoinedOuterRecordsExcept(final K key, final 
long timestamp) {
+outerJoinWindowStore.ifPresent(store -> {
+final KeyAndJoinSide keyAndJoinSide = 
KeyAndJoinSide.make(!isLeftJoin, key);
+
+// Emit all expired records except the just found non-joined 
key. We need
+// to emit all expired records before calling put(), otherwise 
the internal
+// stream time will advance and may cause records out of the 
retention period to
+// be deleted.
+emitExpiredNonJoinedOuterRecords(store,
+

[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614312924



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final 
WindowBytesStoreSupplier supplier,
 return builder;
 }
 
+@SuppressWarnings("unchecked")
+private static  StoreBuilder, 
LeftOrRightValue>> outerJoinWindowStoreBuilder(final String storeName,

Review comment:
   Done

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -60,20 +82,41 @@
 }
 
 private class KStreamKStreamJoinProcessor extends AbstractProcessor 
{
+private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;

Review comment:
   Done

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -60,20 +82,41 @@
 }
 
 private class KStreamKStreamJoinProcessor extends AbstractProcessor 
{
+private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+private final Predicate>> 
recordWindowHasClosed =
+windowedKey -> windowedKey.window().start() + joinAfterMs + 
joinGraceMs < maxObservedStreamTime.get();
 
 private WindowStore otherWindow;
 private StreamsMetricsImpl metrics;
 private Sensor droppedRecordsSensor;
+private Optional, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
 @SuppressWarnings("unchecked")
 @Override
 public void init(final ProcessorContext context) {
 super.init(context);
 metrics = (StreamsMetricsImpl) context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
-otherWindow = (WindowStore) 
context.getStateStore(otherWindowName);
+otherWindow = context.getStateStore(otherWindowName);
+
+if (!internalOuterJoinFixDisabled(context.appConfigs())) {
+outerJoinWindowStore = outerJoinWindowName.map(name -> 
context.getStateStore(name));
+}
 }
 
+private boolean internalOuterJoinFixDisabled(final Map 
configs) {
+final Object value = 
configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX);
+if (value == null) {
+return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT;

Review comment:
   Done

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -60,20 +82,41 @@
 }
 
 private class KStreamKStreamJoinProcessor extends AbstractProcessor 
{
+private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+private final Predicate>> 
recordWindowHasClosed =
+windowedKey -> windowedKey.window().start() + joinAfterMs + 
joinGraceMs < maxObservedStreamTime.get();
 
 private WindowStore otherWindow;
 private StreamsMetricsImpl metrics;
 private Sensor droppedRecordsSensor;
+private Optional, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
 @SuppressWarnings("unchecked")
 @Override
 public void init(final ProcessorContext context) {
 super.init(context);
 metrics = (StreamsMetricsImpl) context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
-otherWindow = (WindowStore) 
context.getStateStore(otherWindowName);
+otherWindow = context.getStateStore(otherWindowName);
+
+if (!internalOuterJoinFixDisabled(context.appConfigs())) {
+outerJoinWindowStore = outerJoinWindowName.map(name -> 
context.getStateStore(name));
+}
 }
 
+private boolean internalOuterJoinFixDisabled(final Map 
configs) {
+final Object value = 
configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX);
+if (value == null) {
+return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT;
+}
+
+if (value instanceof Boolean) {
+return (Boolean) value;
+} else {

Review comment:
   Done

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -60,20 +82,41 @@
 }
 
 private class KStreamKStreamJoinProcessor extends AbstractProcessor 
{
+private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+private final Predicate>> 

[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614312224



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final 
WindowBytesStoreSupplier supplier,
 return builder;
 }
 
+@SuppressWarnings("unchecked")
+private static  StoreBuilder, 
LeftOrRightValue>> outerJoinWindowStoreBuilder(final String storeName,
+   
   final JoinWindows windows,
+   
   final 
StreamJoinedInternal streamJoinedInternal) {
+final StoreBuilder, LeftOrRightValue>> builder = new TimeOrderedWindowStoreBuilder, 
LeftOrRightValue>(
+persistentTimeOrderedWindowStore(
+storeName + "-store",
+Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+Duration.ofMillis(windows.size())
+),
+new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), 
streamJoinedInternal.otherValueSerde()),
+Time.SYSTEM

Review comment:
   It will require more changes just to allow that. The `KStreamImplJoin` 
constructor, where we could overload to pass a `Time` mock object, is only used 
by the `KStreamImplJoinImpl` class. The tests use the `StreamsBuilder` to 
create the joins, and they do not accept a Time object. 
   
   Also, the `Stores` class, which is called by `KStreamImplJoin`, does not 
mock it, maybe because the same code changes would be required just for that?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #10539: Minor: Move trogdor out of tools and into its own project

2021-04-15 Thread GitBox


cmccabe merged pull request #10539:
URL: https://github.com/apache/kafka/pull/10539


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614304330



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
 }
 
 boolean needOuterJoin = outer;
+boolean joinFound = false;
 
 final long inputRecordTimestamp = context().timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+maxObservedStreamTime.advance(inputRecordTimestamp);
+
 try (final WindowStoreIterator iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
 while (iter.hasNext()) {
 needOuterJoin = false;
+joinFound = true;
 final KeyValue otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;

Review comment:
   Interesting. Is this a current bug with the old join semantics?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614303123



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
 }
 
 boolean needOuterJoin = outer;
+boolean joinFound = false;
 
 final long inputRecordTimestamp = context().timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+maxObservedStreamTime.advance(inputRecordTimestamp);
+
 try (final WindowStoreIterator iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
 while (iter.hasNext()) {
 needOuterJoin = false;
+joinFound = true;
 final KeyValue otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
+
+// Emit expired records before the joined record to keep 
time ordering
+emitExpiredNonJoinedOuterRecordsExcept(key, 
otherRecordTimestamp);
+
 context().forward(
 key,
 joiner.apply(key, value, otherRecord.value),
-To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+}
+
+// Emit all expired records before adding a new non-joined 
record to the store. Otherwise,
+// the put() call will advance the stream time, which causes 
records out of the retention
+// period to be deleted, thus not being emitted later.
+if (!joinFound && inputRecordTimestamp == 
maxObservedStreamTime.get()) {
+emitExpiredNonJoinedOuterRecords();
 }
 
 if (needOuterJoin) {
-context().forward(key, joiner.apply(key, value, null));
+// The maxStreamTime contains the max time observed in 
both sides of the join.
+// Having access to the time observed in the other join 
side fixes the following
+// problem:
+//
+// Say we have a window size of 5 seconds
+//  1. A non-joined record wth time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+// The record is not processed yet, and is added to 
the outer-join store
+//  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+// The record is not processed yet, and is added to 
the outer-join store
+//  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+// It is time to look at the expired records. T10 and 
T2 should be emitted, but
+// because T2 was late, then it is not fetched by the 
window store, so it is not processed
+//
+// See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+//
+// the condition below allows us to process the late 
record without the need
+// to hold it in the temporary outer store
+if (!outerJoinWindowStore.isPresent() || timeTo < 
maxObservedStreamTime.get()) {
+context().forward(key, joiner.apply(key, value, null));
+} else {
+outerJoinWindowStore.ifPresent(store -> store.put(
+KeyAndJoinSide.make(isLeftJoin, key),
+LeftOrRightValue.make(isLeftJoin, value),
+inputRecordTimestamp));
+}
+}
+}
+}
+
+private void emitExpiredNonJoinedOuterRecords() {
+outerJoinWindowStore.ifPresent(store ->
+emitExpiredNonJoinedOuterRecords(store, 
recordWindowHasClosed));
+}
+
+private void emitExpiredNonJoinedOuterRecordsExcept(final K key, final 
long timestamp) {
+outerJoinWindowStore.ifPresent(store -> {
+final KeyAndJoinSide keyAndJoinSide = 
KeyAndJoinSide.make(!isLeftJoin, key);
+
+// Emit all expired records except the just found non-joined 
key. We need
+// to emit all expired records before calling put(), otherwise 
the internal
+// stream time will advance and may cause records out of the 
retention period to
+// be deleted.
+emitExpiredNonJoinedOuterRecords(store,
+

[jira] [Updated] (KAFKA-12672) Running test-kraft-server-start results in error

2021-04-15 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-12672:

Description: 
Running the {{test-kraft-server-start}} script in the {{raft}} module results 
in this error

 
{code:java}
ERROR Exiting raft server due to fatal exception (kafka.tools.TestRaftServer$)
java.lang.IllegalArgumentException: No enum constant 
org.apache.kafka.common.security.auth.SecurityProtocol.
at java.lang.Enum.valueOf(Enum.java:238)
at 
org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26)
at 
org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72)
at 
kafka.raft.KafkaRaftManager.$anonfun$buildNetworkClient$1(RaftManager.scala:256)
at scala.collection.immutable.Map$Map4.getOrElse(Map.scala:530)
at kafka.raft.KafkaRaftManager.buildNetworkClient(RaftManager.scala:256)
at 
kafka.raft.KafkaRaftManager.buildNetworkChannel(RaftManager.scala:234)
at kafka.raft.KafkaRaftManager.(RaftManager.scala:126)
at kafka.tools.TestRaftServer.startup(TestRaftServer.scala:88)
at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:442)
at kafka.tools.TestRaftServer.main(TestRaftServer.scala)
{code}
Looks like the listener property in the config is not getting picked up as an 
empty string gets passed to {{SecurityProtocol.forName}}

EDIT: The issue is that the properties file needs a 
{{controller.listener.names}} property whose value is just the listener names

  was:
Running the {{test-kraft-server-start}} script in the {{raft}} module results 
in this error

 
{code:java}
ERROR Exiting raft server due to fatal exception (kafka.tools.TestRaftServer$)
java.lang.IllegalArgumentException: No enum constant 
org.apache.kafka.common.security.auth.SecurityProtocol.
at java.lang.Enum.valueOf(Enum.java:238)
at 
org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26)
at 
org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72)
at 
kafka.raft.KafkaRaftManager.$anonfun$buildNetworkClient$1(RaftManager.scala:256)
at scala.collection.immutable.Map$Map4.getOrElse(Map.scala:530)
at kafka.raft.KafkaRaftManager.buildNetworkClient(RaftManager.scala:256)
at 
kafka.raft.KafkaRaftManager.buildNetworkChannel(RaftManager.scala:234)
at kafka.raft.KafkaRaftManager.(RaftManager.scala:126)
at kafka.tools.TestRaftServer.startup(TestRaftServer.scala:88)
at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:442)
at kafka.tools.TestRaftServer.main(TestRaftServer.scala)
{code}
Looks like the listener property in the config is not getting picked up as an 
empty string gets passed to {{SecurityProtocol.forName}}


> Running test-kraft-server-start results in error
> 
>
> Key: KAFKA-12672
> URL: https://issues.apache.org/jira/browse/KAFKA-12672
> Project: Kafka
>  Issue Type: Bug
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>
> Running the {{test-kraft-server-start}} script in the {{raft}} module results 
> in this error
>  
> {code:java}
> ERROR Exiting raft server due to fatal exception (kafka.tools.TestRaftServer$)
> java.lang.IllegalArgumentException: No enum constant 
> org.apache.kafka.common.security.auth.SecurityProtocol.
>   at java.lang.Enum.valueOf(Enum.java:238)
>   at 
> org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26)
>   at 
> org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72)
>   at 
> kafka.raft.KafkaRaftManager.$anonfun$buildNetworkClient$1(RaftManager.scala:256)
>   at scala.collection.immutable.Map$Map4.getOrElse(Map.scala:530)
>   at kafka.raft.KafkaRaftManager.buildNetworkClient(RaftManager.scala:256)
>   at 
> kafka.raft.KafkaRaftManager.buildNetworkChannel(RaftManager.scala:234)
>   at kafka.raft.KafkaRaftManager.(RaftManager.scala:126)
>   at kafka.tools.TestRaftServer.startup(TestRaftServer.scala:88)
>   at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:442)
>   at kafka.tools.TestRaftServer.main(TestRaftServer.scala)
> {code}
> Looks like the listener property in the config is not getting picked up as an 
> empty string gets passed to {{SecurityProtocol.forName}}
> EDIT: The issue is that the properties file needs a 
> {{controller.listener.names}} property whose value is just the listener names



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12673) Kafka and zookeeper dependency

2021-04-15 Thread suhas (Jira)
suhas created KAFKA-12673:
-

 Summary: Kafka and zookeeper dependency
 Key: KAFKA-12673
 URL: https://issues.apache.org/jira/browse/KAFKA-12673
 Project: Kafka
  Issue Type: Task
Reporter: suhas


We are using Kafka version 2.4.1 in our cluster.

We are not able to upgrade ZooKeeper to the latest version (3.7.0) on its own, as 
it has a dependency on Kafka.

So changing the ZooKeeper version alone in our Kafka cluster is an incompatible 
change.

Looking forward to a resolution.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 commented on pull request #10544: KAFKA-12648: minimal changes for error handling namedTopologies

2021-04-15 Thread GitBox


wcarlson5 commented on pull request #10544:
URL: https://github.com/apache/kafka/pull/10544#issuecomment-820620451


   @ableegoldman This is what I think the error handling should look like. We 
can try to categorize more but I think we should make this best effort and we 
can add to it as we go.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 opened a new pull request #10544: KAFKA-12648: minimal changes for error handling namedTopologies

2021-04-15 Thread GitBox


wcarlson5 opened a new pull request #10544:
URL: https://github.com/apache/kafka/pull/10544


   Changed the obvious ones to attribute errors to a named topology. This might be 
all we need; we can always add more if they come up.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset

2021-04-15 Thread Travis Bischel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Travis Bischel updated KAFKA-12671:
---
Description: 
If there is pathological processing of incoming produce requests and EndTxn 
requests, then the LastStableOffset can get stuck, which will block consuming 
in READ_COMMITTED mode.

To transactionally produce, the standard flow is to InitProducerId, and then 
loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is 
responsible for fencing and adding partitions to a transaction, and the end 
transaction is responsible for finishing the transaction. Producing itself is 
mostly uninvolved with the proper fencing / ending flow, but produce requests 
are required to be after AddPartitionsToTxn and before EndTxn.
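
(For reference, a hedged sketch of the equivalent flow at the level of the Java 
client API; the client in this report is a different one, so this is only meant 
to show which calls map to which protocol requests.)

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TxnFlowSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-txn-id");   // illustrative id
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();                                          // InitProducerId
            producer.beginTransaction();                                          // partitions are added to the txn on first send()
            producer.send(new ProducerRecord<>("example-topic", "key", "value")); // Produce (after AddPartitionsToTxn)
            producer.commitTransaction();                                         // EndTxn (commit); abortTransaction() ends with abort
        }
    }
}
{code}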

When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager 
to mildly manage transactions. The ProducerStateManager is completely 
independent of the TxnCoordinator, and its guarantees are relatively weak. The 
ProducerStateManager handles two types of "batches" being added: a data batch 
and a transaction marker. When a data batch is added, a "transaction" is begun 
and tied to the producer ID that is producing the batch. When a transaction 
marker is handled, the ProducerStateManager removes the transaction for the 
producer ID (roughly).

EndTxn is what triggers transaction markers to be sent to the 
ProducerStateManager. In essence, EndTxn is the one part of the transactional 
producer flow that talks across both the TxnCoordinator and the 
ProducerStateManager.

If a ProduceRequest is issued before EndTxn, but handled internally in Kafka 
after EndTxn, then the ProduceRequest will begin a new transaction in the 
ProducerStateManager. If the client was disconnecting, and the EndTxn was the 
final request issued, the new transaction created in ProducerStateManager is 
orphaned and nothing can clean it up. The LastStableOffset then hangs based off 
of this hung transaction.

This same problem can be triggered by a produce request that is issued with a 
transactional ID outside of the context of a transaction at all (no 
AddPartitionsToTxn). This problem cannot be triggered by producing for so long 
that the transaction expires; the difference here is that the transaction 
coordinator bumps the epoch for the producer ID, thus producing again with the 
old epoch does not work.

Theoretically, we are supposed to have unlimited retries on produce requests, but 
in the context of wanting to abort everything and shut down, this is not always 
feasible. As it currently stands, I'm not sure there's a truly safe way to shut 
down _without_ flushing and receiving responses for every record produced, even 
if I want to abort everything and quit. The safest approach I can think of is 
to actually avoid issuing an EndTxn so that instead we rely on Kafka internally 
to time things out after a period of time.

—

For some context, here's my request logs from the client. Note that I write two 
ProduceRequests, read one, and then issue EndTxn (because I know I want to 
quit). The second ProduceRequest is read successfully before shutdown, but I 
ignore the results because I am shutting down. I've taken out logs related to 
consuming, but the order of the logs is unchanged:
{noformat}
[INFO] done waiting for unknown topic, metadata was successful; topic: 
2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765
[INFO] initializing producer id
[DEBUG] wrote FindCoordinator v3; err: 
[DEBUG] read FindCoordinator v3; err: 
[DEBUG] wrote InitProducerID v4; err: 
[DEBUG] read InitProducerID v4; err: 
[INFO] producer id initialization success; id: 1463, epoch: 0
[DEBUG] wrote AddPartitionsToTxn v2; err: 
[DEBUG] read AddPartitionsToTxn v2; err: 
[DEBUG] wrote Produce v8; err: 
[DEBUG] read Produce v8; err: 
[DEBUG] produced; to: 
2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}]
[DEBUG] wrote Produce v8; err: 
[DEBUG] wrote EndTxn v2; err: 
[DEBUG] read EndTxn v2; err: 
[DEBUG] read from broker errored, killing connection; addr: localhost:9092, id: 
1, successful_reads: 1, err: context canceled
[DEBUG] read Produce v8; err: 
[DEBUG] produced; to: 
2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{skipped}]
{noformat}
And then from the broker's point of view. Across two brokers, the second 
ProduceRequest is completed after EndTxn is handled (and after the 
WriteTxnMarkers request is handled, which is the important one that hooks into 
the ProducerStateManager):
{noformat}
/// Broker 3: init producer ID
[2021-04-15 00:56:40,030] DEBUG Completed 
request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=kgo, 
correlationId=3) -- 

[GitHub] [kafka] cmccabe merged pull request #10505: MINOR: fix some bugs in ControllerApis.scala

2021-04-15 Thread GitBox


cmccabe merged pull request #10505:
URL: https://github.com/apache/kafka/pull/10505


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #10543: KAFKA-12648: tag uses of StreamsException to add Named to

2021-04-15 Thread GitBox


wcarlson5 commented on pull request #10543:
URL: https://github.com/apache/kafka/pull/10543#issuecomment-820610403


   @ableegoldman I tagged all the cases of StreamsException that I think can 
be attributed to a sub-topology. I may have missed some, and some might not be 
attributable. Also, there might be other error types, so I am relying on all 
exceptions being wrapped in a StreamsException before they reach the handler. 
However, I know I have seen one case in soak testing where that is not true. But it 
is a start.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 opened a new pull request #10543: KAFKA-12648: tag uses of StreamsException to add Named to

2021-04-15 Thread GitBox


wcarlson5 opened a new pull request #10543:
URL: https://github.com/apache/kafka/pull/10543


   Change each StreamsException so that it carries the sub-topology name.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

2021-04-15 Thread GitBox


chia7712 commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r614266078



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -242,87 +253,99 @@ class ControllerApis(val requestChannel: RequestChannel,
 val toAuthenticate = new util.HashSet[String]
 toAuthenticate.addAll(providedNames)
 val idToName = new util.HashMap[Uuid, String]
-controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
-  if (nameOrError.isError) {
-appendResponse(null, id, nameOrError.error())
-  } else {
-toAuthenticate.add(nameOrError.result())
-idToName.put(id, nameOrError.result())
-  }
-}
-// Get the list of deletable topics (those we can delete) and the list of 
describeable
-// topics.  If a topic can't be deleted or described, we have to act like 
it doesn't
-// exist, even when it does.
-val topicsToAuthenticate = toAuthenticate.asScala
-val (describeable, deletable) = if (hasClusterAuth) {
-  (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
-} else {
-  (getDescribableTopics(topicsToAuthenticate), 
getDeletableTopics(topicsToAuthenticate))
-}
-// For each topic that was provided by ID, check if authentication failed.
-// If so, remove it from the idToName map and create an error response for 
it.
-val iterator = idToName.entrySet().iterator()
-while (iterator.hasNext) {
-  val entry = iterator.next()
-  val id = entry.getKey
-  val name = entry.getValue
-  if (!deletable.contains(name)) {
-if (describeable.contains(name)) {
-  appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+controller.findTopicNames(providedIds).thenCompose(topicNames => {

Review comment:
   Sure. I will file a PR to deal with it if I find a way.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

2021-04-15 Thread GitBox


cmccabe commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r614264455



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -242,87 +253,99 @@ class ControllerApis(val requestChannel: RequestChannel,
 val toAuthenticate = new util.HashSet[String]
 toAuthenticate.addAll(providedNames)
 val idToName = new util.HashMap[Uuid, String]
-controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
-  if (nameOrError.isError) {
-appendResponse(null, id, nameOrError.error())
-  } else {
-toAuthenticate.add(nameOrError.result())
-idToName.put(id, nameOrError.result())
-  }
-}
-// Get the list of deletable topics (those we can delete) and the list of 
describeable
-// topics.  If a topic can't be deleted or described, we have to act like 
it doesn't
-// exist, even when it does.
-val topicsToAuthenticate = toAuthenticate.asScala
-val (describeable, deletable) = if (hasClusterAuth) {
-  (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
-} else {
-  (getDescribableTopics(topicsToAuthenticate), 
getDeletableTopics(topicsToAuthenticate))
-}
-// For each topic that was provided by ID, check if authentication failed.
-// If so, remove it from the idToName map and create an error response for 
it.
-val iterator = idToName.entrySet().iterator()
-while (iterator.hasNext) {
-  val entry = iterator.next()
-  val id = entry.getKey
-  val name = entry.getValue
-  if (!deletable.contains(name)) {
-if (describeable.contains(name)) {
-  appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+controller.findTopicNames(providedIds).thenCompose(topicNames => {

Review comment:
   When I remove the braces I get this compiler error:
   ```
   > Task :core:compileScala FAILED
   [Error] /home/cmccabe/src/kafka1/core/src/main/scala/kafka/server/ControllerApis.scala:270: ')' expected but 'val' found.
   [Error] /home/cmccabe/src/kafka1/core/src/main/scala/kafka/server/ControllerApis.scala:278: ';' expected but 'val' found.
   two errors found
   ```
   
   Maybe let's follow up on this in a separate PR, if you can find a way to 
remove some of the braces and still have it compile...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12672) Running test-kraft-server-start results in error

2021-04-15 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-12672:

Description: 
Running the {{test-kraft-server-start}} script in the {{raft}} module results 
in this error

 
{code:java}
ERROR Exiting raft server due to fatal exception (kafka.tools.TestRaftServer$)
java.lang.IllegalArgumentException: No enum constant 
org.apache.kafka.common.security.auth.SecurityProtocol.
at java.lang.Enum.valueOf(Enum.java:238)
at 
org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26)
at 
org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72)
at 
kafka.raft.KafkaRaftManager.$anonfun$buildNetworkClient$1(RaftManager.scala:256)
at scala.collection.immutable.Map$Map4.getOrElse(Map.scala:530)
at kafka.raft.KafkaRaftManager.buildNetworkClient(RaftManager.scala:256)
at 
kafka.raft.KafkaRaftManager.buildNetworkChannel(RaftManager.scala:234)
at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:126)
at kafka.tools.TestRaftServer.startup(TestRaftServer.scala:88)
at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:442)
at kafka.tools.TestRaftServer.main(TestRaftServer.scala)
{code}
Looks like the listener property in the config is not getting picked up, as an 
empty string gets passed to {{SecurityProtocol.forName}}
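
A minimal standalone sketch reproducing that failure mode (illustration only; it assumes 
nothing beyond the kafka-clients jar on the classpath):
{code:java}
import org.apache.kafka.common.security.auth.SecurityProtocol;

public class EmptyListenerNameSketch {
    public static void main(String[] args) {
        // An unset listener yields an empty protocol name; the enum lookup then throws
        // IllegalArgumentException: No enum constant ...SecurityProtocol. as in the trace above.
        SecurityProtocol.forName("");
    }
}
{code}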

  was:
Running the {{test-kraft-server-start}} script in the {{raft}} module results 
in this error

 
{code:java}
ERROR Exiting raft server due to fatal exception (kafka.tools.TestRaftServer$)
java.lang.IllegalArgumentException: No enum constant 
org.apache.kafka.common.security.auth.SecurityProtocol.
at java.lang.Enum.valueOf(Enum.java:238)
at 
org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26)
at 
org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72)
at 
kafka.raft.KafkaRaftManager.$anonfun$buildNetworkClient$1(RaftManager.scala:256)
at scala.collection.immutable.Map$Map4.getOrElse(Map.scala:530)
at kafka.raft.KafkaRaftManager.buildNetworkClient(RaftManager.scala:256)
at 
kafka.raft.KafkaRaftManager.buildNetworkChannel(RaftManager.scala:234)
at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:126)
at kafka.tools.TestRaftServer.startup(TestRaftServer.scala:88)
at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:442)
at kafka.tools.TestRaftServer.main(TestRaftServer.scala)
{code}
Looks like the listener property in the config is not getting picked up


> Running test-kraft-server-start results in error
> 
>
> Key: KAFKA-12672
> URL: https://issues.apache.org/jira/browse/KAFKA-12672
> Project: Kafka
>  Issue Type: Bug
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>
> Running the {{test-kraft-server-start}} script in the {{raft}} module results 
> in this error
>  
> {code:java}
> ERROR Exiting raft server due to fatal exception (kafka.tools.TestRaftServer$)
> java.lang.IllegalArgumentException: No enum constant 
> org.apache.kafka.common.security.auth.SecurityProtocol.
>   at java.lang.Enum.valueOf(Enum.java:238)
>   at 
> org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26)
>   at 
> org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72)
>   at 
> kafka.raft.KafkaRaftManager.$anonfun$buildNetworkClient$1(RaftManager.scala:256)
>   at scala.collection.immutable.Map$Map4.getOrElse(Map.scala:530)
>   at kafka.raft.KafkaRaftManager.buildNetworkClient(RaftManager.scala:256)
>   at 
> kafka.raft.KafkaRaftManager.buildNetworkChannel(RaftManager.scala:234)
>   at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:126)
>   at kafka.tools.TestRaftServer.startup(TestRaftServer.scala:88)
>   at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:442)
>   at kafka.tools.TestRaftServer.main(TestRaftServer.scala)
> {code}
> Looks like the listener property in the config is not getting picked up as an 
> empty string gets passed to {{SecurityProtocol.forName}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] Iskuskov commented on pull request #9541: KAFKA-10675: Add schema name to ConnectSchema.validateValue() error message

2021-04-15 Thread GitBox


Iskuskov commented on pull request #9541:
URL: https://github.com/apache/kafka/pull/9541#issuecomment-820596386


   @chia7712, @ijuma, could you take a look at this please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12672) Running test-kraft-server-start results in error

2021-04-15 Thread Bill Bejeck (Jira)
Bill Bejeck created KAFKA-12672:
---

 Summary: Running test-kraft-server-start results in error
 Key: KAFKA-12672
 URL: https://issues.apache.org/jira/browse/KAFKA-12672
 Project: Kafka
  Issue Type: Bug
Reporter: Bill Bejeck
Assignee: Bill Bejeck


Running the {{test-kraft-server-start}} script in the {{raft}} module results 
in this error

 
{code:java}
ERROR Exiting raft server due to fatal exception (kafka.tools.TestRaftServer$)
java.lang.IllegalArgumentException: No enum constant 
org.apache.kafka.common.security.auth.SecurityProtocol.
at java.lang.Enum.valueOf(Enum.java:238)
at 
org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26)
at 
org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72)
at 
kafka.raft.KafkaRaftManager.$anonfun$buildNetworkClient$1(RaftManager.scala:256)
at scala.collection.immutable.Map$Map4.getOrElse(Map.scala:530)
at kafka.raft.KafkaRaftManager.buildNetworkClient(RaftManager.scala:256)
at 
kafka.raft.KafkaRaftManager.buildNetworkChannel(RaftManager.scala:234)
at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:126)
at kafka.tools.TestRaftServer.startup(TestRaftServer.scala:88)
at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:442)
at kafka.tools.TestRaftServer.main(TestRaftServer.scala)
{code}
Looks like the listener property in the config is not getting picked up



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe merged pull request #10538: MINOR: shutdown KafkaScheduler at appropriate time

2021-04-15 Thread GitBox


cmccabe merged pull request #10538:
URL: https://github.com/apache/kafka/pull/10538


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-04-15 Thread GitBox


kpatelatwork commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r614230413



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
##
@@ -105,6 +105,11 @@ public void remove(ConnectorTaskId id) {
 }
 
 private void commit(WorkerSourceTask workerTask) {
+if (!workerTask.shouldCommitOffsets()) {

Review comment:
   I prefer the encapsulation approach but will leave the decision to you, 
as it's a minor nitpick :).
   
   I prefer encapsulation because the caller of `commitOffsets` should not have to 
first call `shouldCommitOffsets` and then call `commitOffsets` only when it returns 
true. Think of how we don't need an `!ArrayList.isEmpty()` check before iterating 
over a list :).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

2021-04-15 Thread GitBox


mumrah commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r614218074



##
File path: core/src/test/java/kafka/test/MockController.java
##
@@ -197,15 +209,73 @@ private MockController(Collection 
initialTopics) {
 
 @Override
 public CompletableFuture> 
incrementalAlterConfigs(
-Map>> configChanges,
+Map>> configChanges,
 boolean validateOnly) {
+Map results = new HashMap<>();
+for (Entry>> entry :
+configChanges.entrySet()) {
+ConfigResource resource = entry.getKey();
+results.put(resource, incrementalAlterResource(resource, 
entry.getValue(), validateOnly));
+}
+CompletableFuture> future = new 
CompletableFuture<>();
+future.complete(results);
+return future;
+}
+
+private ApiError incrementalAlterResource(ConfigResource resource,
+Map> ops, boolean 
validateOnly) {
+for (Entry> entry : 
ops.entrySet()) {
+AlterConfigOp.OpType opType = entry.getValue().getKey();
+if (opType != SET && opType != DELETE) {
+return new ApiError(INVALID_REQUEST, "This mock does not " +
+"support the " + opType + " config operation.");
+}
+}
+if (!validateOnly) {
+for (Entry> entry : 
ops.entrySet()) {
+String key = entry.getKey();
+AlterConfigOp.OpType op = entry.getValue().getKey();
+String value = entry.getValue().getValue();
+switch (op) {
+case SET:
+configs.computeIfAbsent(resource, __ -> new 
HashMap<>()).put(key, value);

Review comment:
   Well, I thought a method reference would work here for the hash map, but 
I tried it and it doesn't seem to work.
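
   For reference, a minimal standalone sketch of why the constructor reference does not 
fit here (String keys are an illustrative stand-in for ConfigResource):
   ```
import java.util.HashMap;
import java.util.Map;

public class ComputeIfAbsentSketch {
    public static void main(String[] args) {
        Map<String, Map<String, String>> configs = new HashMap<>();

        // Works: the mapping function ignores the key and supplies a fresh map.
        configs.computeIfAbsent("resource-a", key -> new HashMap<>()).put("k", "v");

        // Does not compile: computeIfAbsent needs a Function<String, Map<String, String>>,
        // and HashMap has no constructor taking a String, so HashMap::new cannot be
        // adapted to that functional interface.
        // configs.computeIfAbsent("resource-a", HashMap::new).put("k", "v");

        System.out.println(configs); // {resource-a={k=v}}
    }
}
   ```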




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] JoelWee commented on pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

2021-04-15 Thread GitBox


JoelWee commented on pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#issuecomment-820560188


   Thanks @ableegoldman and @wcarlson5! Will try to get this done over the next 
week or so and ping you when ready :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-04-15 Thread GitBox


C0urante commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r614208924



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##
@@ -662,20 +666,94 @@ public void testSendRecordsProducerCallbackFail() throws 
Exception {
 
 expectSendRecordProducerCallbackFail();
 
+EasyMock.expect(offsetWriter.willFlush()).andReturn(true);
+
 PowerMock.replayAll();
 
 Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
 assertThrows(ConnectException.class, () -> 
Whitebox.invokeMethod(workerTask, "sendRecords"));
+assertFalse(workerTask.shouldCommitOffsets());
 }
 
 @Test
-public void testSendRecordsProducerSendFailsImmediately() {
-if (!enableTopicCreation)
-// should only test with topic creation enabled
-return;
+public void testSendRecordsProducerCallbackFailInBacklog() throws 
Exception {
+/*
+1. A record is sent successfully
+2. Flush for offset commit begins
+3. Another record is dispatched to the producer and, because of 
the active offset commit, added to the backlog
+4. The producer fails to send that record and notifies the worker 
via producer callback
+5. The first offset commit succeeds as the first record has been 
sent successfully
+6. No further offset commits are attempted as the only remaining 
record has failed with a non-retriable error
+ */
+createWorkerTask();

Review comment:
   Whoops! Thanks for catching this.

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##
@@ -662,20 +666,94 @@ public void testSendRecordsProducerCallbackFail() throws 
Exception {
 
 expectSendRecordProducerCallbackFail();
 
+EasyMock.expect(offsetWriter.willFlush()).andReturn(true);
+
 PowerMock.replayAll();
 
 Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
 assertThrows(ConnectException.class, () -> 
Whitebox.invokeMethod(workerTask, "sendRecords"));
+assertFalse(workerTask.shouldCommitOffsets());
 }
 
 @Test
-public void testSendRecordsProducerSendFailsImmediately() {
-if (!enableTopicCreation)
-// should only test with topic creation enabled
-return;
+public void testSendRecordsProducerCallbackFailInBacklog() throws 
Exception {
+/*
+1. A record is sent successfully
+2. Flush for offset commit begins
+3. Another record is dispatched to the producer and, because of 
the active offset commit, added to the backlog
+4. The producer fails to send that record and notifies the worker 
via producer callback
+5. The first offset commit succeeds as the first record has been 
sent successfully
+6. No further offset commits are attempted as the only remaining 
record has failed with a non-retriable error
+ */
+createWorkerTask();
 
 createWorkerTask();
 
+SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, "topic", 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+expectTopicCreation(TOPIC);
+
+expectSendRecordOnce(false);
+expectSendRecordProducerCallbackFail();
+
+expectOffsetFlush(true);
+EasyMock.expect(offsetWriter.willFlush()).andReturn(true).anyTimes();
+
+PowerMock.replayAll();
+
+Whitebox.setInternalState(workerTask, "toSend", 
Collections.singletonList(record1));
+Whitebox.invokeMethod(workerTask, "sendRecords");
+Whitebox.setInternalState(workerTask, "flushing", true);
+Whitebox.setInternalState(workerTask, "toSend", 
Collections.singletonList(record2));
+Whitebox.invokeMethod(workerTask, "sendRecords");
+assertTrue(workerTask.shouldCommitOffsets());
+assertTrue(workerTask.commitOffsets());
+assertFalse(workerTask.shouldCommitOffsets());
+}
+
+
+@Test
+public void 
testSendRecordsProducerCallbackFailInBacklogWithNonRetriedOffsetCommit() throws 
Exception {
+/*
+1. A record is sent successfully
+2. Flush for offset commit begins
+3. Another record is dispatched to the producer and, because of 
the active offset commit, added to the backlog
+4. The producer fails to send that record and notifies the worker 
via producer callback
+5. The first offset commit fails even though first record has been 
sent successfully, (possibly from failure to produce offsets to Kafka)
+6. No further offset commits are attempted as the new record batch 
contains 

[GitHub] [kafka] C0urante commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-04-15 Thread GitBox


C0urante commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r614208181



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
##
@@ -120,6 +120,13 @@ public synchronized boolean beginFlush() {
 return true;
 }
 
+/**
+ * @return whether there's anything to flush right now.
+ */
+public boolean willFlush() {

Review comment:
   Good catch, thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-04-15 Thread GitBox


C0urante commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r614207888



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
##
@@ -105,6 +105,11 @@ public void remove(ConnectorTaskId id) {
 }
 
 private void commit(WorkerSourceTask workerTask) {
+if (!workerTask.shouldCommitOffsets()) {

Review comment:
   I initially wanted to do just that, but held off for two reasons:
   
   1. The `SourceTaskOffsetCommitter` log message may be valuable to some 
users, especially if they've muted the `WorkerSourceTask` namespace or at least 
raised its level to `INFO`.
   2. It's a little easier to test this logic separately by creating a separate 
method that can be queried on its own. The `boolean` return value of 
`WorkerSourceTask::commitOffsets` makes it difficult to determine if an offset 
commit has been attempted and failed, or skipped entirely.
   
   I'm not super attached to either approach; if you have thoughts on why one 
or the other is better I'm happy to hear them.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12574) Deprecate eos-alpha

2021-04-15 Thread Haoran Xuan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322296#comment-17322296
 ] 

Haoran Xuan commented on KAFKA-12574:
-

Hi, previously I was working on KAFKA-9689, in which [~bchen225242] and I were 
thinking of leveraging the feature versioning system to automatically detect the 
`exactly_once_semantics` and also make the `StreamThread` able to hot-swap from 
`eos` to `eos-beta`. While doing that, I found that hot-swapping the 
`exactly_once_semantics` is kind of difficult and I haven't thought it through; 
this discussion made me realize that maybe I'm not heading in the correct 
direction. 

> Deprecate eos-alpha
> ---
>
> Key: KAFKA-12574
> URL: https://issues.apache.org/jira/browse/KAFKA-12574
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> In KIP-447 we introduced a new thread-producer which is capable of 
> exactly-once semantics across multiple tasks. The new mode of EOS, called 
> eos-beta, is intended to eventually be the preferred processing mode for EOS 
> as it improves the performance and scaling of partitions/tasks. The only 
> downside is that it requires brokers to be on version 2.5+ in order to 
> understand the latest APIs that are necessary for this thread-producer.
> We should consider deprecating the eos-alpha config, ie 
> StreamsConfig.EXACTLY_ONCE, to encourage new & existing EOS users to migrate 
> to the new-and-improved processing mode, and upgrade their brokers if 
> necessary.
> Eventually we would like to be able to remove the eos-alpha code paths from 
> Streams as this will help to simplify the logic and reduce the processing 
> mode branching. But since this will break client-broker compatibility, and 
> 2.5 is still a relatively recent version, we probably can't actually remove 
> eos-alpha in the near future



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

2021-04-15 Thread GitBox


wcarlson5 commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r614166802



##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -248,7 +257,7 @@ private void parseArguments(final String[] args) {
 .ofType(String.class)
 .describedAs("file name");
 forceOption = optionParser.accepts("force", "Force the removal of 
members of the consumer group (intended to remove stopped members if a long 
session timeout was used). " +
-"Make sure to shut down all stream applications when this 
option is specified to avoid unexpected rebalances.");

Review comment:
   unnecessary whitespace change




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

2021-04-15 Thread GitBox


wcarlson5 commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r614166802



##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -248,7 +257,7 @@ private void parseArguments(final String[] args) {
 .ofType(String.class)
 .describedAs("file name");
 forceOption = optionParser.accepts("force", "Force the removal of 
members of the consumer group (intended to remove stopped members if a long 
session timeout was used). " +
-"Make sure to shut down all stream applications when this 
option is specified to avoid unexpected rebalances.");

Review comment:
   unnecessary

##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -225,6 +229,11 @@ private void parseArguments(final String[] args) {
 .ofType(String.class)
 .withValuesSeparatedBy(',')
 .describedAs("list");
+internalTopicsOption = optionParser.accepts("internal-topics", 
"Comma-separated list of internal topics to delete. Must be a subset of the 
internal topics marked for deletion by the default behaviour (do a dry-run 
without this option to view these topics).")

Review comment:
   line is too long

##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -642,22 +651,38 @@ private boolean isIntermediateTopic(final String topic) {
 return options.valuesOf(intermediateTopicsOption).contains(topic);
 }
 
-private void maybeDeleteInternalTopics(final Admin adminClient, final 
boolean dryRun) {
-System.out.println("Deleting all internal/auto-created topics for 
application " + options.valueOf(applicationIdOption));
-final List topicsToDelete = new ArrayList<>();
-for (final String listing : allTopics) {
-if (isInternalTopic(listing)) {
-if (!dryRun) {
-topicsToDelete.add(listing);
-} else {
-System.out.println("Topic: " + listing);
-}
+private int maybeDeleteInternalTopics(final Admin adminClient, final 
boolean dryRun) {
+final List inferredInternalTopics = allTopics.stream()
+.filter(this::isInferredInternalTopic)
+.collect(Collectors.toList());
+final List specifiedInternalTopics = 
options.valuesOf(internalTopicsOption);
+final List topicsToDelete;
+
+if (!specifiedInternalTopics.isEmpty()) {
+final List notFoundInternalTopics = 
specifiedInternalTopics.stream()
+.filter(topic -> !inferredInternalTopics.contains(topic))
+.collect(Collectors.toList());
+if (!notFoundInternalTopics.isEmpty()) {

Review comment:
   something like 
`inferredInternalTopics.containsAll(specifiedInternalTopics)` might be easier 
to understand here
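
   For illustration, a minimal sketch contrasting the two forms (topic names are 
placeholders; the filtered list remains handy if the unknown topics need to be reported):
   ```
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ContainsAllSketch {
    public static void main(String[] args) {
        List<String> inferredInternalTopics = Arrays.asList("app-repartition", "app-changelog");
        List<String> specifiedInternalTopics = Arrays.asList("app-changelog", "app-unknown");

        // Current form: materializes the unknown topics (useful for an error message).
        List<String> notFoundInternalTopics = specifiedInternalTopics.stream()
                .filter(topic -> !inferredInternalTopics.contains(topic))
                .collect(Collectors.toList());

        // Suggested form: a single membership check that reads more directly.
        boolean allKnown = inferredInternalTopics.containsAll(specifiedInternalTopics);

        System.out.println(notFoundInternalTopics); // [app-unknown]
        System.out.println(allKnown);               // false
    }
}
   ```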




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork edited a comment on pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-04-15 Thread GitBox


kpatelatwork edited a comment on pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#issuecomment-820517908


   @C0urante  overall this looks good, and very good job on the tests. As I am new 
to the code, I did my best to review, but I recommend also getting a LGTM from one 
more reviewer who knows this code better than me, to ensure we don't miss 
something obvious.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset

2021-04-15 Thread Travis Bischel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Travis Bischel updated KAFKA-12671:
---
Description: 
If there is pathological processing of incoming produce requests and EndTxn 
requests, then the LastStableOffset can get stuck, which will block consuming 
in READ_COMMITTED mode.

To transactionally produce, the standard flow is to InitProducerId, and then 
loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is 
responsible for fencing and adding partitions to a transaction, and the end 
transaction is responsible for finishing the transaction. Producing itself is 
mostly uninvolved with the proper fencing / ending flow, but produce requests 
are required to be after AddPartitionsToTxn and before EndTxn.
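
For reference, this standard flow maps onto the Java client roughly as follows (a 
minimal sketch; the bootstrap server, topic and transactional.id are placeholders): 
initTransactions() issues InitProducerId, the first send() to a partition triggers 
AddPartitionsToTxn, and commitTransaction()/abortTransaction() issue EndTxn.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalFlowSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "example-txn-id");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();                                    // InitProducerId
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("example-topic", "k", "v")); // AddPartitionsToTxn + Produce
            producer.commitTransaction();                                   // EndTxn
        }
    }
}
{code}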

When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager 
to mildly manage transactions. The ProducerStateManager is completely 
independent of the TxnCoordinator, and its guarantees are relatively weak. The 
ProducerStateManager handles two types of "batches" being added: a data batch 
and a transaction marker. When a data batch is added, a "transaction" is begun 
and tied to the producer ID that is producing the batch. When a transaction 
marker is handled, the ProducerStateManager removes the transaction for the 
producer ID (roughly).

EndTxn is what triggers transaction markers to be sent to the 
ProducerStateManager. In essence, EndTxn is the one part of the transactional 
producer flow that talks across both the TxnCoordinator and the 
ProducerStateManager.

If a ProduceRequest is issued before EndTxn, but handled internally in Kafka 
after EndTxn, then the ProduceRequest will begin a new transaction in the 
ProducerStateManager. If the client was disconnecting, and the EndTxn was the 
final request issued, the new transaction created in ProducerStateManager is 
orphaned and nothing can clean it up. The LastStableOffset then hangs based off 
of this hung transaction.

This same problem can be triggered by a produce request that is issued with a 
transactional ID outside of the context of a transaction at all (no 
AddPartitionsToTxn). This problem cannot be triggered by producing for so long 
that the transaction expires; the difference here is that the transaction 
coordinator bumps the epoch for the producer ID, thus producing again with the 
old epoch does not work.

Theoretically, we are supposed have unlimited retries on produce requests, but 
in the context of wanting to abort everything and shut down, this is not always 
feasible. As it currently stands, I'm not sure there's a truly safe way to shut 
down _without_ flushing and receiving responses for every record produced, even 
if I want to abort everything and quit. The safest approach I can think of is 
to actually avoid issuing an EndTxn so that instead we rely on Kafka internally 
to time things out after a period of time.

—

For some context, here's my request logs from the client. Note that I write two 
ProduceRequests, read one, and then issue EndTxn (because I know I want to 
quit). The second ProduceRequest is read successfully before shutdown, but I 
ignore the results because I am shutting down. I've taken out logs related to 
consuming, but the order of the logs is unchanged:
{noformat}
[INFO] done waiting for unknown topic, metadata was successful; topic: 
2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765
[INFO] initializing producer id
[DEBUG] wrote FindCoordinator v3; err: 
[DEBUG] read FindCoordinator v3; err: 
[DEBUG] wrote InitProducerID v4; err: 
[DEBUG] read InitProducerID v4; err: 
[INFO] producer id initialization success; id: 1463, epoch: 0

[DEBUG] wrote AddPartitionsToTxn v2; err: 
[DEBUG] read AddPartitionsToTxn v2; err: 

[DEBUG] read Produce v8; err: 
[DEBUG] produced; to: 
2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}]
[DEBUG] wrote Produce v8; err: 

[DEBUG] wrote EndTxn v2; err: 
[DEBUG] read EndTxn v2; err: 

[DEBUG] read from broker errored, killing connection; addr: localhost:9092, id: 
1, successful_reads: 1, err: context canceled
[DEBUG] read Produce v8; err: 
[DEBUG] produced; to: 
2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{skipped}]
{noformat}
And then from the broker's point of view. Across two brokers, the second 
ProduceRequest is completed after EndTxn is handled (and after the 
WriteTxnMarkers request is handled, which is the important one that hooks into 
the ProducerStateManager):
{noformat}
/// Broker 3: init producer ID
[2021-04-15 00:56:40,030] DEBUG Completed 
request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=kgo, 
correlationId=3) -- 

[GitHub] [kafka] kpatelatwork commented on pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-04-15 Thread GitBox


kpatelatwork commented on pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#issuecomment-820517908


   @C0urante  overall looks good but as I am new to the code, I tried my best 
to review but I recommend getting a LGTM from one more reviewer also who knows 
this code better than me to ensure we don't miss something obvious?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-04-15 Thread GitBox


kpatelatwork commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r614162085



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##
@@ -662,20 +666,94 @@ public void testSendRecordsProducerCallbackFail() throws 
Exception {
 
 expectSendRecordProducerCallbackFail();
 
+EasyMock.expect(offsetWriter.willFlush()).andReturn(true);
+
 PowerMock.replayAll();
 
 Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
 assertThrows(ConnectException.class, () -> 
Whitebox.invokeMethod(workerTask, "sendRecords"));
+assertFalse(workerTask.shouldCommitOffsets());
 }
 
 @Test
-public void testSendRecordsProducerSendFailsImmediately() {
-if (!enableTopicCreation)
-// should only test with topic creation enabled
-return;
+public void testSendRecordsProducerCallbackFailInBacklog() throws 
Exception {
+/*
+1. A record is sent successfully
+2. Flush for offset commit begins
+3. Another record is dispatched to the producer and, because of 
the active offset commit, added to the backlog
+4. The producer fails to send that record and notifies the worker 
via producer callback
+5. The first offset commit succeeds as the first record has been 
sent successfully
+6. No further offset commits are attempted as the only remaining 
record has failed with a non-retriable error
+ */
+createWorkerTask();
 
 createWorkerTask();
 
+SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, "topic", 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+expectTopicCreation(TOPIC);
+
+expectSendRecordOnce(false);
+expectSendRecordProducerCallbackFail();
+
+expectOffsetFlush(true);
+EasyMock.expect(offsetWriter.willFlush()).andReturn(true).anyTimes();
+
+PowerMock.replayAll();
+
+Whitebox.setInternalState(workerTask, "toSend", 
Collections.singletonList(record1));
+Whitebox.invokeMethod(workerTask, "sendRecords");
+Whitebox.setInternalState(workerTask, "flushing", true);
+Whitebox.setInternalState(workerTask, "toSend", 
Collections.singletonList(record2));
+Whitebox.invokeMethod(workerTask, "sendRecords");
+assertTrue(workerTask.shouldCommitOffsets());
+assertTrue(workerTask.commitOffsets());
+assertFalse(workerTask.shouldCommitOffsets());
+}
+
+
+@Test
+public void 
testSendRecordsProducerCallbackFailInBacklogWithNonRetriedOffsetCommit() throws 
Exception {
+/*
+1. A record is sent successfully
+2. Flush for offset commit begins
+3. Another record is dispatched to the producer and, because of 
the active offset commit, added to the backlog
+4. The producer fails to send that record and notifies the worker 
via producer callback
+5. The first offset commit fails even though first record has been 
sent successfully, (possibly from failure to produce offsets to Kafka)
+6. No further offset commits are attempted as the new record batch 
contains the second record, which has failed with a non-retriable error
+ */
+createWorkerTask();

Review comment:
   two calls  to createWorkerTask




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-04-15 Thread GitBox


kpatelatwork commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r614161831



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##
@@ -662,20 +666,94 @@ public void testSendRecordsProducerCallbackFail() throws 
Exception {
 
 expectSendRecordProducerCallbackFail();
 
+EasyMock.expect(offsetWriter.willFlush()).andReturn(true);
+
 PowerMock.replayAll();
 
 Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
 assertThrows(ConnectException.class, () -> 
Whitebox.invokeMethod(workerTask, "sendRecords"));
+assertFalse(workerTask.shouldCommitOffsets());
 }
 
 @Test
-public void testSendRecordsProducerSendFailsImmediately() {
-if (!enableTopicCreation)
-// should only test with topic creation enabled
-return;
+public void testSendRecordsProducerCallbackFailInBacklog() throws 
Exception {
+/*
+1. A record is sent successfully
+2. Flush for offset commit begins
+3. Another record is dispatched to the producer and, because of 
the active offset commit, added to the backlog
+4. The producer fails to send that record and notifies the worker 
via producer callback
+5. The first offset commit succeeds as the first record has been 
sent successfully
+6. No further offset commits are attempted as the only remaining 
record has failed with a non-retriable error
+ */
+createWorkerTask();

Review comment:
   two calls to createWorkerTask; is this a copy-paste, or am I missing 
something :)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 opened a new pull request #10542: Kafka 12313: Streamlining windowed Deserialiser configs.

2021-04-15 Thread GitBox


vamossagar12 opened a new pull request #10542:
URL: https://github.com/apache/kafka/pull/10542


   This PR aims to streamline the configurations for WindowedDeserialisers. It 
deprecates the default.windowed.key.serde.inner and 
default.windowed.value.serde.inner configs in StreamsConfig and adds 
window.inner.class.deserialiser. Details are described here: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177047930


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac merged pull request #10540: MINOR: update default for field in DeleteTopicsRequest

2021-04-15 Thread GitBox


dajac merged pull request #10540:
URL: https://github.com/apache/kafka/pull/10540


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #10540: MINOR: update default for field in DeleteTopicsRequest

2021-04-15 Thread GitBox


dajac commented on pull request #10540:
URL: https://github.com/apache/kafka/pull/10540#issuecomment-820497042


   Test failures are not related. Merging to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-04-15 Thread GitBox


kpatelatwork commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r614129645



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
##
@@ -120,6 +120,13 @@ public synchronized boolean beginFlush() {
 return true;
 }
 
+/**
+ * @return whether there's anything to flush right now.
+ */
+public boolean willFlush() {

Review comment:
   should this method be synchronized?  All  other access to `data` seems 
to be in synchronized methods
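
   A minimal, self-contained sketch of that suggestion; the `data` map and the offer() 
method are illustrative stand-ins, not the actual OffsetStorageWriter:
   ```
import java.util.HashMap;
import java.util.Map;

public class OffsetBufferSketch {
    // Stand-in for the writer's pending-offsets buffer.
    private final Map<String, String> data = new HashMap<>();

    public synchronized void offer(String key, String value) {
        data.put(key, value);
    }

    /**
     * @return whether there's anything to flush right now.
     */
    public synchronized boolean willFlush() { // synchronized, like the other accessors of data
        return !data.isEmpty();
    }

    public static void main(String[] args) {
        OffsetBufferSketch writer = new OffsetBufferSketch();
        System.out.println(writer.willFlush()); // false
        writer.offer("partition-0", "42");
        System.out.println(writer.willFlush()); // true
    }
}
   ```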




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-04-15 Thread GitBox


kpatelatwork commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r614129645



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
##
@@ -120,6 +120,13 @@ public synchronized boolean beginFlush() {
 return true;
 }
 
+/**
+ * @return whether there's anything to flush right now.
+ */
+public boolean willFlush() {

Review comment:
   synchronized?  All  other access to `data` seems to be in synchronized 
methods




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-04-15 Thread GitBox


kpatelatwork commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r614129645



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
##
@@ -120,6 +120,13 @@ public synchronized boolean beginFlush() {
 return true;
 }
 
+/**
+ * @return whether there's anything to flush right now.
+ */
+public boolean willFlush() {

Review comment:
   synchronized?  All access to `data` seems to be in synchronized methods




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-04-15 Thread GitBox


kpatelatwork commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r614126665



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
##
@@ -105,6 +105,11 @@ public void remove(ConnectorTaskId id) {
 }
 
 private void commit(WorkerSourceTask workerTask) {
+if (!workerTask.shouldCommitOffsets()) {

Review comment:
   @C0urante  overall the PR looks good, but somehow I feel this 
`shouldCommitOffsets` is exposing internal details, and maybe `commitOffsets` 
should encapsulate this internally.
   
   How about we move the `shouldCommitOffsets` check to the first line of the 
`commitOffsets` method, and remove the `log.debug("{} Committing offsets", 
workerTask);` from this method, as it's redundant and `workerTask.commitOffsets` 
already does it? The only challenge is that `return false` appears in a lot 
of places in `workerTask.commitOffsets`, but we could convert the current 
method to a private method and add a new commitOffsets wrapper method to make 
that easy. We would also need to move `log.error("{} Failed to commit offsets", 
workerTask);` into the wrapper method. This way all we need to do here is call 
`workerTask.commitOffsets()` in a try/catch, as sketched below.
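
   A minimal, self-contained sketch of that wrapper idea (class, field and 
doCommitOffsets() names are illustrative assumptions, not the actual WorkerSourceTask API):
   ```
public class SourceTaskSketch {
    // Stand-in for "is there anything pending to flush".
    private boolean hasPendingOffsets = true;

    public boolean shouldCommitOffsets() {
        return hasPendingOffsets;
    }

    // Public entry point: callers never need to check shouldCommitOffsets() first.
    public boolean commitOffsets() {
        if (!shouldCommitOffsets()) {
            System.out.println("Skipping offset commit, nothing to flush");
            return true;
        }
        try {
            return doCommitOffsets();
        } catch (RuntimeException e) {
            System.out.println("Failed to commit offsets: " + e);
            return false;
        }
    }

    // Stand-in for the existing commit logic with its many early returns.
    private boolean doCommitOffsets() {
        hasPendingOffsets = false;
        return true;
    }

    public static void main(String[] args) {
        SourceTaskSketch task = new SourceTaskSketch();
        System.out.println(task.commitOffsets()); // true: commit performed
        System.out.println(task.commitOffsets()); // true: nothing to flush, skipped
    }
}
   ```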




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614114447



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -60,20 +82,41 @@
 }
 
 private class KStreamKStreamJoinProcessor extends AbstractProcessor 
{
+private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+private final Predicate>> 
recordWindowHasClosed =
+windowedKey -> windowedKey.window().start() + joinAfterMs + 
joinGraceMs < maxObservedStreamTime.get();
 
 private WindowStore otherWindow;
 private StreamsMetricsImpl metrics;
 private Sensor droppedRecordsSensor;
+private Optional, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
 @SuppressWarnings("unchecked")
 @Override
 public void init(final ProcessorContext context) {
 super.init(context);
 metrics = (StreamsMetricsImpl) context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
-otherWindow = (WindowStore) 
context.getStateStore(otherWindowName);
+otherWindow = context.getStateStore(otherWindowName);
+
+if (!internalOuterJoinFixDisabled(context.appConfigs())) {
+outerJoinWindowStore = outerJoinWindowName.map(name -> 
context.getStateStore(name));
+}
 }
 
+private boolean internalOuterJoinFixDisabled(final Map 
configs) {
+final Object value = 
configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX);
+if (value == null) {
+return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT;

Review comment:
   That constant name is confusing. I just removed the *_DEFAULT constant 
and return false when the config is not set. But the idea is that the join fix 
is enabled (disabled = false) if the config is not set.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614105673



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -118,20 +142,47 @@
 final ProcessorGraphNode otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
 builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+Optional, 
LeftOrRightValue>>> outerJoinWindowStore = Optional.empty();
+if (leftOuter || rightOuter) {
+final String outerJoinSuffix = "-shared-outer-join-store";
+
+// Get the suffix index of the joinThisGeneratedName to build the 
outer join store name.
+final String outerJoinStoreGeneratedName = 
KStreamImpl.OUTERSHARED_NAME
++ joinThisGeneratedName.substring(
+rightOuter
+? KStreamImpl.OUTERTHIS_NAME.length()
+: KStreamImpl.JOINTHIS_NAME.length());

Review comment:
   I initially generated a name with a new index for the shared store. 
However, it seemed this was going to cause incompatibilities in the topology 
because the new index kept increasing. Instead, I now just reuse the index from one 
of the current join stores. Why doesn't that make sense? Is there another way to get 
an index? Or do I really need to append an index at the end of the shared 
store name?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-15 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614098535



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -118,20 +142,47 @@
 final ProcessorGraphNode otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
 builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+Optional, 
LeftOrRightValue>>> outerJoinWindowStore = Optional.empty();
+if (leftOuter || rightOuter) {
+final String outerJoinSuffix = "-shared-outer-join-store";

Review comment:
   There is only one store shared between left/right joins. I can use only 
one name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-15 Thread GitBox


satishd commented on pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#issuecomment-820438379


   @junrao This PR is rebased with the latest trunk and it is ready for review. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-15 Thread GitBox


satishd commented on pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#issuecomment-820437818


   Thanks @kowshik for the review. Addressed them with the latest 
[5250b79](https://github.com/apache/kafka/pull/10271/commits/5250b79cef6d21c73d2b2a12ef7cd95a00e9f034).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-15 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r614083593



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serde/RemoteLogMetadataContextSerde.java
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serde;
+
+import 
org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataContext;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Message;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecordUpdate;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Serialization and deserialization for {@link RemoteLogMetadataContext}. 
This is the root serdes for the messages
+ * that are stored in internal remote log metadata topic.
+ */
+public class RemoteLogMetadataContextSerde implements 
Serde {
+
+public static final byte REMOTE_LOG_SEGMENT_METADATA_API_KEY = (byte) new 
RemoteLogSegmentMetadataRecord().apiKey();
+public static final byte REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = 
(byte) new RemoteLogSegmentMetadataRecordUpdate().apiKey();
+public static final byte REMOTE_PARTITION_DELETE_API_KEY = (byte) new 
RemotePartitionDeleteMetadataRecord().apiKey();
+
+private final Map keyWithSerde;
+private final Deserializer rootDeserializer;
+private final Serializer rootSerializer;
+
+public RemoteLogMetadataContextSerde() {
+keyWithSerde = createInternalSerde();
+rootSerializer = (topic, data) -> serialize(data);
+rootDeserializer = (topic, data) -> deserialize(data);
+}
+
+private Map createInternalSerde() {
+Map serdes = new HashMap<>();
+serdes.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new 
RemoteLogSegmentMetadataSerde());
+serdes.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new 
RemoteLogSegmentMetadataUpdateSerde());
+serdes.put(REMOTE_PARTITION_DELETE_API_KEY, new 
RemotePartitionDeleteMetadataSerde());
+return serdes;
+}
+
+private byte[] serialize(RemoteLogMetadataContext 
remoteLogMetadataContext) {

Review comment:
   `ProducerRecord` key is generally used to route to a partition based on 
the key or for compaction. But we do not want to route the requests based on 
the apiKey here and it is not a compacted topic. 
   
   apiKey and version are used to unmarshal the payload. IMHO, I see that as a 
complete structure for the record value, and it should be part of the value 
instead of being spread across the record key and value.
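
   For illustration, a minimal sketch of a self-describing value along those lines (the 
field order and single-byte widths are assumptions, not the actual RLMM wire format):
   ```
import java.nio.ByteBuffer;

public class ValueEnvelopeSketch {
    // Prepend apiKey and version to the serialized payload so the record value
    // alone carries everything needed to unmarshal it.
    public static byte[] envelope(byte apiKey, byte version, byte[] payload) {
        return ByteBuffer.allocate(2 + payload.length)
                .put(apiKey)
                .put(version)
                .put(payload)
                .array();
    }

    public static void main(String[] args) {
        byte[] value = envelope((byte) 0, (byte) 0, new byte[] {1, 2, 3});
        System.out.println(value.length); // 5 = apiKey + version + payload
    }
}
   ```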




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-15 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r614071198



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serde/RemoteLogSegmentMetadataSerde.java
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serde;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Message;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class RemoteLogSegmentMetadataSerde implements RemoteLogMetadataSerdes<RemoteLogSegmentMetadata> {
+
+public Message serialize(RemoteLogSegmentMetadata data) {
+RemoteLogSegmentMetadataRecord record = new 
RemoteLogSegmentMetadataRecord()
+.setRemoteLogSegmentId(
+new 
RemoteLogSegmentMetadataRecord.RemoteLogSegmentIdEntry()
+.setTopicIdPartition(new 
RemoteLogSegmentMetadataRecord.TopicIdPartitionEntry()
+
.setId(data.remoteLogSegmentId().topicIdPartition().topicId())
+
.setName(data.remoteLogSegmentId().topicIdPartition().topicPartition()
+.topic())
+
.setPartition(data.remoteLogSegmentId().topicIdPartition().topicPartition()
+.partition()))
+.setId(data.remoteLogSegmentId().id()))
+.setStartOffset(data.startOffset())
+.setEndOffset(data.endOffset())
+.setBrokerId(data.brokerId())
+.setEventTimestampMs(data.eventTimestampMs())
+.setMaxTimestampMs(data.maxTimestampMs())
+.setSegmentSizeInBytes(data.segmentSizeInBytes())
+
.setSegmentLeaderEpochs(data.segmentLeaderEpochs().entrySet().stream()
+.map(entry -> new 
RemoteLogSegmentMetadataRecord.SegmentLeaderEpochEntry()
+.setLeaderEpoch(entry.getKey())
+
.setOffset(entry.getValue())).collect(Collectors.toList()))
+.setRemoteLogSegmentState(data.state().id());
+
+return record;
+}
+
+@Override
+public RemoteLogSegmentMetadata deserialize(byte version, ByteBuffer 
byteBuffer) {
+RemoteLogSegmentMetadataRecord record = new 
RemoteLogSegmentMetadataRecord(
+new ByteBufferAccessor(byteBuffer), version);
+
+RemoteLogSegmentId remoteLogSegmentId = 
buildRemoteLogSegmentId(record.remoteLogSegmentId());
+Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+for (RemoteLogSegmentMetadataRecord.SegmentLeaderEpochEntry 
segmentLeaderEpoch : record.segmentLeaderEpochs()) {
+segmentLeaderEpochs.put(segmentLeaderEpoch.leaderEpoch(), 
segmentLeaderEpoch.offset());
+}
+RemoteLogSegmentMetadata remoteLogSegmentMetadata = new 
RemoteLogSegmentMetadata(remoteLogSegmentId,
+record.startOffset(), record.endOffset(), 
record.maxTimestampMs(), record.brokerId(),
+record.eventTimestampMs(), record.segmentSizeInBytes(), 
segmentLeaderEpochs);
+RemoteLogSegmentMetadataUpdate rlsmUpdate = new 
RemoteLogSegmentMetadataUpdate(remoteLogSegmentId,

Review comment:
   We did not expose `RemoteLogSegmentMetadata` constructor to take state 
as it is always created with `RemoteLogSegmentState.COPY_SEGMENT_STARTED` and 
it should always be updated with 

[GitHub] [kafka] ijuma commented on a change in pull request #10498: MINOR: remove `checksumOrNull` and `isValid` from Record

2021-04-15 Thread GitBox


ijuma commented on a change in pull request #10498:
URL: https://github.com/apache/kafka/pull/10498#discussion_r614068123



##
File path: core/src/main/scala/kafka/tools/DumpLogSegments.scala
##
@@ -279,8 +279,10 @@ object DumpLogSegments {
 
   if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
 print(" sequence: " + record.sequence + " headerKeys: " + 
record.headers.map(_.key).mkString("[", ",", "]"))
-  } else {
-print(s" crc: ${record.checksumOrNull} isvalid: 
${record.isValid}")
+  }
+  record match {
+case r: AbstractLegacyRecordBatch => print(s" crc: ${r.checksum}")

Review comment:
   Shall we include the `isValid` part here too?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-15 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r614066999



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serde/RemoteLogSegmentMetadataSerde.java
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serde;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Message;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class RemoteLogSegmentMetadataSerde implements RemoteLogMetadataSerdes<RemoteLogSegmentMetadata> {
+
+public Message serialize(RemoteLogSegmentMetadata data) {
+RemoteLogSegmentMetadataRecord record = new 
RemoteLogSegmentMetadataRecord()
+.setRemoteLogSegmentId(
+new 
RemoteLogSegmentMetadataRecord.RemoteLogSegmentIdEntry()

Review comment:
   +1 to extract them to a different method for better readability, updated 
in the latest commit. 
   
   But RemoteLogSegmentIdEntry is defined separately in RemoteLogSegmentMetadataRecord and RemoteLogSegmentMetadataRecordUpdate, so it cannot be shared between them now. Is there a way to define a common type and import/include it in the message definitions?
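
   For reference, the extracted helper could look roughly like this (hypothetical method name, mirroring the builder calls in the diff above):

   private RemoteLogSegmentMetadataRecord.RemoteLogSegmentIdEntry createRemoteLogSegmentIdEntry(RemoteLogSegmentMetadata data) {
       RemoteLogSegmentId segmentId = data.remoteLogSegmentId();
       return new RemoteLogSegmentMetadataRecord.RemoteLogSegmentIdEntry()
               .setTopicIdPartition(new RemoteLogSegmentMetadataRecord.TopicIdPartitionEntry()
                       .setId(segmentId.topicIdPartition().topicId())
                       .setName(segmentId.topicIdPartition().topicPartition().topic())
                       .setPartition(segmentId.topicIdPartition().topicPartition().partition()))
               .setId(segmentId.id());
   }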
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-15 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r614064785



##
File path: storage/src/main/resources/message/RemoteLogSegmentMetadata.json
##
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 0,
+  "type": "data",
+  "name": "RemoteLogSegmentMetadataRecord",
+  "validVersions": "0",
+  "flexibleVersions": "none",

Review comment:
   Good point,  I will add that. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-15 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r614031843



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serde/RemoteLogMetadataContextSerde.java
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serde;
+
+import 
org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataContext;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Message;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecordUpdate;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Serialization and deserialization for {@link RemoteLogMetadataContext}. 
This is the root serdes for the messages
+ * that are stored in internal remote log metadata topic.
+ */
+public class RemoteLogMetadataContextSerde implements Serde<RemoteLogMetadataContext> {
+
+public static final byte REMOTE_LOG_SEGMENT_METADATA_API_KEY = (byte) new 
RemoteLogSegmentMetadataRecord().apiKey();
+public static final byte REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = 
(byte) new RemoteLogSegmentMetadataRecordUpdate().apiKey();
+public static final byte REMOTE_PARTITION_DELETE_API_KEY = (byte) new 
RemotePartitionDeleteMetadataRecord().apiKey();
+
+private final Map<Byte, RemoteLogMetadataSerdes> keyWithSerde;
+private final Deserializer<RemoteLogMetadataContext> rootDeserializer;
+private final Serializer<RemoteLogMetadataContext> rootSerializer;
+
+public RemoteLogMetadataContextSerde() {
+keyWithSerde = createInternalSerde();
+rootSerializer = (topic, data) -> serialize(data);
+rootDeserializer = (topic, data) -> deserialize(data);
+}
+
+private Map<Byte, RemoteLogMetadataSerdes> createInternalSerde() {
+Map<Byte, RemoteLogMetadataSerdes> serdes = new HashMap<>();
+serdes.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new 
RemoteLogSegmentMetadataSerde());
+serdes.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new 
RemoteLogSegmentMetadataUpdateSerde());
+serdes.put(REMOTE_PARTITION_DELETE_API_KEY, new 
RemotePartitionDeleteMetadataSerde());
+return serdes;
+}
+
+private byte[] serialize(RemoteLogMetadataContext 
remoteLogMetadataContext) {
+RemoteLogMetadataSerdes serDe = 
keyWithSerde.get(remoteLogMetadataContext.apiKey());
+if (serDe == null) {
+throw new IllegalArgumentException("Serializer for apikey: " + 
remoteLogMetadataContext.apiKey() +
+   " does not exist.");
+}
+
+@SuppressWarnings("unchecked")
+Message message = serDe.serialize(remoteLogMetadataContext.payload());
+
+return transformToBytes(message, remoteLogMetadataContext.apiKey(), 
remoteLogMetadataContext.version());
+}
+
+private RemoteLogMetadataContext deserialize(byte[] data) {
+ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+byte apiKey = byteBuffer.get();
+byte version = byteBuffer.get();
+RemoteLogMetadataSerdes serDe = keyWithSerde.get(apiKey);
+if (serDe == null) {
+throw new IllegalArgumentException("Deserializer for apikey: " + 
apiKey + " does not exist.");
+}
+
+Object deserializedObj = serDe.deserialize(version, byteBuffer);
+return new RemoteLogMetadataContext(apiKey, version, deserializedObj);
+}
+
+private byte[] transformToBytes(Message message, byte apiKey, byte 
apiVersion) {

Review comment:
   I am fine with changing `transformToBytes` to `toBytes`.  But I prefer 
to keep `serialize` and `deserialize` as they are the respective 
implementations of serializer and deserializer. 
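
   For readers following the thread, a minimal sketch of what such a `toBytes`-style method could look like, assuming the `[apiKey][version][payload]` framing that `deserialize` above reads back and the imports already shown in the quoted class (not necessarily the exact code in the PR):

   // Sketch only: writes a 2-byte header (apiKey, apiVersion) followed by the serialized message.
   private byte[] toBytes(Message message, byte apiKey, byte apiVersion) {
       ObjectSerializationCache cache = new ObjectSerializationCache();
       int messageSize = message.size(cache, apiVersion);
       ByteBuffer buffer = ByteBuffer.allocate(2 + messageSize);
       buffer.put(apiKey);
       buffer.put(apiVersion);
       message.write(new ByteBufferAccessor(buffer), cache, apiVersion);
       return buffer.array();
   }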




-- 
This is an automated 

[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-15 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r614004395



##
File path: 
storage/src/main/resources/message/RemoteLogSegmentMetadataUpdate.json
##
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 1,
+  "type": "data",
+  "name": "RemoteLogSegmentMetadataRecordUpdate",
+  "validVersions": "0",
+  "flexibleVersions": "none",
+  "fields": [
+{
+  "name": "RemoteLogSegmentId",
+  "type": "RemoteLogSegmentIdEntry",
+  "versions": "0+",
+  "about": "Unique representation of the remote log segment",
+  "fields": [
+{
+  "name": "TopicIdPartition",
+  "type": "TopicIdPartitionEntry",
+  "versions": "0+",
+  "about": "Represents unique topic partition",
+  "fields": [
+{
+  "name": "Name",
+  "type": "string",
+  "versions": "0+",
+  "about": "Topic name"
+},
+{
+  "name": "Id",
+  "type": "uuid",
+  "versions": "0+",
+  "about": "Unique identifier of the topic"
+},
+{
+  "name": "Partition",
+  "type": "int32",
+  "versions": "0+",
+  "about": "Partition number"
+}
+  ]
+},
+{
+  "name": "Id",
+  "type": "uuid",
+  "versions": "0+",
+  "about": "Unique identifier of the remote log segment"
+}
+  ]
+},
+{
+  "name": "BrokerId",

Review comment:
   Right, we will be updating the KIP with this change.

##
File path: storage/src/main/resources/message/RemotePartitionDeleteMetadata.json
##
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 2,
+  "type": "data",
+  "name": "RemotePartitionDeleteMetadataRecord",
+  "validVersions": "0",
+  "flexibleVersions": "none",
+  "fields": [
+{
+  "name": "TopicIdPartition",
+  "type": "TopicIdPartitionEntry",
+  "versions": "0+",
+  "about": "Represents unique topic partition",
+  "fields": [
+{
+  "name": "Name",
+  "type": "string",
+  "versions": "0+",
+  "about": "Topic name"
+},
+{
+  "name": "Id",
+  "type": "uuid",
+  "versions": "0+",
+  "about": "Unique identifier of the topic"
+},
+{
+  "name": "Partition",
+  "type": "int32",
+  "versions": "0+",
+  "about": "Partition number"
+}
+  ]
+},
+{
+  "name": "BrokerId",

Review comment:
   Right, we will be updating the KIP with this change.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-15 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r614002154



##
File path: storage/src/main/resources/message/RemoteLogSegmentMetadata.json
##
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 0,
+  "type": "data",
+  "name": "RemoteLogSegmentMetadataRecord",
+  "validVersions": "0",
+  "flexibleVersions": "none",
+  "fields": [
+{
+  "name": "RemoteLogSegmentId",
+  "type": "RemoteLogSegmentIdEntry",
+  "versions": "0+",
+  "about": "Unique representation of the remote log segment",
+  "fields": [
+{
+  "name": "TopicIdPartition",
+  "type": "TopicIdPartitionEntry",
+  "versions": "0+",
+  "about": "Represents unique topic partition",
+  "fields": [
+{
+  "name": "Name",
+  "type": "string",
+  "versions": "0+",
+  "about": "Topic name"
+},
+{
+  "name": "Id",
+  "type": "uuid",
+  "versions": "0+",
+  "about": "Unique identifier of the topic"
+},
+{
+  "name": "Partition",
+  "type": "int32",
+  "versions": "0+",
+  "about": "Partition number"
+}
+  ]
+},
+{
+  "name": "Id",
+  "type": "uuid",
+  "versions": "0+",
+  "about": "Unique identifier of the remote log segment"
+}
+  ]
+},
+{
+  "name": "StartOffset",
+  "type": "int64",
+  "versions": "0+",
+  "about": "Start offset  of the segment."
+},
+{
+  "name": "EndOffset",
+  "type": "int64",
+  "versions": "0+",
+  "about": "End offset  of the segment."
+},
+{
+  "name": "BrokerId",

Review comment:
   Right, we will be updating the KIP with this change.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10541: KAFKA-12284: increase request timeout to make tests reliable

2021-04-15 Thread GitBox


showuon commented on pull request #10541:
URL: https://github.com/apache/kafka/pull/10541#issuecomment-820346080


   Need to rework. Close it first.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon closed pull request #10541: KAFKA-12284: increase request timeout to make tests reliable

2021-04-15 Thread GitBox


showuon closed pull request #10541:
URL: https://github.com/apache/kafka/pull/10541


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2021-04-15 Thread Alexis Polyzos (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322092#comment-17322092
 ] 

Alexis Polyzos commented on KAFKA-12635:


[~yangguo1220] thanks for the reply! We tried your suggestion but unfortunately it didn't work. We created the consumer group with kafka-console-consumer.sh and offset = 0. However, when MM2 started the CheckpointConnector, the offsets were translated incorrectly and became negative again.

> Mirrormaker 2 offset sync is incorrect if the target partition is empty
> ---
>
> Key: KAFKA-12635
> URL: https://issues.apache.org/jira/browse/KAFKA-12635
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Frank Yi
>Assignee: Ning Zhang
>Priority: Major
>
> This bug occurs when using Mirrormaker with "sync.group.offsets.enabled = 
> true".
> If a source partition is empty, but the source consumer group's offset for 
> that partition is non-zero, then Mirrormaker sets the target consumer group's 
> offset for that partition to the literal, not translated, offset of the 
> source consumer group. This state can be reached if the source consumer group 
> consumed some records that were now deleted (like by a retention policy), or 
> if Mirrormaker replication is set to start at "latest". This bug causes the 
> target consumer group's lag for that partition to be negative and breaks 
> offset sync for that partition until lag is positive.
> The correct behavior when the source partition is empty would be to set the 
> target offset to the translated offset, not literal offset, which in this 
> case would always be 0. 
> Original email thread on this issue: 
> https://lists.apache.org/thread.html/r7c54ee5f57227367b911d4abffa72781772d8dd3b72d75eb65ee19f7%40%3Cusers.kafka.apache.org%3E
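
To make the expected behavior concrete, a small illustration with hypothetical numbers:

// Hypothetical numbers illustrating the description above.
long sourceCommittedOffset = 100L; // source group's committed offset; the underlying records were deleted
long targetLogEndOffset = 0L;      // the target partition is empty
long buggySyncedOffset = sourceCommittedOffset; // literal copy -> consumer lag becomes 0 - 100 = -100
long expectedSyncedOffset = 0L;                 // translated offset for an empty target partition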



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12662) add unit test for ProducerPerformance

2021-04-15 Thread Chun-Hao Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chun-Hao Tang reassigned KAFKA-12662:
-

Assignee: Chun-Hao Tang

> add unit test for ProducerPerformance
> -
>
> Key: KAFKA-12662
> URL: https://issues.apache.org/jira/browse/KAFKA-12662
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chun-Hao Tang
>Priority: Major
>
> ProducerPerformance is a useful tool which offers an official way to test 
> produce performance. Hence, it would be better to add enough tests for it. 
> (In fact, it has no unit tests currently).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

2021-04-15 Thread GitBox


mimaison commented on pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#issuecomment-820299395


   Not sure why the build failed; I've re-kicked it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

2021-04-15 Thread GitBox


mimaison commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r613931955



##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.NonEmptyListValidator;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+public static final String FIELDS_FIELD = "fields";
+public static final String HEADERS_FIELD = "headers";
+public static final String OPERATION_FIELD = "operation";
+private static final String MOVE_OPERATION = "move";
+private static final String COPY_OPERATION = "copy";
+
+public static final String OVERVIEW_DOC =
+"Moves or copies fields in the key/value of a record into that 
record's headers. " +
+"Corresponding elements of " + FIELDS_FIELD + 
" and " +
+"" + HEADERS_FIELD + " together identify a 
field and the header it should be " +
+"moved or copied to. " +
+"Use the concrete transformation type designed for the 
record " +
+"key (" + Key.class.getName() + ") or value 
(" + Value.class.getName() + ").";
+
+public static final ConfigDef CONFIG_DEF = new ConfigDef()
+.define(FIELDS_FIELD, ConfigDef.Type.LIST,
+NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+ConfigDef.Importance.HIGH,
+"Field names in the record whose values are to be copied 
or moved to headers.")
+.define(HEADERS_FIELD, ConfigDef.Type.LIST,
+NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+ConfigDef.Importance.HIGH,
+"Header names, in the same order as the field names listed 
in the fields configuration property.")
+.define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+ConfigDef.ValidString.in(MOVE_OPERATION, COPY_OPERATION), 
ConfigDef.Importance.HIGH,
+"Either move if the fields are to be moved to 
the headers (removed from the key/value), " +
+"or copy if the fields are to be 
copied to the headers (retained in the key/value).");
+
+enum Operation {
+MOVE(MOVE_OPERATION),
+COPY(COPY_OPERATION);
+
+private final String name;
+
+Operation(String name) {
+this.name = name;
+}
+
+static Operation fromName(String name) {
+switch (name) {
+case MOVE_OPERATION:
+return MOVE;
+case COPY_OPERATION:
+return COPY;
+default:
+throw new IllegalArgumentException();

Review comment:
   Ok, that's fair enough. Thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

2021-04-15 Thread GitBox


tombentley commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r613926107



##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.NonEmptyListValidator;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+public static final String FIELDS_FIELD = "fields";
+public static final String HEADERS_FIELD = "headers";
+public static final String OPERATION_FIELD = "operation";
+private static final String MOVE_OPERATION = "move";
+private static final String COPY_OPERATION = "copy";
+
+public static final String OVERVIEW_DOC =
+"Moves or copies fields in the key/value of a record into that 
record's headers. " +
+"Corresponding elements of " + FIELDS_FIELD + 
" and " +
+"" + HEADERS_FIELD + " together identify a 
field and the header it should be " +
+"moved or copied to. " +
+"Use the concrete transformation type designed for the 
record " +
+"key (" + Key.class.getName() + ") or value 
(" + Value.class.getName() + ").";
+
+public static final ConfigDef CONFIG_DEF = new ConfigDef()
+.define(FIELDS_FIELD, ConfigDef.Type.LIST,
+NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+ConfigDef.Importance.HIGH,
+"Field names in the record whose values are to be copied 
or moved to headers.")
+.define(HEADERS_FIELD, ConfigDef.Type.LIST,
+NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+ConfigDef.Importance.HIGH,
+"Header names, in the same order as the field names listed 
in the fields configuration property.")
+.define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+ConfigDef.ValidString.in(MOVE_OPERATION, COPY_OPERATION), 
ConfigDef.Importance.HIGH,
+"Either move if the fields are to be moved to 
the headers (removed from the key/value), " +
+"or copy if the fields are to be 
copied to the headers (retained in the key/value).");
+
+enum Operation {
+MOVE(MOVE_OPERATION),
+COPY(COPY_OPERATION);
+
+private final String name;
+
+Operation(String name) {
+this.name = name;
+}
+
+static Operation fromName(String name) {
+switch (name) {
+case MOVE_OPERATION:
+return MOVE;
+case COPY_OPERATION:
+return COPY;
+default:
+throw new IllegalArgumentException();

Review comment:
   It is impossible due to the `ConfigDef.ValidString.in(MOVE_OPERATION, 
COPY_OPERATION)`, so this is really an assertion failure. The line number in 
the stacktrace would be enough to track it down if it ever did happen due to a 
later refactoring, so imho an error message is of no value. But I'm happy 

[GitHub] [kafka] mimaison commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

2021-04-15 Thread GitBox


mimaison commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r613913901



##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.NonEmptyListValidator;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+public static final String FIELDS_FIELD = "fields";
+public static final String HEADERS_FIELD = "headers";
+public static final String OPERATION_FIELD = "operation";
+private static final String MOVE_OPERATION = "move";
+private static final String COPY_OPERATION = "copy";
+
+public static final String OVERVIEW_DOC =
+"Moves or copies fields in the key/value of a record into that 
record's headers. " +
+"Corresponding elements of " + FIELDS_FIELD + 
" and " +
+"" + HEADERS_FIELD + " together identify a 
field and the header it should be " +
+"moved or copied to. " +
+"Use the concrete transformation type designed for the 
record " +
+"key (" + Key.class.getName() + ") or value 
(" + Value.class.getName() + ").";
+
+public static final ConfigDef CONFIG_DEF = new ConfigDef()
+.define(FIELDS_FIELD, ConfigDef.Type.LIST,
+NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+ConfigDef.Importance.HIGH,
+"Field names in the record whose values are to be copied 
or moved to headers.")
+.define(HEADERS_FIELD, ConfigDef.Type.LIST,
+NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+ConfigDef.Importance.HIGH,
+"Header names, in the same order as the field names listed 
in the fields configuration property.")
+.define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+ConfigDef.ValidString.in(MOVE_OPERATION, COPY_OPERATION), 
ConfigDef.Importance.HIGH,
+"Either move if the fields are to be moved to 
the headers (removed from the key/value), " +
+"or copy if the fields are to be 
copied to the headers (retained in the key/value).");
+
+enum Operation {
+MOVE(MOVE_OPERATION),
+COPY(COPY_OPERATION);
+
+private final String name;
+
+Operation(String name) {
+this.name = name;
+}
+
+static Operation fromName(String name) {
+switch (name) {
+case MOVE_OPERATION:
+return MOVE;
+case COPY_OPERATION:
+return COPY;
+default:
+throw new IllegalArgumentException();

Review comment:
   Even though I don't think it's reachable by users, should we have a 
message here?
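
   For reference, a sketch of what the default branch could look like with a message added (not the merged code):

   static Operation fromName(String name) {
       switch (name) {
           case MOVE_OPERATION:
               return MOVE;
           case COPY_OPERATION:
               return COPY;
           default:
               // Unreachable when the config has passed ConfigDef.ValidString validation,
               // but a message makes an unexpected failure easier to diagnose.
               throw new IllegalArgumentException("Unknown operation name: " + name);
       }
   }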




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact 

[GitHub] [kafka] kowshik commented on a change in pull request #10478: KAFKA-12553: Refactor recovery logic to introduce LogLoader

2021-04-15 Thread GitBox


kowshik commented on a change in pull request #10478:
URL: https://github.com/apache/kafka/pull/10478#discussion_r613795734



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -255,19 +261,21 @@ case object SnapshotGenerated extends 
LogStartOffsetIncrementReason {
 @threadsafe
 class Log(@volatile private var _dir: File,
   @volatile var config: LogConfig,
+  val segments: LogSegments,

Review comment:
   I took a stab at this. Things changed a bit after I did the refactoring 
that Jun suggested 
[here](https://github.com/apache/kafka/pull/10478#discussion_r610933966). In 
the latest version of the PR, `LogComponents` has been renamed to a class called `LoadedLogOffsets`, which now only contains (logStartOffset, recoveryPoint, nextOffsetMetadata). It seemed reasonable to just pass these in as 3 separate parameters into `Log`; since the `LeaderEpochFileCache` and `ProducerStateManager` are no longer inside the `LoadedLogOffsets` object, they have to be passed in separately anyway.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-15 Thread GitBox


kowshik commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r613882988



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serde/RemoteLogMetadataContextSerde.java
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serde;
+
+import 
org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataContext;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Message;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecordUpdate;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Serialization and deserialization for {@link RemoteLogMetadataContext}. 
This is the root serdes for the messages
+ * that are stored in internal remote log metadata topic.
+ */
+public class RemoteLogMetadataContextSerde implements Serde<RemoteLogMetadataContext> {
+
+public static final byte REMOTE_LOG_SEGMENT_METADATA_API_KEY = (byte) new 
RemoteLogSegmentMetadataRecord().apiKey();
+public static final byte REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = 
(byte) new RemoteLogSegmentMetadataRecordUpdate().apiKey();
+public static final byte REMOTE_PARTITION_DELETE_API_KEY = (byte) new 
RemotePartitionDeleteMetadataRecord().apiKey();
+
+private final Map<Byte, RemoteLogMetadataSerdes> keyWithSerde;
+private final Deserializer<RemoteLogMetadataContext> rootDeserializer;
+private final Serializer<RemoteLogMetadataContext> rootSerializer;
+
+public RemoteLogMetadataContextSerde() {
+keyWithSerde = createInternalSerde();
+rootSerializer = (topic, data) -> serialize(data);
+rootDeserializer = (topic, data) -> deserialize(data);
+}
+
+private Map<Byte, RemoteLogMetadataSerdes> createInternalSerde() {
+Map<Byte, RemoteLogMetadataSerdes> serdes = new HashMap<>();
+serdes.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new 
RemoteLogSegmentMetadataSerde());
+serdes.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new 
RemoteLogSegmentMetadataUpdateSerde());
+serdes.put(REMOTE_PARTITION_DELETE_API_KEY, new 
RemotePartitionDeleteMetadataSerde());
+return serdes;
+}
+
+private byte[] serialize(RemoteLogMetadataContext 
remoteLogMetadataContext) {

Review comment:
   The `ProducerRecord` can hold a key and a value. It seems like we could 
store the API key in `ProducerRecord.key()` and store the serialized payload in 
`ProducerRecord.value()`. Why not take that route instead of serializing to a 
single byte array here containing (apiKey, version, payload)? 
   
   
https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
   
   This is of course assuming that `RemoteLogMetadataContextSerde` will only be 
used for serializing/deserializing events to/from a Kafka topic (as the class 
doc suggests).
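
   A rough sketch of the alternative raised here (not what the PR does): carry the api key in the record key and only the serialized payload in the value. The topic name, explicit partition, and factory class are assumptions for illustration.

   import org.apache.kafka.clients.producer.ProducerRecord;

   // Hypothetical factory for the key-based alternative. Passing the partition explicitly
   // sidesteps key-based routing, which is the concern discussed in this thread.
   final class KeyedMetadataRecordFactory {
       private static final String METADATA_TOPIC = "__remote_log_metadata"; // assumed topic name

       private KeyedMetadataRecordFactory() { }

       static ProducerRecord<byte[], byte[]> newRecord(int partition, byte apiKey, byte[] versionAndPayload) {
           return new ProducerRecord<>(METADATA_TOPIC, partition, new byte[] {apiKey}, versionAndPayload);
       }
   }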




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



