[jira] [Updated] (KAFKA-12676) Improve sticky general assignor underlying algorithm for the imbalanced case
[ https://issues.apache.org/jira/browse/KAFKA-12676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-12676:
------------------------------
    Description:
As discussed in KAFKA-12675, we think the general assignor algorithm could be improved to make some edge cases more balanced, and to improve scalability and performance.

Ref: the algorithm is here:
https://github.com/twmb/franz-go/blob/master/pkg/kgo/internal/sticky/graph.go
https://issues.apache.org/jira/browse/KAFKA-12675?focusedCommentId=17322603&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17322603

  was:
As discussed in KAFKA-12675, we think the general assignor algorithm could be improved to make some edge cases more balanced, and to improve scalability and performance.

Ref: the algorithm is here:
https://github.com/twmb/franz-go/blob/master/pkg/kgo/internal/sticky/graph.go

> Improve sticky general assignor underlying algorithm for the imbalanced case
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-12676
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12676
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Luke Chen
>            Priority: Major
>
> As discussed in KAFKA-12675, we think the general assignor algorithm could be improved to make some edge cases more balanced, and to improve scalability and performance.
> Ref: the algorithm is here:
> https://github.com/twmb/franz-go/blob/master/pkg/kgo/internal/sticky/graph.go
> https://issues.apache.org/jira/browse/KAFKA-12675?focusedCommentId=17322603&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17322603

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (KAFKA-12675) Improve sticky general assignor scalability and performance
[ https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322608#comment-17322608 ]

Luke Chen commented on KAFKA-12675:
-----------------------------------
[~twmb], I see. I've created another ticket, KAFKA-12676, to address your suggestion, so we can make incremental improvements here. Thanks for the suggestion!

> Improve sticky general assignor scalability and performance
> -----------------------------------------------------------
>
>                 Key: KAFKA-12675
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12675
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Major
>
> Currently, we have a "general assignor" for the non-equal subscription case and a "constrained assignor" for the all-equal subscription case. There's a performance test for the constrained assignor with:
> topicCount = 500;
> partitionCount = 2000;
> consumerCount = 2000;
> in _testLargeAssignmentAndGroupWithUniformSubscription_: 1 million partitions in total, and the assignment completes within 2 seconds on my machine.
> However, if we let one of the consumers subscribe to only 1 topic, the "general assignor" is used instead, and with the same settings as above the result is *OutOfMemory*.
> Even if we reduce the counts to:
> topicCount = 50;
> partitionCount = 1000;
> consumerCount = 1000;
> we still get *OutOfMemory*.
> With this setting:
> topicCount = 50;
> partitionCount = 800;
> consumerCount = 800;
> the assignment completes in 10 seconds on my machine, which is still slow.
>
> Since we are going to set the default assignment strategy to "CooperativeStickyAssignor" soon, we should improve the scalability and performance of the sticky general assignor.
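For context on the scale in the numbers above, the topic and partition counts multiply out to the totals the ticket mentions. A trivial illustration (class and method names are mine, not from the Kafka test code):

```java
// Hypothetical helper showing how the benchmark configurations above
// multiply out to the total partition counts the ticket quotes.
public class AssignmentScale {
    static long totalPartitions(int topicCount, int partitionsPerTopic) {
        // Each of topicCount topics has partitionsPerTopic partitions.
        return (long) topicCount * partitionsPerTopic;
    }

    public static void main(String[] args) {
        System.out.println(totalPartitions(500, 2000)); // 1000000: "total 1 million partitions"
        System.out.println(totalPartitions(50, 1000));  // 50000: still OutOfMemory per the report
        System.out.println(totalPartitions(50, 800));   // 40000: ~10 seconds per the report
    }
}
```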
[jira] [Created] (KAFKA-12676) Improve sticky general assignor underlying algorithm for the imbalanced case
Luke Chen created KAFKA-12676:
---------------------------------

             Summary: Improve sticky general assignor underlying algorithm for the imbalanced case
                 Key: KAFKA-12676
                 URL: https://issues.apache.org/jira/browse/KAFKA-12676
             Project: Kafka
          Issue Type: Improvement
            Reporter: Luke Chen

As discussed in KAFKA-12675, we think the general assignor algorithm could be improved to make some edge cases more balanced, and to improve scalability and performance.

Ref: the algorithm is here:
https://github.com/twmb/franz-go/blob/master/pkg/kgo/internal/sticky/graph.go
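As a rough idea of what an exact (rather than heuristic) imbalanced-case rebalance aims for, here is a deliberately simplified sketch: a greedy loop that moves one partition at a time from the most-loaded to the least-loaded consumer until the load spread is at most one. This is not the franz-go graph-based algorithm linked above, and all names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch (NOT the franz-go graph algorithm): repeatedly move one
// partition from the most-loaded consumer to the least-loaded consumer until
// no consumer has more than one partition above any other.
public class GreedyRebalance {
    // loads maps consumer id -> number of assigned partitions.
    static void rebalance(Map<String, Integer> loads) {
        while (true) {
            String max = null, min = null;
            for (String consumer : loads.keySet()) {
                if (max == null || loads.get(consumer) > loads.get(max)) max = consumer;
                if (min == null || loads.get(consumer) < loads.get(min)) min = consumer;
            }
            if (loads.get(max) - loads.get(min) <= 1) return; // balanced: spread <= 1
            loads.put(max, loads.get(max) - 1); // "move" one partition
            loads.put(min, loads.get(min) + 1);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> loads = new HashMap<>();
        loads.put("c1", 10);
        loads.put("c2", 0);
        loads.put("c3", 2);
        rebalance(loads);
        System.out.println(loads); // 12 partitions over 3 consumers: 4 each
    }
}
```

A real sticky assignor additionally has to pick *which* partition to move (to respect subscriptions and maximize stickiness), which is where the graph formulation in the linked franz-go code comes in.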
[jira] [Comment Edited] (KAFKA-12675) Improve sticky general assignor scalability and performance
[ https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322603#comment-17322603 ]

Travis Bischel edited comment on KAFKA-12675 at 4/16/21, 4:25 AM:
------------------------------------------------------------------
What I mean to say is that the logic powering the existing sticky algorithm is heuristic and not truly balanced, and that the logic itself can be changed to follow the sticky goals more exactly while being much more efficient. That is, changes can be made for the imbalanced case similar to how [~ableegoldman] made changes to the balanced case (for KAFKA-9987), and these changes will more exactly fulfill the goal of sticky balancing while being more efficient. This does not change how things are balanced; it does not change the actual sticky aspect. Basically, improving the underlying algorithm for the imbalanced case directly fulfills the goals of this ticket to improve scalability and performance.

For some numbers:
{noformat}
$ go test -run nothing -bench Java -benchmem -v -benchtime 60s
goos: darwin
goarch: amd64
pkg: github.com/twmb/franz-go/pkg/kgo/internal/sticky
cpu: Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
BenchmarkJava
BenchmarkJava/large
    sticky_test.go:1419: avg 2.696451608s per 1 balances of 2000 members and 100 total partitions
    sticky_test.go:1419: avg 2.573368814s per 26 balances of 2000 members and 100 total partitions
BenchmarkJava/large-8                  26    2573370133 ns/op    531478500 B/op    1038983 allocs/op
BenchmarkJava/large_imbalance
    sticky_test.go:1419: avg 13.798672936s per 1 balances of 2001 members and 100 total partitions
    sticky_test.go:1419: avg 9.581320518s per 4 balances of 2001 members and 100 total partitions
    sticky_test.go:1419: avg 9.626729812s per 7 balances of 2001 members and 100 total partitions
BenchmarkJava/large_imbalance-8         7    9626739151 ns/op   8535692965 B/op    1039100 allocs/op
BenchmarkJava/medium
    sticky_test.go:1419: avg 77.798053ms per 1 balances of 1000 members and 5 total partitions
    sticky_test.go:1419: avg 72.271454ms per 100 balances of 1000 members and 5 total partitions
    sticky_test.go:1419: avg 72.044377ms per 996 balances of 1000 members and 5 total partitions
BenchmarkJava/medium-8                996      72044411 ns/op     22502623 B/op      56085 allocs/op
BenchmarkJava/medium_imbalance
    sticky_test.go:1419: avg 216.340842ms per 1 balances of 1001 members and 5 total partitions
    sticky_test.go:1419: avg 217.385765ms per 100 balances of 1001 members and 5 total partitions
    sticky_test.go:1419: avg 218.218478ms per 331 balances of 1001 members and 5 total partitions
BenchmarkJava/medium_imbalance-8      331     218218666 ns/op    222795358 B/op      56097 allocs/op
BenchmarkJava/small
    sticky_test.go:1419: avg 52.22238ms per 1 balances of 800 members and 4 total partitions
    sticky_test.go:1419: avg 50.190192ms per 100 balances of 800 members and 4 total partitions
    sticky_test.go:1419: avg 50.252975ms per 1434 balances of 800 members and 4 total partitions
BenchmarkJava/small-8                1434      50253022 ns/op     18823337 B/op      44906 allocs/op
BenchmarkJava/small_imbalance
    sticky_test.go:1419: avg 149.416236ms per 1 balances of 801 members and 4 total partitions
    sticky_test.go:1419: avg 149.050743ms per 100 balances of 801 members and 4 total partitions
    sticky_test.go:1419: avg 149.224721ms per 482 balances of 801 members and 4 total partitions
BenchmarkJava/small_imbalance-8       482     149224854 ns/op    147060761 B/op      44914 allocs/op
{noformat}
I've just pushed the code for this benchmark in [this commit|https://github.com/twmb/franz-go/commit/e0c960e094e8f100924411f6c5fb514b79fc761a]. This is still clearly using a decent amount of memory (up to 8G in the imbalanced case), but I've already spent a good amount of time optimizing how much memory this consumes. I'm sure I could trade some speed to drop some memory usage. But, as it stands, 1mil partitions and 2000 members simply has to take up some memory.
What I mean by heuristic is that the general assignor does not really have much reasoning behind what it is doing: it somewhat tries a bunch of things and then says "good enough" at a certain point. It also tries a bunch of things, checks whether what it did produced a better balance score, and if not, discards what it tried. This can be seen on lines 642 and 643 of [AbstractStickyAssignor.java|https://github.com/apache/kafka/blob/637c44c976c115b7e770a6fd9e62e8822051b45b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java#L642-L643]:
{noformat}
// if we are not preserving existing assignments
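The try-score-discard pattern described above can be sketched roughly as follows. This is a hypothetical illustration, not the actual AbstractStickyAssignor code; the balance score here is a simple sum of squared loads (lower is more balanced), which is my own choice for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of a "try, score, revert" heuristic: attempt a move,
// recompute a balance score, and keep the change only if the score improved.
public class ScoreAndRevert {
    // Balance score: sum of squared per-consumer loads; lower means more even.
    static long score(Map<String, Integer> loads) {
        long s = 0;
        for (int v : loads.values()) s += (long) v * v;
        return s;
    }

    // Try moving one partition from 'from' to 'to'; revert if the score did not improve.
    static boolean tryMove(Map<String, Integer> loads, String from, String to) {
        long before = score(loads);
        loads.put(from, loads.get(from) - 1);
        loads.put(to, loads.get(to) + 1);
        if (score(loads) < before) return true;  // improved: keep the move
        loads.put(from, loads.get(from) + 1);    // not better: revert
        loads.put(to, loads.get(to) - 1);
        return false;
    }

    public static void main(String[] args) {
        Map<String, Integer> loads = new HashMap<>();
        loads.put("c1", 3);
        loads.put("c2", 1);
        System.out.println(tryMove(loads, "c1", "c2")); // true: score 10 -> 8, kept
        System.out.println(tryMove(loads, "c1", "c2")); // false: 8 -> 10, reverted
    }
}
```

The critique in the comment is that a loop built from such blind attempts has no global reasoning: it can stop at "good enough" rather than at a provably balanced assignment, and every rejected attempt is wasted work.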
[jira] [Commented] (KAFKA-12675) Improve sticky general assignor scalability and performance
[ https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322603#comment-17322603 ]

Travis Bischel commented on KAFKA-12675:
----------------------------------------
What I mean to say is that the logic powering the existing cooperative-sticky algorithm is heuristic and not truly balanced, and that the logic itself can be changed to follow the cooperative-sticky goals more exactly while being much more efficient. That is, changes can be made for the imbalanced case similar to how [~ableegoldman] made changes to the balanced case, and these changes will more exactly fulfill the goal of cooperative sticky while being more efficient. This does not change how things are balanced; it does not change the actual sticky aspect. Basically, improving the underlying algorithm for the imbalanced case directly fulfills the goals of this ticket to improve scalability and performance.

I'll edit this comment shortly with some benchmarking numbers.
[jira] [Comment Edited] (KAFKA-12675) Improve sticky general assignor scalability and performance
[ https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322599#comment-17322599 ]

Luke Chen edited comment on KAFKA-12675 at 4/16/21, 3:59 AM:
-------------------------------------------------------------
[~twmb], thanks for the suggestion. But in this ticket, I'd like to focus on improving the scalability and performance first. I agree the whole algorithm can be improved, but that needs to go through a KIP and more discussion before it can proceed. Thanks anyway, I'll think about it. :)

was (Author: showuon):
[~twmb], thanks for the suggestion. But in this ticket, I'd like to improve the scalability and performance first. I agree the whole algorithm can be improved, too, but that needs to go through a KIP and more discussion before it can proceed. Thanks anyway, I'll think about it. :)
[jira] [Commented] (KAFKA-12675) Improve sticky general assignor scalability and performance
[ https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17322599#comment-17322599 ]

Luke Chen commented on KAFKA-12675:
-----------------------------------
[~twmb], thanks for the suggestion. But in this ticket, I'd like to improve the scalability and performance first. I agree the whole algorithm can be improved, too, but that needs to go through a KIP and more discussion before it can proceed. Thanks anyway, I'll think about it. :)
[GitHub] [kafka] ijuma commented on pull request #10545: KAFKA-12672: Added config for raft testing server
ijuma commented on pull request #10545: URL: https://github.com/apache/kafka/pull/10545#issuecomment-820886172 Merged to trunk and cherry-picked to 2.8. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma merged pull request #10545: KAFKA-12672: Added config for raft testing server
ijuma merged pull request #10545:
URL: https://github.com/apache/kafka/pull/10545
[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614538992

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java ##
@@ -486,6 +486,327 @@ public void testJoin() {
         }
     }
 
+    @Test
+    public void testImprovedLeftJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+        joined = stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+        joined.process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
+            /*
+             * Verifies left non-joined records are emitted by a record processed in the left topic
+             */
+
+            long windowStart = 0;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in left topic will emit expired non-joined records from the left topic
+            inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy record
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401));
+
+            /*
+             * Verifies left non-joined records are emitted by a record processed in the right topic
+             */
+
+            windowStart = windowStart + 301;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in right topic will emit expired non-joined records from the left topic
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy record
+            inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+            /*
+             * Verifies right non-joined records are not emitted by record processed in the left topic
+             */
+
+            windowStart = windowStart + 401;
+
+            // No joins detected; No null-joins emitted
+            inputTopic2.pipeInput(0, "A0", windowStart + 1);
+            inputTopic2.pipeInput(1, "A1", windowStart + 2);
+            inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new
[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
mjsax commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614538557

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java ##

@@ -486,6 +486,327 @@ public void testJoin() {
         }
     }

+    @Test
+    public void testImprovedLeftJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+        joined = stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+        joined.process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
+            /*
+             * Verifies left non-joined records are emitted by a record processed in the left topic
+             */
+
+            long windowStart = 0;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in left topic will emit expired non-joined records from the left topic
+            inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy record
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401));
+
+            /*
+             * Verifies left non-joined records are emitted by a record processed in the right topic
+             */
+
+            windowStart = windowStart + 301;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in right topic will emit expired non-joined records from the left topic
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy record
+            inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+            /*
+             * Verifies right non-joined records are not emitted by record processed in the left topic
+             */
+
+            windowStart = windowStart + 401;
+
+            // No joins detected; No null-joins emitted
+            inputTopic2.pipeInput(0, "A0", windowStart + 1);
+            inputTopic2.pipeInput(1, "A1", windowStart + 2);
+            inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new
[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
mjsax commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614538486

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java ##

@@ -486,6 +486,327 @@ public void testJoin() {
         }
     }

+    @Test
+    public void testImprovedLeftJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+        joined = stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+        joined.process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
+            /*
+             * Verifies left non-joined records are emitted by a record processed in the left topic
+             */
+
+            long windowStart = 0;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in left topic will emit expired non-joined records from the left topic
+            inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy record
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401));
+
+            /*
+             * Verifies left non-joined records are emitted by a record processed in the right topic
+             */
+
+            windowStart = windowStart + 301;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in right topic will emit expired non-joined records from the left topic
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy record
+            inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+            /*
+             * Verifies right non-joined records are not emitted by record processed in the left topic
+             */
+
+            windowStart = windowStart + 401;
+
+            // No joins detected; No null-joins emitted
+            inputTopic2.pipeInput(0, "A0", windowStart + 1);
+            inputTopic2.pipeInput(1, "A1", windowStart + 2);
+            inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new
[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
mjsax commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614538200 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java ## @@ -486,6 +486,327 @@ public void testJoin() { } } +@Test +public void testImprovedLeftJoin() { Review comment: Should this test not be added to `KStreamKStreamLeftJoinTest.java` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12675) Improve sticky general assignor scalability and performance
[ https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322593#comment-17322593 ] Travis Bischel commented on KAFKA-12675:

An option to evaluate is the algorithm I devised in my franz-go client, which translates the balancing into a graph and uses A* search to perform an exact balance much more efficiently. I noticed that the existing Java algorithm is heuristic based, and I have a few tests in my repo showing edge cases that the existing heuristic algorithm cannot really handle. The algorithm is here: https://github.com/twmb/franz-go/blob/master/pkg/kgo/internal/sticky/graph.go with the option to switch into that algorithm in the sticky.go file.

> Improve sticky general assignor scalability and performance
> ---
>
> Key: KAFKA-12675
> URL: https://issues.apache.org/jira/browse/KAFKA-12675
> Project: Kafka
> Issue Type: Improvement
> Reporter: Luke Chen
> Assignee: Luke Chen
> Priority: Major
>
> Currently, we have a "general assignor" for the non-equal subscription case and a
> "constrained assignor" for the all-equal subscription case. There is a performance
> test for the constrained assignor with:
> topicCount = 500;
> partitionCount = 2000;
> consumerCount = 2000;
> in _testLargeAssignmentAndGroupWithUniformSubscription_, i.e. 1 million partitions
> in total, and we can complete the assignment within 2 seconds on my machine.
> However, if we let one of the consumers subscribe to only 1 topic, the "general
> assignor" is used, and the result with the same settings as above is: *OutOfMemory*.
> Even if we lower the counts to:
> topicCount = 50;
> partitionCount = 1000;
> consumerCount = 1000;
> we still get *OutOfMemory*.
> With this setting:
> topicCount = 50;
> partitionCount = 800;
> consumerCount = 800;
> we can complete in 10 seconds on my machine, which is still slow.
>
> Since we are going to set the default assignment strategy to
> "CooperativeStickyAssignor" soon, we should improve the scalability and
> performance of the sticky general assignor.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
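The scale numbers quoted above can be sanity-checked with a quick sketch. This is illustrative only: the pair-count model below is an assumption about why a heuristic assignor's working set can explode, not a claim about the actual data structures in AbstractStickyAssignor.

```java
public class AssignorScale {
    public static void main(String[] args) {
        // Settings from testLargeAssignmentAndGroupWithUniformSubscription above.
        final long topicCount = 500;
        final long partitionsPerTopic = 2000;
        final long consumerCount = 2000;

        // Total partitions the assignor has to place.
        final long totalPartitions = topicCount * partitionsPerTopic;
        System.out.println("total partitions = " + totalPartitions); // prints "total partitions = 1000000"

        // If an assignor materializes every (consumer, candidate partition) pair,
        // memory grows multiplicatively rather than linearly:
        final long candidatePairs = consumerCount * totalPartitions;
        System.out.println("candidate pairs  = " + candidatePairs); // prints "candidate pairs  = 2000000000"
    }
}
```

Two billion candidate pairs is consistent with the OutOfMemory behavior reported above, even though the uniform-subscription (constrained) path handles the same million partitions in seconds.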
[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
mjsax commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614537097

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ##

@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
         }

         boolean needOuterJoin = outer;
+        boolean joinFound = false;

         final long inputRecordTimestamp = context().timestamp();
         final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
         final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);

+        maxObservedStreamTime.advance(inputRecordTimestamp);
+
         try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
             while (iter.hasNext()) {
                 needOuterJoin = false;
+                joinFound = true;
                 final KeyValue<Long, V2> otherRecord = iter.next();
+                final long otherRecordTimestamp = otherRecord.key;

Review comment: Yes, it is a bug in the current implementation...

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
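The window-bound arithmetic in the hunk above is easy to check in isolation. A standalone sketch follows; the variable names are copied from the diff, but the concrete millisecond values are invented for illustration:

```java
public class JoinWindowBounds {
    public static void main(String[] args) {
        // Symmetric window of 100 ms on each side, as with JoinWindows.of(ofMillis(100)).
        final long joinBeforeMs = 100L;
        final long joinAfterMs = 100L;

        // A record early in the stream: the lower bound clamps at 0.
        final long inputRecordTimestamp = 50L;
        final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
        final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);

        // The other-side store is then fetched for matches in [timeFrom, timeTo].
        System.out.println(timeFrom + ".." + timeTo); // prints "0..150"
    }
}
```

The clamp via `Math.max(0L, ...)` matters only near stream start; for later records `timeFrom` is simply `inputRecordTimestamp - joinBeforeMs`.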
[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
mjsax commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614536735

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ##

@@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
         return builder;
     }

+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+                                                                                                                                  final JoinWindows windows,
+                                                                                                                                  final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>(
+            persistentTimeOrderedWindowStore(
+                storeName + "-store",
+                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                Duration.ofMillis(windows.size())
+            ),
+            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+            new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
+            Time.SYSTEM

Review comment: Ok. Maybe good enough as-is. (Fair point that we don't mock it in other stores either -- maybe there was never any demand to be able to mock it. As you said, we could change it as a follow-up if needed.)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
mjsax commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614536011

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ##

@@ -118,20 +142,47 @@
         final ProcessorGraphNode<K, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);

+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";
+
+            // Get the suffix index of the joinThisGeneratedName to build the outer join store name.
+            final String outerJoinStoreGeneratedName = KStreamImpl.OUTERSHARED_NAME
+                + joinThisGeneratedName.substring(
+                    rightOuter
+                        ? KStreamImpl.OUTERTHIS_NAME.length()
+                        : KStreamImpl.JOINTHIS_NAME.length());

Review comment: I agree that we should not use one more index, to avoid compatibility issues... Maybe the question is really (just for my better understanding): what would the name be, i.e., could we give a concrete example (with and without a `Named` parameter)? That is also why I asked for a test using `TopologyDescription` -- it makes it easier to wrap my head around.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12675) Improve sticky general assignor scalability and performance
[ https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-12675:
--
Description:

Currently, we have a "general assignor" for the non-equal subscription case and a "constrained assignor" for the all-equal subscription case. There is a performance test for the constrained assignor with:

topicCount = 500;
partitionCount = 2000;
consumerCount = 2000;

in _testLargeAssignmentAndGroupWithUniformSubscription_, i.e. 1 million partitions in total, and we can complete the assignment within 2 seconds on my machine.

However, if we let one of the consumers subscribe to only 1 topic, the "general assignor" is used, and the result with the same settings as above is: *OutOfMemory*.

Even if we lower the counts to:

topicCount = 50;
partitionCount = 1000;
consumerCount = 1000;

we still get *OutOfMemory*.

With this setting:

topicCount = 50;
partitionCount = 800;
consumerCount = 800;

we can complete in 10 seconds on my machine, which is still slow.

Since we are going to set the default assignment strategy to "CooperativeStickyAssignor" soon, we should improve the scalability and performance of the sticky general assignor.

was:

Currently, we have "general assignor" for non-equal subscription case and "constrained assignor" for all equal subscription case. There's a performance test for constrained assignor with:

topicCount = 500;
partitionCount = 2000;
consumerCount = 2000;

in _testLargeAssignmentAndGroupWithUniformSubscription,_ total 1 million partitions and we can complete the assignment within 2 second in my machine.

However, if we let 1 of the consumer subscribe to only 1 topic, it'll use "general assignor", and the result with the same setting as above is: *OutOfMemory*.

Even we down the count to:

topicCount = 50;
partitionCount = 1000;
consumerCount = 1000;

We still got *OutOfMemory*.

With this setting:

topicCount = 50;
partitionCount = 800;
consumerCount = 800;

We can complete with 10 seconds in my machine, which is still slow.

Since we are going to set default assignment strategy to "CooperativeStickyAssignor" soon, we should improve the scalability and performance for sticky general assignor.

> Improve sticky general assignor scalability and performance
> ---
>
> Key: KAFKA-12675
> URL: https://issues.apache.org/jira/browse/KAFKA-12675
> Project: Kafka
> Issue Type: Improvement
> Reporter: Luke Chen
> Assignee: Luke Chen
> Priority: Major
>
> Currently, we have a "general assignor" for the non-equal subscription case and a
> "constrained assignor" for the all-equal subscription case. There is a performance
> test for the constrained assignor with:
> topicCount = 500;
> partitionCount = 2000;
> consumerCount = 2000;
> in _testLargeAssignmentAndGroupWithUniformSubscription_, i.e. 1 million partitions
> in total, and we can complete the assignment within 2 seconds on my machine.
> However, if we let one of the consumers subscribe to only 1 topic, the "general
> assignor" is used, and the result with the same settings as above is: *OutOfMemory*.
> Even if we lower the counts to:
> topicCount = 50;
> partitionCount = 1000;
> consumerCount = 1000;
> we still get *OutOfMemory*.
> With this setting:
> topicCount = 50;
> partitionCount = 800;
> consumerCount = 800;
> we can complete in 10 seconds on my machine, which is still slow.
>
> Since we are going to set the default assignment strategy to
> "CooperativeStickyAssignor" soon, we should improve the scalability and
> performance of the sticky general assignor.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12675) Improve sticky general assignor scalability and performance
Luke Chen created KAFKA-12675:
-
Summary: Improve sticky general assignor scalability and performance
Key: KAFKA-12675
URL: https://issues.apache.org/jira/browse/KAFKA-12675
Project: Kafka
Issue Type: Improvement
Reporter: Luke Chen
Assignee: Luke Chen

Currently, we have a "general assignor" for the non-equal subscription case and a "constrained assignor" for the all-equal subscription case. There is a performance test for the constrained assignor with:

topicCount = 500;
partitionCount = 2000;
consumerCount = 2000;

in _testLargeAssignmentAndGroupWithUniformSubscription_, i.e. 1 million partitions in total, and we can complete the assignment within 2 seconds on my machine.

However, if we let one of the consumers subscribe to only 1 topic, the "general assignor" is used, and the result with the same settings as above is: *OutOfMemory*.

Even if we lower the counts to:

topicCount = 50;
partitionCount = 1000;
consumerCount = 1000;

we still get *OutOfMemory*.

With this setting:

topicCount = 50;
partitionCount = 800;
consumerCount = 800;

we can complete in 10 seconds on my machine, which is still slow.

Since we are going to set the default assignment strategy to "CooperativeStickyAssignor" soon, we should improve the scalability and performance of the sticky general assignor.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
mjsax commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614534849

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ##

@@ -118,20 +142,47 @@
         final ProcessorGraphNode<K, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);

+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";

Review comment: Sorry for the confusion. What I meant was: if it's a `leftJoin()` we could use `-shared-left-join-store`, and if it's an `outerJoin()` we could use `-shared-outer-join-store`?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12674) Client failover takes 2-4 seconds on clean broker shutdown
Gwen Shapira created KAFKA-12674:
Summary: Client failover takes 2-4 seconds on clean broker shutdown
Key: KAFKA-12674
URL: https://issues.apache.org/jira/browse/KAFKA-12674
Project: Kafka
Issue Type: Bug
Affects Versions: 2.7.0
Reporter: Gwen Shapira

I ran two perf-producer clients against a 4-broker cluster running in AWS, behind an ELB, and then did a rolling restart, taking down one broker at a time using controlled shutdown. I got the following errors on every broker shutdown:

{{[2021-04-16 01:31:39,846] WARN [Producer clientId=producer-1] Received invalid metadata error in produce request on partition perf-test-3 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)}}

{{[2021-04-16 01:44:22,691] WARN [Producer clientId=producer-1] Connection to node 0 (b0-pkc-7yrmj.us-east-2.aws.confluent.cloud/3.140.123.43:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. (org.apache.kafka.clients.NetworkClient)}}

The "Connection to node... terminated" error continued for 2-4 seconds. It looks like the metadata request was repeatedly sent to the node that had just gone down; I'd expect it to go over an existing connection to one of the live nodes.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] junrao commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
junrao commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r614448708 ## File path: storage/src/main/resources/message/RemotePartitionDeleteMetadata.json ## @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 2, + "type": "data", + "name": "RemotePartitionDeleteMetadataRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ + "name": "TopicIdPartition", + "type": "TopicIdPartitionEntry", + "versions": "0+", + "about": "Represents unique topic partition.", + "fields": [ +{ + "name": "Name", + "type": "string", + "versions": "0+", + "about": "Topic name." +}, +{ + "name": "Id", + "type": "uuid", + "versions": "0+", + "about": "Unique identifier of the topic." +}, +{ + "name": "Partition", + "type": "int32", + "versions": "0+", + "about": "Partition number." +} + ] +}, +{ + "name": "BrokerId", + "type": "int32", + "versions": "0+", + "about": "Broker (controller or leader) id from which this event is created. DELETE_PARTITION_MARKED is sent by the controller. DELETE_PARTITION_STARTED and DELETE_PARTITION_FINISHED are sent by remote log metadata topic partition leader." 
+}, +{ + "name": "EventTimestampMs", Review comment: The KIP has an epoch field. Should we add it here? ## File path: storage/src/main/resources/META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider ## @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + #http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.common.config.provider.FileConfigProvider Review comment: Hmm, what is this file for? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
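For reference, a record conforming to the fields visible in the quoted schema might look like the following when rendered as JSON. All values here are invented for illustration, and the actual on-disk format is the generated message binary, not JSON:

```json
{
  "TopicIdPartition": {
    "Name": "orders",
    "Id": "b3f1c9d2-8a4e-4f6b-9c1d-2e7a5b8c0f11",
    "Partition": 0
  },
  "BrokerId": 1,
  "EventTimestampMs": 1618531200000
}
```

The `BrokerId` field identifies the sender of the event (controller for DELETE_PARTITION_MARKED, the metadata topic partition leader for the STARTED/FINISHED states), per the `about` text in the schema.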
[jira] [Comment Edited] (KAFKA-12468) Initial offsets are copied from source to target cluster
[ https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322537#comment-17322537 ] Jitesh Motwani edited comment on KAFKA-12468 at 4/15/21, 11:52 PM: --- [~askldjd] what do you mean by run 10 consecutive full mirror ? 10 separate processes on the same box? https://issues.apache.org/jira/browse/KAFKA-12468?focusedCommentId=17312826=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17312826 was (Author: jitesh88): [~askldjd] what do you mean by run 10 consecutive full mirror ? https://issues.apache.org/jira/browse/KAFKA-12468?focusedCommentId=17312826=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17312826 > Initial offsets are copied from source to target cluster > > > Key: KAFKA-12468 > URL: https://issues.apache.org/jira/browse/KAFKA-12468 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.7.0 >Reporter: Bart De Neuter >Priority: Major > > We have an active-passive setup where the 3 connectors from mirror maker 2 > (heartbeat, checkpoint and source) are running on a dedicated Kafka connect > cluster on the target cluster. > Offset syncing is enabled as specified by KIP-545. But when activated, it > seems the offsets from the source cluster are initially copied to the target > cluster without translation. This causes a negative lag for all synced > consumer groups. Only when we reset the offsets for each topic/partition on > the target cluster and produce a record on the topic/partition in the source, > the sync starts working correctly. > I would expect that the consumer groups are synced but that the current > offsets of the source cluster are not copied to the target cluster. 
> This is the configuration we are currently using: > Heartbeat connector > > {code:xml} > { > "name": "mm2-mirror-heartbeat", > "config": { > "name": "mm2-mirror-heartbeat", > "connector.class": > "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", > "source.cluster.alias": "eventador", > "target.cluster.alias": "msk", > "source.cluster.bootstrap.servers": "", > "target.cluster.bootstrap.servers": "", > "topics": ".*", > "groups": ".*", > "tasks.max": "1", > "replication.policy.class": "CustomReplicationPolicy", > "sync.group.offsets.enabled": "true", > "sync.group.offsets.interval.seconds": "5", > "emit.checkpoints.enabled": "true", > "emit.checkpoints.interval.seconds": "30", > "emit.heartbeats.interval.seconds": "30", > "key.converter": " > org.apache.kafka.connect.converters.ByteArrayConverter", > "value.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter" > } > } > {code} > Checkpoint connector: > {code:xml} > { > "name": "mm2-mirror-checkpoint", > "config": { > "name": "mm2-mirror-checkpoint", > "connector.class": > "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", > "source.cluster.alias": "eventador", > "target.cluster.alias": "msk", > "source.cluster.bootstrap.servers": "", > "target.cluster.bootstrap.servers": "", > "topics": ".*", > "groups": ".*", > "tasks.max": "40", > "replication.policy.class": "CustomReplicationPolicy", > "sync.group.offsets.enabled": "true", > "sync.group.offsets.interval.seconds": "5", > "emit.checkpoints.enabled": "true", > "emit.checkpoints.interval.seconds": "30", > "emit.heartbeats.interval.seconds": "30", > "key.converter": " > org.apache.kafka.connect.converters.ByteArrayConverter", > "value.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter" > } > } > {code} > Source connector: > {code:xml} > { > "name": "mm2-mirror-source", > "config": { > "name": "mm2-mirror-source", > "connector.class": > "org.apache.kafka.connect.mirror.MirrorSourceConnector", > 
"source.cluster.alias": "eventador", > "target.cluster.alias": "msk", > "source.cluster.bootstrap.servers": "", > "target.cluster.bootstrap.servers": "", > "topics": ".*", > "groups": ".*", > "tasks.max": "40", > "replication.policy.class": "CustomReplicationPolicy", > "sync.group.offsets.enabled": "true", > "sync.group.offsets.interval.seconds": "5", > "emit.checkpoints.enabled": "true", > "emit.checkpoints.interval.seconds": "30", > "emit.heartbeats.interval.seconds": "30", > "key.converter": " > org.apache.kafka.connect.converters.ByteArrayConverter", > "value.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter" > } > } > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12468) Initial offsets are copied from source to target cluster
[ https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322537#comment-17322537 ] Jitesh Motwani commented on KAFKA-12468: [~askldjd] what do you mean by run 10 consecutive full mirror ? https://issues.apache.org/jira/browse/KAFKA-12468?focusedCommentId=17312826=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17312826 > Initial offsets are copied from source to target cluster > > > Key: KAFKA-12468 > URL: https://issues.apache.org/jira/browse/KAFKA-12468 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.7.0 >Reporter: Bart De Neuter >Priority: Major > > We have an active-passive setup where the 3 connectors from mirror maker 2 > (heartbeat, checkpoint and source) are running on a dedicated Kafka connect > cluster on the target cluster. > Offset syncing is enabled as specified by KIP-545. But when activated, it > seems the offsets from the source cluster are initially copied to the target > cluster without translation. This causes a negative lag for all synced > consumer groups. Only when we reset the offsets for each topic/partition on > the target cluster and produce a record on the topic/partition in the source, > the sync starts working correctly. > I would expect that the consumer groups are synced but that the current > offsets of the source cluster are not copied to the target cluster. 
> This is the configuration we are currently using: > Heartbeat connector > > {code:xml} > { > "name": "mm2-mirror-heartbeat", > "config": { > "name": "mm2-mirror-heartbeat", > "connector.class": > "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", > "source.cluster.alias": "eventador", > "target.cluster.alias": "msk", > "source.cluster.bootstrap.servers": "", > "target.cluster.bootstrap.servers": "", > "topics": ".*", > "groups": ".*", > "tasks.max": "1", > "replication.policy.class": "CustomReplicationPolicy", > "sync.group.offsets.enabled": "true", > "sync.group.offsets.interval.seconds": "5", > "emit.checkpoints.enabled": "true", > "emit.checkpoints.interval.seconds": "30", > "emit.heartbeats.interval.seconds": "30", > "key.converter": " > org.apache.kafka.connect.converters.ByteArrayConverter", > "value.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter" > } > } > {code} > Checkpoint connector: > {code:xml} > { > "name": "mm2-mirror-checkpoint", > "config": { > "name": "mm2-mirror-checkpoint", > "connector.class": > "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", > "source.cluster.alias": "eventador", > "target.cluster.alias": "msk", > "source.cluster.bootstrap.servers": "", > "target.cluster.bootstrap.servers": "", > "topics": ".*", > "groups": ".*", > "tasks.max": "40", > "replication.policy.class": "CustomReplicationPolicy", > "sync.group.offsets.enabled": "true", > "sync.group.offsets.interval.seconds": "5", > "emit.checkpoints.enabled": "true", > "emit.checkpoints.interval.seconds": "30", > "emit.heartbeats.interval.seconds": "30", > "key.converter": " > org.apache.kafka.connect.converters.ByteArrayConverter", > "value.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter" > } > } > {code} > Source connector: > {code:xml} > { > "name": "mm2-mirror-source", > "config": { > "name": "mm2-mirror-source", > "connector.class": > "org.apache.kafka.connect.mirror.MirrorSourceConnector", > 
"source.cluster.alias": "eventador", > "target.cluster.alias": "msk", > "source.cluster.bootstrap.servers": "", > "target.cluster.bootstrap.servers": "", > "topics": ".*", > "groups": ".*", > "tasks.max": "40", > "replication.policy.class": "CustomReplicationPolicy", > "sync.group.offsets.enabled": "true", > "sync.group.offsets.interval.seconds": "5", > "emit.checkpoints.enabled": "true", > "emit.checkpoints.interval.seconds": "30", > "emit.heartbeats.interval.seconds": "30", > "key.converter": " > org.apache.kafka.connect.converters.ByteArrayConverter", > "value.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter" > } > } > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on pull request #10294: KAFKA-12450: Remove deprecated methods from ReadOnlyWindowStore
guozhangwang commented on pull request #10294: URL: https://github.com/apache/kafka/pull/10294#issuecomment-820756041 @jeqo could you rebase the PR while I review it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #10544: KAFKA-12648: minimum changes for error handling namedTopologies
wcarlson5 commented on a change in pull request #10544: URL: https://github.com/apache/kafka/pull/10544#discussion_r614398250 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -471,6 +472,10 @@ private void replaceStreamThread(final Throwable throwable) { private void handleStreamsUncaughtException(final Throwable throwable, final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +if (throwable instanceof NamedTopologyStreamsException) { +String name = ((NamedTopologyStreamsException) throwable).getTopologyName(); +((StreamThread) Thread.currentThread()).deprioritizeNamedTopology(name); Review comment: The thread is guaranteed to be handled on the dying thread. Though we might want to call this on the new thread EDIT: we def will need to track the failed topologies in `KafkaStreams`, though tracking successes across restarts will be an issue -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10463: KAFKA-12670: support unclean.leader.election.enable in KRaft mode
cmccabe commented on a change in pull request #10463: URL: https://github.com/apache/kafka/pull/10463#discussion_r614404680 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -1119,6 +1148,113 @@ void validateManualPartitionAssignment(List assignment, } } +/** + * Handle legacy configuration alterations. + */ +ControllerResult> legacyAlterConfigs( +Map> newConfigs) { +ControllerResult> result = +configurationControl.legacyAlterConfigs(newConfigs); +return alterConfigs(result); +} + +/** + * Handle incremental configuration alterations. + */ +ControllerResult> incrementalAlterConfigs( +Map>> configChanges) { +ControllerResult> result = +configurationControl.incrementalAlterConfigs(configChanges); +return alterConfigs(result); +} + +/** + * If important controller configurations were changed, generate records which will + * apply the changes. + */ +ControllerResult> alterConfigs( +ControllerResult> result) { +ElectionStrategizer strategizer = examineConfigAlterations(result.records()); +boolean isAtomic = true; +List records = result.records(); +if (strategizer != null) { +records.addAll(handleLeaderElectionConfigChanges(strategizer)); +isAtomic = false; Review comment: I changed the approach here in the latest PR. Now, rather than combining the records into a single list, I schedule another task to scan through the offline partitions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #10544: KAFKA-12648: minimum changes for error handling namedTopologies
wcarlson5 commented on a change in pull request #10544: URL: https://github.com/apache/kafka/pull/10544#discussion_r614398757 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -1198,7 +1210,7 @@ int process(final int maxNumRecords, final Time time) { task.recordProcessBatchTime(now - then); } } - +reprioritizeTasks(); Review comment: if processing succeeds ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java ## @@ -43,6 +43,8 @@ private final Map allTasksPerId = new TreeMap<>(); private final Map readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId); private final Collection readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values()); +private final Collection misbehavingTasks = new HashSet<>(); +private final Collection taskJail = new HashSet<>(); Review comment: working title ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -471,6 +472,10 @@ private void replaceStreamThread(final Throwable throwable) { private void handleStreamsUncaughtException(final Throwable throwable, final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +if (throwable instanceof NamedTopologyStreamsException) { +String name = ((NamedTopologyStreamsException) throwable).getTopologyName(); +((StreamThread) Thread.currentThread()).deprioritizeNamedTopology(name); Review comment: The thread is guaranteed to be handled on the dying thread. 
Though we might want to call this on the new thread ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -471,6 +472,10 @@ private void replaceStreamThread(final Throwable throwable) { private void handleStreamsUncaughtException(final Throwable throwable, final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +if (throwable instanceof NamedTopologyStreamsException) { Review comment: we should consider exceptions that were not StreamExceptions as well I think ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java ## @@ -304,4 +310,26 @@ void addTask(final Task task) { } allTasksPerId.put(task.id(), task); } + +public void deprioritizeNamedTopology(String name) { +for (Task task: readOnlyTasks) { +if (task.id().namedTopology().equals(name)){ +misbehavingTasks.add(task); +} +} +for (Task task: misbehavingTasks) { Review comment: I think suspending will work. however I am not sure it will survive thread restarts. I may need to store the topology names that are in each state to repopulate each of these lists in new threads -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
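The last comment above notes that a suspended-task set held on a StreamThread won't survive thread restarts, and suggests storing the topology names instead so new threads can repopulate their lists. A hypothetical sketch of that design (names and methods are illustrative, not Kafka's API): a client-level registry keyed by topology name that outlives any individual thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: track failed named topologies by *name* at the
// KafkaStreams (client) level rather than as Task objects on a thread,
// so a replacement StreamThread can rebuild its suspended set after the
// dying thread is gone.
public class FailedTopologyRegistry {
    private final Set<String> failed = ConcurrentHashMap.newKeySet();

    // called from the uncaught-exception path when a topology misbehaves
    public void deprioritize(String topologyName) {
        failed.add(topologyName);
    }

    // a (re)started thread consults this when deciding whether to suspend a task
    public boolean shouldSuspend(String topologyName) {
        return failed.contains(topologyName);
    }

    // called once the topology is deemed healthy again
    public void restore(String topologyName) {
        failed.remove(topologyName);
    }
}
```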
[GitHub] [kafka] bbejeck commented on pull request #10545: KAFKA-12672: Added config for raft testing server
bbejeck commented on pull request #10545: URL: https://github.com/apache/kafka/pull/10545#issuecomment-820665466 @ijuma updated README -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12574) Deprecate eos-alpha
[ https://issues.apache.org/jira/browse/KAFKA-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322417#comment-17322417 ] Guozhang Wang commented on KAFKA-12574: --- Hi [~feyman] I'm aware of KAFKA-9689 as well, and also discussed with [~boyang] about that. I also feel that doing the swap online is quite hard and would bring much complexity into the code as well, and hence suggested we move towards enforcing the config change. > Deprecate eos-alpha > --- > > Key: KAFKA-12574 > URL: https://issues.apache.org/jira/browse/KAFKA-12574 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Blocker > Labels: needs-kip > Fix For: 3.0.0 > > > In KIP-447 we introduced a new thread-producer which is capable of > exactly-once semantics across multiple tasks. The new mode of EOS, called > eos-beta, is intended to eventually be the preferred processing mode for EOS > as it improves the performance and scaling of partitions/tasks. The only > downside is that it requires brokers to be on version 2.5+ in order to > understand the latest APIs that are necessary for this thread-producer. > We should consider deprecating the eos-alpha config, i.e. > StreamsConfig.EXACTLY_ONCE, to encourage new & existing EOS users to migrate > to the new-and-improved processing mode, and upgrade their brokers if > necessary. > Eventually we would like to be able to remove the eos-alpha code paths from > Streams as this will help to simplify the logic and reduce the processing > mode branching. But since this will break client-broker compatibility, and > 2.5 is still a relatively recent version, we probably can't actually remove > eos-alpha in the near future
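The migration the deprecation encourages is a single config change. A minimal sketch, assuming a plain Properties-based setup: switch `processing.guarantee` from eos-alpha (`"exactly_once"`) to eos-beta (`"exactly_once_beta"`, KIP-447), which requires brokers on 2.5+.

```java
import java.util.Properties;

// Minimal sketch of migrating a Streams config from eos-alpha to eos-beta.
// The helper name is illustrative; the config key and values are the real
// Kafka Streams ones ("processing.guarantee").
public class EosMigration {
    public static Properties withEosBeta(Properties props) {
        Properties upgraded = new Properties();
        upgraded.putAll(props);
        // eos-beta uses one producer per thread instead of one per task,
        // improving scaling, but needs brokers on version 2.5 or newer
        upgraded.setProperty("processing.guarantee", "exactly_once_beta");
        return upgraded;
    }
}
```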
[GitHub] [kafka] cmccabe commented on pull request #10505: MINOR: fix some bugs in ControllerApis.scala
cmccabe commented on pull request #10505: URL: https://github.com/apache/kafka/pull/10505#issuecomment-820659355 @chia7712: I re-ran the test failures locally and couldn't get them to occur. The failures I saw were mostly streams tests... those have been a bit flaky lately... I hope we can fix this soon :( Also, closer to home, `RaftClusterTest#testCreateClusterAndCreateAndManyTopicsWithManyPartitions` seems to flake periodically on Jenkins (even before this PR), but I can't seem to reproduce it locally.
[GitHub] [kafka] ijuma commented on pull request #10545: KAFKA-12672: Added config for raft testing server
ijuma commented on pull request #10545: URL: https://github.com/apache/kafka/pull/10545#issuecomment-820659248 Thanks for the PR. Don't you need to update the multi-node-quorum section? https://github.com/apache/kafka/tree/trunk/raft#run-multi-node-quorum -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck opened a new pull request #10545: KAFKA-12672: Added config for raft testing server
bbejeck opened a new pull request #10545: URL: https://github.com/apache/kafka/pull/10545 Adding a property to the `raft/config/kraft.properties` for running the raft test server in development. For testing I ran `./bin/test-kraft-server-start.sh --config config/kraft.properties` and validated the test server started running with a throughput test. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
spena commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614313362 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -93,23 +136,118 @@ public void process(final K key, final V1 value) { } boolean needOuterJoin = outer; +boolean joinFound = false; final long inputRecordTimestamp = context().timestamp(); final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); +maxObservedStreamTime.advance(inputRecordTimestamp); + try (final WindowStoreIterator iter = otherWindow.fetch(key, timeFrom, timeTo)) { while (iter.hasNext()) { needOuterJoin = false; +joinFound = true; final KeyValue otherRecord = iter.next(); +final long otherRecordTimestamp = otherRecord.key; + +// Emit expired records before the joined record to keep time ordering +emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp); + context().forward( key, joiner.apply(key, value, otherRecord.value), -To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); +To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); +} + +// Emit all expired records before adding a new non-joined record to the store. Otherwise, +// the put() call will advance the stream time, which causes records out of the retention +// period to be deleted, thus not being emitted later. +if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) { +emitExpiredNonJoinedOuterRecords(); } if (needOuterJoin) { -context().forward(key, joiner.apply(key, value, null)); +// The maxStreamTime contains the max time observed in both sides of the join. +// Having access to the time observed in the other join side fixes the following +// problem: +// +// Say we have a window size of 5 seconds +// 1. 
A non-joined record wth time T10 is seen in the left-topic (maxLeftStreamTime: 10) +// The record is not processed yet, and is added to the outer-join store +// 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2) +// The record is not processed yet, and is added to the outer-join store +// 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11) +// It is time to look at the expired records. T10 and T2 should be emitted, but +// because T2 was late, then it is not fetched by the window store, so it is not processed +// +// See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests +// +// the condition below allows us to process the late record without the need +// to hold it in the temporary outer store +if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) { +context().forward(key, joiner.apply(key, value, null)); +} else { +outerJoinWindowStore.ifPresent(store -> store.put( +KeyAndJoinSide.make(isLeftJoin, key), +LeftOrRightValue.make(isLeftJoin, value), +inputRecordTimestamp)); +} +} +} +} + +private void emitExpiredNonJoinedOuterRecords() { +outerJoinWindowStore.ifPresent(store -> +emitExpiredNonJoinedOuterRecords(store, recordWindowHasClosed)); +} + +private void emitExpiredNonJoinedOuterRecordsExcept(final K key, final long timestamp) { +outerJoinWindowStore.ifPresent(store -> { +final KeyAndJoinSide keyAndJoinSide = KeyAndJoinSide.make(!isLeftJoin, key); + +// Emit all expired records except the just found non-joined key. We need +// to emit all expired records before calling put(), otherwise the internal +// stream time will advance and may cause records out of the retention period to +// be deleted. +emitExpiredNonJoinedOuterRecords(store, +
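The T10/T2/T11 walkthrough above reduces to a single expiry predicate: a buffered non-joined record may be emitted as an outer-join result once observed stream time has passed the end of its join window plus the grace period, because no matching record can arrive anymore. A sketch of that condition, with names assumed rather than taken verbatim from the PR:

```java
// Illustrative reduction of the expiry check for buffered non-joined records
// in the outer-join store (method and parameter names are assumptions).
public class OuterJoinExpirySketch {
    static boolean windowHasClosed(long recordTimestamp, long joinAfterMs,
                                   long graceMs, long maxObservedStreamTime) {
        // the join window for this record ends at recordTimestamp + joinAfterMs;
        // after the grace period also passes, no match can still arrive
        return recordTimestamp + joinAfterMs + graceMs < maxObservedStreamTime;
    }
}
```

Re-running the example above with a 5-second window and no grace: once the joined record advances stream time to T11, T2's window (closing at T7) has closed and T2 is emitted, while T10's window (closing at T15) is still open and T10 stays buffered.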
[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
spena commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614312924 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ## @@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier, return builder; } +@SuppressWarnings("unchecked") +private static StoreBuilder, LeftOrRightValue>> outerJoinWindowStoreBuilder(final String storeName, Review comment: Done ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -60,20 +82,41 @@ } private class KStreamKStreamJoinProcessor extends AbstractProcessor { +private static final boolean DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false; Review comment: Done ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -60,20 +82,41 @@ } private class KStreamKStreamJoinProcessor extends AbstractProcessor { +private static final boolean DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false; + +private final Predicate>> recordWindowHasClosed = +windowedKey -> windowedKey.window().start() + joinAfterMs + joinGraceMs < maxObservedStreamTime.get(); private WindowStore otherWindow; private StreamsMetricsImpl metrics; private Sensor droppedRecordsSensor; +private Optional, LeftOrRightValue>> outerJoinWindowStore = Optional.empty(); @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); metrics = (StreamsMetricsImpl) context.metrics(); droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); -otherWindow = (WindowStore) context.getStateStore(otherWindowName); +otherWindow = context.getStateStore(otherWindowName); + +if (!internalOuterJoinFixDisabled(context.appConfigs())) { +outerJoinWindowStore = outerJoinWindowName.map(name -> 
context.getStateStore(name)); +} } +private boolean internalOuterJoinFixDisabled(final Map configs) { +final Object value = configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX); +if (value == null) { +return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT; Review comment: Done ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -60,20 +82,41 @@ } private class KStreamKStreamJoinProcessor extends AbstractProcessor { +private static final boolean DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false; + +private final Predicate>> recordWindowHasClosed = +windowedKey -> windowedKey.window().start() + joinAfterMs + joinGraceMs < maxObservedStreamTime.get(); private WindowStore otherWindow; private StreamsMetricsImpl metrics; private Sensor droppedRecordsSensor; +private Optional, LeftOrRightValue>> outerJoinWindowStore = Optional.empty(); @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); metrics = (StreamsMetricsImpl) context.metrics(); droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); -otherWindow = (WindowStore) context.getStateStore(otherWindowName); +otherWindow = context.getStateStore(otherWindowName); + +if (!internalOuterJoinFixDisabled(context.appConfigs())) { +outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name)); +} } +private boolean internalOuterJoinFixDisabled(final Map configs) { +final Object value = configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX); +if (value == null) { +return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT; +} + +if (value instanceof Boolean) { +return (Boolean) value; +} else { Review comment: Done ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -60,20 +82,41 
@@ } private class KStreamKStreamJoinProcessor extends AbstractProcessor { +private static final boolean DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false; + +private final Predicate>>
[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
spena commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614312224 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ## @@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier, return builder; } +@SuppressWarnings("unchecked") +private static StoreBuilder, LeftOrRightValue>> outerJoinWindowStoreBuilder(final String storeName, + final JoinWindows windows, + final StreamJoinedInternal streamJoinedInternal) { +final StoreBuilder, LeftOrRightValue>> builder = new TimeOrderedWindowStoreBuilder, LeftOrRightValue>( +persistentTimeOrderedWindowStore( +storeName + "-store", +Duration.ofMillis(windows.size() + windows.gracePeriodMs()), +Duration.ofMillis(windows.size()) +), +new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()), +new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()), +Time.SYSTEM Review comment: It will require more changes just to allow that. The `KStreamImplJoin` constructor, where we could overload to pass a `Time` mock object, is only used by the `KStreamImplJoinImpl` class. The tests use the `StreamsBuilder` to create the joins, and they do not accept a Time object. Also, the `Stores` class, which is called by `KStreamImplJoin`, does not mock it. Maybe because the same code changes required just for that? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #10539: Minor: Move trogdor out of tools and into its own project
cmccabe merged pull request #10539: URL: https://github.com/apache/kafka/pull/10539 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
spena commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614304330 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -93,23 +136,118 @@ public void process(final K key, final V1 value) { } boolean needOuterJoin = outer; +boolean joinFound = false; final long inputRecordTimestamp = context().timestamp(); final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); +maxObservedStreamTime.advance(inputRecordTimestamp); + try (final WindowStoreIterator iter = otherWindow.fetch(key, timeFrom, timeTo)) { while (iter.hasNext()) { needOuterJoin = false; +joinFound = true; final KeyValue otherRecord = iter.next(); +final long otherRecordTimestamp = otherRecord.key; Review comment: Interesting. Is this a current bug with the old join semantics? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
spena commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614303123 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -93,23 +136,118 @@ public void process(final K key, final V1 value) { } boolean needOuterJoin = outer; +boolean joinFound = false; final long inputRecordTimestamp = context().timestamp(); final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); +maxObservedStreamTime.advance(inputRecordTimestamp); + try (final WindowStoreIterator iter = otherWindow.fetch(key, timeFrom, timeTo)) { while (iter.hasNext()) { needOuterJoin = false; +joinFound = true; final KeyValue otherRecord = iter.next(); +final long otherRecordTimestamp = otherRecord.key; + +// Emit expired records before the joined record to keep time ordering +emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp); + context().forward( key, joiner.apply(key, value, otherRecord.value), -To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); +To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); +} + +// Emit all expired records before adding a new non-joined record to the store. Otherwise, +// the put() call will advance the stream time, which causes records out of the retention +// period to be deleted, thus not being emitted later. +if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) { +emitExpiredNonJoinedOuterRecords(); } if (needOuterJoin) { -context().forward(key, joiner.apply(key, value, null)); +// The maxStreamTime contains the max time observed in both sides of the join. +// Having access to the time observed in the other join side fixes the following +// problem: +// +// Say we have a window size of 5 seconds +// 1. 
A non-joined record wth time T10 is seen in the left-topic (maxLeftStreamTime: 10) +// The record is not processed yet, and is added to the outer-join store +// 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2) +// The record is not processed yet, and is added to the outer-join store +// 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11) +// It is time to look at the expired records. T10 and T2 should be emitted, but +// because T2 was late, then it is not fetched by the window store, so it is not processed +// +// See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests +// +// the condition below allows us to process the late record without the need +// to hold it in the temporary outer store +if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) { +context().forward(key, joiner.apply(key, value, null)); +} else { +outerJoinWindowStore.ifPresent(store -> store.put( +KeyAndJoinSide.make(isLeftJoin, key), +LeftOrRightValue.make(isLeftJoin, value), +inputRecordTimestamp)); +} +} +} +} + +private void emitExpiredNonJoinedOuterRecords() { +outerJoinWindowStore.ifPresent(store -> +emitExpiredNonJoinedOuterRecords(store, recordWindowHasClosed)); +} + +private void emitExpiredNonJoinedOuterRecordsExcept(final K key, final long timestamp) { +outerJoinWindowStore.ifPresent(store -> { +final KeyAndJoinSide keyAndJoinSide = KeyAndJoinSide.make(!isLeftJoin, key); + +// Emit all expired records except the just found non-joined key. We need +// to emit all expired records before calling put(), otherwise the internal +// stream time will advance and may cause records out of the retention period to +// be deleted. +emitExpiredNonJoinedOuterRecords(store, +
[jira] [Updated] (KAFKA-12672) Running test-kraft-server-start results in error
[ https://issues.apache.org/jira/browse/KAFKA-12672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-12672: Description: Running the {{test-kraft-server-start}} script in the {{raft}} module results in this error {code:java} ERROR Exiting raft server due to fatal exception (kafka.tools.TestRaftServer$) java.lang.IllegalArgumentException: No enum constant org.apache.kafka.common.security.auth.SecurityProtocol. at java.lang.Enum.valueOf(Enum.java:238) at org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26) at org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72) at kafka.raft.KafkaRaftManager.$anonfun$buildNetworkClient$1(RaftManager.scala:256) at scala.collection.immutable.Map$Map4.getOrElse(Map.scala:530) at kafka.raft.KafkaRaftManager.buildNetworkClient(RaftManager.scala:256) at kafka.raft.KafkaRaftManager.buildNetworkChannel(RaftManager.scala:234) at kafka.raft.KafkaRaftManager.(RaftManager.scala:126) at kafka.tools.TestRaftServer.startup(TestRaftServer.scala:88) at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:442) at kafka.tools.TestRaftServer.main(TestRaftServer.scala) {code} Looks like the listener property in the config is not getting picked up as an empty string gets passed to {{SecurityProtocol.forName}} EDIT: The issue is the properties file needs to have a {{controller.listener.names}} property with just values of the names was: Running the {{test-kraft-server-start}} script in the {{raft}} module results in this error {code:java} ERROR Exiting raft server due to fatal exception (kafka.tools.TestRaftServer$) java.lang.IllegalArgumentException: No enum constant org.apache.kafka.common.security.auth.SecurityProtocol. 
at java.lang.Enum.valueOf(Enum.java:238) at org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26) at org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72) at kafka.raft.KafkaRaftManager.$anonfun$buildNetworkClient$1(RaftManager.scala:256) at scala.collection.immutable.Map$Map4.getOrElse(Map.scala:530) at kafka.raft.KafkaRaftManager.buildNetworkClient(RaftManager.scala:256) at kafka.raft.KafkaRaftManager.buildNetworkChannel(RaftManager.scala:234) at kafka.raft.KafkaRaftManager.(RaftManager.scala:126) at kafka.tools.TestRaftServer.startup(TestRaftServer.scala:88) at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:442) at kafka.tools.TestRaftServer.main(TestRaftServer.scala) {code} Looks like the listener property in the config is not getting picked up as an empty string gets passed to {{SecurityProtocol.forName}} > Running test-kraft-server-start results in error > > > Key: KAFKA-12672 > URL: https://issues.apache.org/jira/browse/KAFKA-12672 > Project: Kafka > Issue Type: Bug >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > > Running the {{test-kraft-server-start}} script in the {{raft}} module results > in this error > > {code:java} > ERROR Exiting raft server due to fatal exception (kafka.tools.TestRaftServer$) > java.lang.IllegalArgumentException: No enum constant > org.apache.kafka.common.security.auth.SecurityProtocol. 
> at java.lang.Enum.valueOf(Enum.java:238) > at > org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26) > at > org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72) > at > kafka.raft.KafkaRaftManager.$anonfun$buildNetworkClient$1(RaftManager.scala:256) > at scala.collection.immutable.Map$Map4.getOrElse(Map.scala:530) > at kafka.raft.KafkaRaftManager.buildNetworkClient(RaftManager.scala:256) > at > kafka.raft.KafkaRaftManager.buildNetworkChannel(RaftManager.scala:234) > at kafka.raft.KafkaRaftManager.(RaftManager.scala:126) > at kafka.tools.TestRaftServer.startup(TestRaftServer.scala:88) > at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:442) > at kafka.tools.TestRaftServer.main(TestRaftServer.scala) > {code} > Looks like the listener property in the config is not getting picked up as an > empty string gets passed to {{SecurityProtocol.forName}} > EDIT: The issue is the properties file needs to have a > {{controller.listener.names}} property with just values of the names -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12673) Kafka and zookeeper dependency
suhas created KAFKA-12673: - Summary: Kafka and zookeeper dependency Key: KAFKA-12673 URL: https://issues.apache.org/jira/browse/KAFKA-12673 Project: Kafka Issue Type: Task Reporter: suhas We are using Kafka version 2.4.1 in our cluster. We are unable to upgrade ZooKeeper to the latest version (3.7.0) on its own because of its dependency on Kafka, so changing the ZooKeeper version alone in our Kafka cluster is an incompatible change. Looking forward to a resolution. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] wcarlson5 commented on pull request #10544: KAFKA-12648: minimal changes for error handling namedTopologies
wcarlson5 commented on pull request #10544: URL: https://github.com/apache/kafka/pull/10544#issuecomment-820620451 @ableegoldman This is what I think the error handling should look like. We can try to categorize more but I think we should make this best effort and we can add to it as we go. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 opened a new pull request #10544: KAFKA-12648: minimal changes for error handling namedTopologies
wcarlson5 opened a new pull request #10544: URL: https://github.com/apache/kafka/pull/10544 Changed the obvious ones to be attributed to a named topology. This might be all we need; we can always add more if they come up. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset
[ https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Travis Bischel updated KAFKA-12671: --- Description: If there is pathological processing of incoming produce requests and EndTxn requests, then the LastStableOffset can get stuck, which will block consuming in READ_COMMITTED mode. To transactionally produce, the standard flow is to InitProducerId, and then loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is responsible for fencing and adding partitions to a transaction, and the end transaction is responsible for finishing the transaction. Producing itself is mostly uninvolved with the proper fencing / ending flow, but produce requests are required to be after AddPartitionsToTxn and before EndTxn. When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager to mildly manage transactions. The ProducerStateManager is completely independent of the TxnCoordinator, and its guarantees are relatively weak. The ProducerStateManager handles two types of "batches" being added: a data batch and a transaction marker. When a data batch is added, a "transaction" is begun and tied to the producer ID that is producing the batch. When a transaction marker is handled, the ProducerStateManager removes the transaction for the producer ID (roughly). EndTxn is what triggers transaction markers to be sent to the ProducerStateManager. In essence, EndTxn is the one part of the transactional producer flow that talks across both the TxnCoordinator and the ProducerStateManager. If a ProduceRequest is issued before EndTxn, but handled internally in Kafka after EndTxn, then the ProduceRequest will begin a new transaction in the ProducerStateManager. If the client was disconnecting, and the EndTxn was the final request issued, the new transaction created in ProducerStateManager is orphaned and nothing can clean it up. The LastStableOffset then hangs based off of this hung transaction. 
This same problem can be triggered by a produce request that is issued with a transactional ID outside of the context of a transaction at all (no AddPartitionsToTxn). This problem cannot be triggered by producing for so long that the transaction expires; the difference here is that the transaction coordinator bumps the epoch for the producer ID, thus producing again with the old epoch does not work. Theoretically, we are supposed to have unlimited retries on produce requests, but in the context of wanting to abort everything and shut down, this is not always feasible. As it currently stands, I'm not sure there's a truly safe way to shut down _without_ flushing and receiving responses for every record produced, even if I want to abort everything and quit. The safest approach I can think of is to actually avoid issuing an EndTxn so that instead we rely on Kafka internally to time things out after a period of time. — For some context, here are my request logs from the client. Note that I write two ProduceRequests, read one, and then issue EndTxn (because I know I want to quit). The second ProduceRequest is read successfully before shutdown, but I ignore the results because I am shutting down. 
I've taken out logs related to consuming, but the order of the logs is unchanged: {noformat} [INFO] done waiting for unknown topic, metadata was successful; topic: 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765 [INFO] initializing producer id [DEBUG] wrote FindCoordinator v3; err: [DEBUG] read FindCoordinator v3; err: [DEBUG] wrote InitProducerID v4; err: [DEBUG] read InitProducerID v4; err: [INFO] producer id initialization success; id: 1463, epoch: 0 [DEBUG] wrote AddPartitionsToTxn v2; err: [DEBUG] read AddPartitionsToTxn v2; err: [DEBUG] wrote Produce v8; err: [DEBUG] read Produce v8; err: [DEBUG] produced; to: 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}] [DEBUG] wrote Produce v8; err: [DEBUG] wrote EndTxn v2; err: [DEBUG] read EndTxn v2; err: [DEBUG] read from broker errored, killing connection; addr: localhost:9092, id: 1, successful_reads: 1, err: context canceled [DEBUG] read Produce v8; err: [DEBUG] produced; to: 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{skipped}] {noformat} And then from the broker's point of view. Across two brokers, the second ProduceRequest is completed after EndTxn is handled (and after the WriteTxnMarkers request is handled, which is the important one that hooks into the ProducerStateManager): {noformat} /// Broker 3: init producer ID [2021-04-15 00:56:40,030] DEBUG Completed request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=kgo, correlationId=3) --
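The race described above can be made concrete with a toy model. The sketch below is not Kafka's actual ProducerStateManager (class and method names are invented for illustration); it models just the two behaviors the report describes: a data batch implicitly opens a transaction for its producer ID, a transaction marker closes it, and the LastStableOffset cannot advance past the first offset of any still-open transaction.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (NOT Kafka's real code) of the ProducerStateManager behavior
// described in this report.
class ToyProducerStateManager {
    // producerId -> first offset of that producer's open transaction
    private final Map<Long, Long> openTxns = new HashMap<>();
    private long logEndOffset = 0;

    long appendDataBatch(long producerId, int records) {
        // A transactional produce request implicitly begins a transaction
        // for this producer if one is not already open.
        openTxns.putIfAbsent(producerId, logEndOffset);
        long baseOffset = logEndOffset;
        logEndOffset += records;
        return baseOffset;
    }

    void appendTxnMarker(long producerId) {
        // A WriteTxnMarkers request (triggered by EndTxn) closes the
        // producer's transaction; the marker itself occupies an offset.
        openTxns.remove(producerId);
        logEndOffset += 1;
    }

    long lastStableOffset() {
        // The LSO is pinned at the first offset of the oldest open
        // transaction; with no open transactions it is the log end offset.
        return openTxns.values().stream().min(Long::compare).orElse(logEndOffset);
    }
}
```

If the marker is handled before a late produce request, the late batch reopens a transaction that nothing will ever close, so the LSO stays pinned at that batch's base offset while the log end offset keeps growing — the stuck-LSO symptom the report describes.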
[GitHub] [kafka] cmccabe merged pull request #10505: MINOR: fix some bugs in ControllerApis.scala
cmccabe merged pull request #10505: URL: https://github.com/apache/kafka/pull/10505 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on pull request #10543: KAFKA-12648: tag uses of StreamsException to add Named to
wcarlson5 commented on pull request #10543: URL: https://github.com/apache/kafka/pull/10543#issuecomment-820610403 @ableegoldman I tagged all the cases of StreamsException that I think can be attributed to a sub-topology. I may have missed some, and some might not be attributable. There might also be other error types, so I am relying on all exceptions being wrapped in a StreamsException before they reach the handler; however, I have seen one case in soak testing where that is not true. But it is a start. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 opened a new pull request #10543: KAFKA-12648: tag uses of StreamsException to add Named to
wcarlson5 opened a new pull request #10543: URL: https://github.com/apache/kafka/pull/10543 change each StreamsException so that it has the sub topology name in it ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala
chia7712 commented on a change in pull request #10505: URL: https://github.com/apache/kafka/pull/10505#discussion_r614266078 ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -242,87 +253,99 @@ class ControllerApis(val requestChannel: RequestChannel, val toAuthenticate = new util.HashSet[String] toAuthenticate.addAll(providedNames) val idToName = new util.HashMap[Uuid, String] -controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) => - if (nameOrError.isError) { -appendResponse(null, id, nameOrError.error()) - } else { -toAuthenticate.add(nameOrError.result()) -idToName.put(id, nameOrError.result()) - } -} -// Get the list of deletable topics (those we can delete) and the list of describeable -// topics. If a topic can't be deleted or described, we have to act like it doesn't -// exist, even when it does. -val topicsToAuthenticate = toAuthenticate.asScala -val (describeable, deletable) = if (hasClusterAuth) { - (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet) -} else { - (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate)) -} -// For each topic that was provided by ID, check if authentication failed. -// If so, remove it from the idToName map and create an error response for it. -val iterator = idToName.entrySet().iterator() -while (iterator.hasNext) { - val entry = iterator.next() - val id = entry.getKey - val name = entry.getValue - if (!deletable.contains(name)) { -if (describeable.contains(name)) { - appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED)) +controller.findTopicNames(providedIds).thenCompose(topicNames => { Review comment: Sure. I will file a PR to deal with it if I get a way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala
cmccabe commented on a change in pull request #10505: URL: https://github.com/apache/kafka/pull/10505#discussion_r614264455 ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -242,87 +253,99 @@ class ControllerApis(val requestChannel: RequestChannel, val toAuthenticate = new util.HashSet[String] toAuthenticate.addAll(providedNames) val idToName = new util.HashMap[Uuid, String] -controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) => - if (nameOrError.isError) { -appendResponse(null, id, nameOrError.error()) - } else { -toAuthenticate.add(nameOrError.result()) -idToName.put(id, nameOrError.result()) - } -} -// Get the list of deletable topics (those we can delete) and the list of describeable -// topics. If a topic can't be deleted or described, we have to act like it doesn't -// exist, even when it does. -val topicsToAuthenticate = toAuthenticate.asScala -val (describeable, deletable) = if (hasClusterAuth) { - (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet) -} else { - (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate)) -} -// For each topic that was provided by ID, check if authentication failed. -// If so, remove it from the idToName map and create an error response for it. -val iterator = idToName.entrySet().iterator() -while (iterator.hasNext) { - val entry = iterator.next() - val id = entry.getKey - val name = entry.getValue - if (!deletable.contains(name)) { -if (describeable.contains(name)) { - appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED)) +controller.findTopicNames(providedIds).thenCompose(topicNames => { Review comment: When I remove the braces I get this compiler error: ``` > Task :core:compileScala FAILED [Error] /home/cmccabe/src/kafka1/core/src/main/scala/kafka/server/ControllerApis.scala:270: ')' expected but 'val' found. [Error] /home/cmccabe/src/kafka1/core/src/main/scala/kafka/server/ControllerApis.scala:278: ';' expected but 'val' found. 
two errors found ``` Maybe let's follow up on this in a separate PR, if you can find a way to remove some of the braces and still have it compile... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12672) Running test-kraft-server-start results in error
[ https://issues.apache.org/jira/browse/KAFKA-12672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-12672: Description: Running the {{test-kraft-server-start}} script in the {{raft}} module results in this error {code:java} ERROR Exiting raft server due to fatal exception (kafka.tools.TestRaftServer$) java.lang.IllegalArgumentException: No enum constant org.apache.kafka.common.security.auth.SecurityProtocol. at java.lang.Enum.valueOf(Enum.java:238) at org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26) at org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72) at kafka.raft.KafkaRaftManager.$anonfun$buildNetworkClient$1(RaftManager.scala:256) at scala.collection.immutable.Map$Map4.getOrElse(Map.scala:530) at kafka.raft.KafkaRaftManager.buildNetworkClient(RaftManager.scala:256) at kafka.raft.KafkaRaftManager.buildNetworkChannel(RaftManager.scala:234) at kafka.raft.KafkaRaftManager.(RaftManager.scala:126) at kafka.tools.TestRaftServer.startup(TestRaftServer.scala:88) at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:442) at kafka.tools.TestRaftServer.main(TestRaftServer.scala) {code} Looks like the listener property in the config is not getting picked up as an empty string gets passed to {{SecurityProtocol.forName}} was: Running the {{test-kraft-server-start}} script in the {{raft}} module results in this error {code:java} ERROR Exiting raft server due to fatal exception (kafka.tools.TestRaftServer$) java.lang.IllegalArgumentException: No enum constant org.apache.kafka.common.security.auth.SecurityProtocol. 
at java.lang.Enum.valueOf(Enum.java:238) at org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26) at org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72) at kafka.raft.KafkaRaftManager.$anonfun$buildNetworkClient$1(RaftManager.scala:256) at scala.collection.immutable.Map$Map4.getOrElse(Map.scala:530) at kafka.raft.KafkaRaftManager.buildNetworkClient(RaftManager.scala:256) at kafka.raft.KafkaRaftManager.buildNetworkChannel(RaftManager.scala:234) at kafka.raft.KafkaRaftManager.(RaftManager.scala:126) at kafka.tools.TestRaftServer.startup(TestRaftServer.scala:88) at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:442) at kafka.tools.TestRaftServer.main(TestRaftServer.scala) {code} Looks like the listener property in the config is not getting picked up > Running test-kraft-server-start results in error > > > Key: KAFKA-12672 > URL: https://issues.apache.org/jira/browse/KAFKA-12672 > Project: Kafka > Issue Type: Bug >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > > Running the {{test-kraft-server-start}} script in the {{raft}} module results > in this error > > {code:java} > ERROR Exiting raft server due to fatal exception (kafka.tools.TestRaftServer$) > java.lang.IllegalArgumentException: No enum constant > org.apache.kafka.common.security.auth.SecurityProtocol. 
> at java.lang.Enum.valueOf(Enum.java:238) > at > org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26) > at > org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72) > at > kafka.raft.KafkaRaftManager.$anonfun$buildNetworkClient$1(RaftManager.scala:256) > at scala.collection.immutable.Map$Map4.getOrElse(Map.scala:530) > at kafka.raft.KafkaRaftManager.buildNetworkClient(RaftManager.scala:256) > at > kafka.raft.KafkaRaftManager.buildNetworkChannel(RaftManager.scala:234) > at kafka.raft.KafkaRaftManager.(RaftManager.scala:126) > at kafka.tools.TestRaftServer.startup(TestRaftServer.scala:88) > at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:442) > at kafka.tools.TestRaftServer.main(TestRaftServer.scala) > {code} > Looks like the listener property in the config is not getting picked up as an > empty string gets passed to {{SecurityProtocol.forName}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] Iskuskov commented on pull request #9541: KAFKA-10675: Add schema name to ConnectSchema.validateValue() error message
Iskuskov commented on pull request #9541: URL: https://github.com/apache/kafka/pull/9541#issuecomment-820596386 @chia7712, @ijuma, could you take a look at this please? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12672) Running test-kraft-server-start results in error
Bill Bejeck created KAFKA-12672: --- Summary: Running test-kraft-server-start results in error Key: KAFKA-12672 URL: https://issues.apache.org/jira/browse/KAFKA-12672 Project: Kafka Issue Type: Bug Reporter: Bill Bejeck Assignee: Bill Bejeck Running the {{test-kraft-server-start}} script in the {{raft}} module results in this error {code:java} ERROR Exiting raft server due to fatal exception (kafka.tools.TestRaftServer$) java.lang.IllegalArgumentException: No enum constant org.apache.kafka.common.security.auth.SecurityProtocol. at java.lang.Enum.valueOf(Enum.java:238) at org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26) at org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72) at kafka.raft.KafkaRaftManager.$anonfun$buildNetworkClient$1(RaftManager.scala:256) at scala.collection.immutable.Map$Map4.getOrElse(Map.scala:530) at kafka.raft.KafkaRaftManager.buildNetworkClient(RaftManager.scala:256) at kafka.raft.KafkaRaftManager.buildNetworkChannel(RaftManager.scala:234) at kafka.raft.KafkaRaftManager.(RaftManager.scala:126) at kafka.tools.TestRaftServer.startup(TestRaftServer.scala:88) at kafka.tools.TestRaftServer$.main(TestRaftServer.scala:442) at kafka.tools.TestRaftServer.main(TestRaftServer.scala) {code} Looks like the listener property in the config is not getting picked up -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe merged pull request #10538: MINOR: shutdown KafkaScheduler at appropriate time
cmccabe merged pull request #10538: URL: https://github.com/apache/kafka/pull/10538 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits
kpatelatwork commented on a change in pull request #10528: URL: https://github.com/apache/kafka/pull/10528#discussion_r614230413 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java ## @@ -105,6 +105,11 @@ public void remove(ConnectorTaskId id) { } private void commit(WorkerSourceTask workerTask) { +if (!workerTask.shouldCommitOffsets()) { Review comment: I prefer the encapsulation approach but will leave the decision to you as it's a minor nitpick :). I prefer encapsulation because the caller to `commitOffsets` need not worry about first calling `shouldCommitOffsets` and only if it's true then I should call `commitOffsets`. Think like we don't need to do `!ArrayList.isEmpty` check before iterating over it :). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala
mumrah commented on a change in pull request #10505: URL: https://github.com/apache/kafka/pull/10505#discussion_r614218074 ## File path: core/src/test/java/kafka/test/MockController.java ## @@ -197,15 +209,73 @@ private MockController(Collection initialTopics) { @Override public CompletableFuture> incrementalAlterConfigs( -Map>> configChanges, +Map>> configChanges, boolean validateOnly) { +Map results = new HashMap<>(); +for (Entry>> entry : +configChanges.entrySet()) { +ConfigResource resource = entry.getKey(); +results.put(resource, incrementalAlterResource(resource, entry.getValue(), validateOnly)); +} +CompletableFuture> future = new CompletableFuture<>(); +future.complete(results); +return future; +} + +private ApiError incrementalAlterResource(ConfigResource resource, +Map> ops, boolean validateOnly) { +for (Entry> entry : ops.entrySet()) { +AlterConfigOp.OpType opType = entry.getValue().getKey(); +if (opType != SET && opType != DELETE) { +return new ApiError(INVALID_REQUEST, "This mock does not " + +"support the " + opType + " config operation."); +} +} +if (!validateOnly) { +for (Entry> entry : ops.entrySet()) { +String key = entry.getKey(); +AlterConfigOp.OpType op = entry.getValue().getKey(); +String value = entry.getValue().getValue(); +switch (op) { +case SET: +configs.computeIfAbsent(resource, __ -> new HashMap<>()).put(key, value); Review comment: well, i thought a method reference would work here for the hash map, but I tried it and it doesn't seem to work. 樂 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
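A likely reason the method reference mumrah tried does not work: `Map.computeIfAbsent` takes a `Function<K, V>`, so the mapping function receives the key, and a constructor reference like `HashMap::new` can only bind if some `HashMap` constructor accepts the key type. The snippet below uses generic names (`String` keys standing in for `ConfigResource`; this is not the ControllerApis code) to show the lambda form that does compile:

```java
import java.util.HashMap;
import java.util.Map;

class ComputeIfAbsentDemo {
    // Mirrors the configs.computeIfAbsent(...) pattern from the review thread.
    static final Map<String, Map<String, String>> configs = new HashMap<>();

    static void set(String resource, String key, String value) {
        // The mapping function is a Function<K, V>; a lambda that ignores the
        // key compiles fine. `HashMap::new` would not compile here: no HashMap
        // constructor takes a String, so the constructor reference cannot be
        // adapted to Function<String, Map<String, String>>.
        configs.computeIfAbsent(resource, r -> new HashMap<>()).put(key, value);
    }
}
```

(As an aside, with an `Integer` key `HashMap::new` would compile by silently binding to the `HashMap(int initialCapacity)` constructor, which is its own trap.)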
[GitHub] [kafka] JoelWee commented on pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter
JoelWee commented on pull request #8923: URL: https://github.com/apache/kafka/pull/8923#issuecomment-820560188 Thanks @ableegoldman and @wcarlson5! Will try to get this done over the next week or so and ping you when ready :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits
C0urante commented on a change in pull request #10528: URL: https://github.com/apache/kafka/pull/10528#discussion_r614208924 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ## @@ -662,20 +666,94 @@ public void testSendRecordsProducerCallbackFail() throws Exception { expectSendRecordProducerCallbackFail(); +EasyMock.expect(offsetWriter.willFlush()).andReturn(true); + PowerMock.replayAll(); Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords")); +assertFalse(workerTask.shouldCommitOffsets()); } @Test -public void testSendRecordsProducerSendFailsImmediately() { -if (!enableTopicCreation) -// should only test with topic creation enabled -return; +public void testSendRecordsProducerCallbackFailInBacklog() throws Exception { +/* +1. A record is sent successfully +2. Flush for offset commit begins +3. Another record is dispatched to the producer and, because of the active offset commit, added to the backlog +4. The producer fails to send that record and notifies the worker via producer callback +5. The first offset commit succeeds as the first record has been sent successfully +6. No further offset commits are attempted as the only remaining record has failed with a non-retriable error + */ +createWorkerTask(); Review comment: Whoops! Thanks for catching this. 
## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ## @@ -662,20 +666,94 @@ public void testSendRecordsProducerCallbackFail() throws Exception { expectSendRecordProducerCallbackFail(); +EasyMock.expect(offsetWriter.willFlush()).andReturn(true); + PowerMock.replayAll(); Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords")); +assertFalse(workerTask.shouldCommitOffsets()); } @Test -public void testSendRecordsProducerSendFailsImmediately() { -if (!enableTopicCreation) -// should only test with topic creation enabled -return; +public void testSendRecordsProducerCallbackFailInBacklog() throws Exception { +/* +1. A record is sent successfully +2. Flush for offset commit begins +3. Another record is dispatched to the producer and, because of the active offset commit, added to the backlog +4. The producer fails to send that record and notifies the worker via producer callback +5. The first offset commit succeeds as the first record has been sent successfully +6. 
No further offset commits are attempted as the only remaining record has failed with a non-retriable error + */ +createWorkerTask(); createWorkerTask(); +SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); +SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + +expectTopicCreation(TOPIC); + +expectSendRecordOnce(false); +expectSendRecordProducerCallbackFail(); + +expectOffsetFlush(true); +EasyMock.expect(offsetWriter.willFlush()).andReturn(true).anyTimes(); + +PowerMock.replayAll(); + +Whitebox.setInternalState(workerTask, "toSend", Collections.singletonList(record1)); +Whitebox.invokeMethod(workerTask, "sendRecords"); +Whitebox.setInternalState(workerTask, "flushing", true); +Whitebox.setInternalState(workerTask, "toSend", Collections.singletonList(record2)); +Whitebox.invokeMethod(workerTask, "sendRecords"); +assertTrue(workerTask.shouldCommitOffsets()); +assertTrue(workerTask.commitOffsets()); +assertFalse(workerTask.shouldCommitOffsets()); +} + + +@Test +public void testSendRecordsProducerCallbackFailInBacklogWithNonRetriedOffsetCommit() throws Exception { +/* +1. A record is sent successfully +2. Flush for offset commit begins +3. Another record is dispatched to the producer and, because of the active offset commit, added to the backlog +4. The producer fails to send that record and notifies the worker via producer callback +5. The first offset commit fails even though first record has been sent successfully, (possibly from failure to produce offsets to Kafka) +6. No further offset commits are attempted as the new record batch contains
[GitHub] [kafka] C0urante commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits
C0urante commented on a change in pull request #10528: URL: https://github.com/apache/kafka/pull/10528#discussion_r614208181 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java ## @@ -120,6 +120,13 @@ public synchronized boolean beginFlush() { return true; } +/** + * @return whether there's anything to flush right now. + */ +public boolean willFlush() { Review comment: Good catch, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits
C0urante commented on a change in pull request #10528: URL: https://github.com/apache/kafka/pull/10528#discussion_r614207888 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java ## @@ -105,6 +105,11 @@ public void remove(ConnectorTaskId id) { } private void commit(WorkerSourceTask workerTask) { +if (!workerTask.shouldCommitOffsets()) { Review comment: I initially wanted to do just that, but held off for two reasons: 1. The `SourceTaskOffsetCommitter` log message may be valuable to some users, especially if they've muted the `WorkerSourceTask` namespace or at least raised its level to `INFO`. 2. It's a little easier to test this logic separately by creating a separate method that can be queried on its own. The `boolean` return value of `WorkerSourceTask::commitOffsets` makes it difficult to determine if an offset commit has been attempted and failed, or skipped entirely. I'm not super attached to either approach; if you have thoughts on why one or the other is better I'm happy to hear them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
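For comparison, the encapsulated style kpatelatwork suggests would fold the check into the commit call itself, so callers never guard it. The sketch below is hypothetical (names mirror the PR discussion, but this is not the actual WorkerSourceTask code): a no-op commit simply reports success, the way iterating an empty ArrayList needs no isEmpty() guard.

```java
// Hypothetical sketch of the encapsulated design discussed above;
// NOT the actual WorkerSourceTask implementation.
class Task {
    // Stand-in for offsetWriter.willFlush(): is there anything to commit?
    private boolean recordsPending = true;

    boolean shouldCommitOffsets() {
        return recordsPending;
    }

    // Encapsulated style: the "anything to flush?" check lives inside
    // commitOffsets(), so callers just call it unconditionally.
    boolean commitOffsets() {
        if (!shouldCommitOffsets()) {
            return true; // nothing to do counts as success
        }
        recordsPending = false; // pretend the flush succeeded
        return true;
    }
}
```

The trade-off C0urante raises still applies: with this shape, the boolean result no longer distinguishes "commit attempted and succeeded" from "commit skipped entirely", which makes the skip path harder to log and test from the outside.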
[jira] [Commented] (KAFKA-12574) Deprecate eos-alpha
[ https://issues.apache.org/jira/browse/KAFKA-12574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322296#comment-17322296 ] Haoran Xuan commented on KAFKA-12574: - Hi, previously I was working on KAFKA-9689, in which [~bchen225242] and I were thinking of leveraging the feature versioning system to automatically detect the `exactly_once_semantics` and also make the `StreamThread` able to hot swap from `eos` to `eos-beta`. While doing so, I found that hot swapping the `exactly_once_semantics` is kind of difficult and I haven't thought it through; this discussion made me realize that maybe I'm not heading in the correct direction. > Deprecate eos-alpha > --- > > Key: KAFKA-12574 > URL: https://issues.apache.org/jira/browse/KAFKA-12574 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Blocker > Labels: needs-kip > Fix For: 3.0.0 > > > In KIP-447 we introduced a new thread-producer which is capable of > exactly-once semantics across multiple tasks. The new mode of EOS, called > eos-beta, is intended to eventually be the preferred processing mode for EOS > as it improves the performance and scaling of partitions/tasks. The only > downside is that it requires brokers to be on version 2.5+ in order to > understand the latest APIs that are necessary for this thread-producer. > We should consider deprecating the eos-alpha config, ie > StreamsConfig.EXACTLY_ONCE, to encourage new & existing EOS users to migrate > to the new-and-improved processing mode, and upgrade their brokers if > necessary. > Eventually we would like to be able to remove the eos-alpha code paths from > Streams as this will help to simplify the logic and reduce the processing > mode branching.
But since this will break client-broker compatibility, and > 2.5 is still a relatively recent version, we probably can't actually remove > eos-alpha in the near future -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] wcarlson5 commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter
wcarlson5 commented on a change in pull request #8923: URL: https://github.com/apache/kafka/pull/8923#discussion_r614166802 ## File path: core/src/main/scala/kafka/tools/StreamsResetter.java ## @@ -248,7 +257,7 @@ private void parseArguments(final String[] args) { .ofType(String.class) .describedAs("file name"); forceOption = optionParser.accepts("force", "Force the removal of members of the consumer group (intended to remove stopped members if a long session timeout was used). " + -"Make sure to shut down all stream applications when this option is specified to avoid unexpected rebalances."); Review comment: unnecessary whitespace change
[GitHub] [kafka] wcarlson5 commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter
wcarlson5 commented on a change in pull request #8923: URL: https://github.com/apache/kafka/pull/8923#discussion_r614166802 ## File path: core/src/main/scala/kafka/tools/StreamsResetter.java ## @@ -248,7 +257,7 @@ private void parseArguments(final String[] args) { .ofType(String.class) .describedAs("file name"); forceOption = optionParser.accepts("force", "Force the removal of members of the consumer group (intended to remove stopped members if a long session timeout was used). " + -"Make sure to shut down all stream applications when this option is specified to avoid unexpected rebalances."); Review comment: unnecessary ## File path: core/src/main/scala/kafka/tools/StreamsResetter.java ## @@ -225,6 +229,11 @@ private void parseArguments(final String[] args) { .ofType(String.class) .withValuesSeparatedBy(',') .describedAs("list"); +internalTopicsOption = optionParser.accepts("internal-topics", "Comma-separated list of internal topics to delete. Must be a subset of the internal topics marked for deletion by the default behaviour (do a dry-run without this option to view these topics).") Review comment: line is too long ## File path: core/src/main/scala/kafka/tools/StreamsResetter.java ## @@ -642,22 +651,38 @@ private boolean isIntermediateTopic(final String topic) { return options.valuesOf(intermediateTopicsOption).contains(topic); } -private void maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) { -System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption)); -final List topicsToDelete = new ArrayList<>(); -for (final String listing : allTopics) { -if (isInternalTopic(listing)) { -if (!dryRun) { -topicsToDelete.add(listing); -} else { -System.out.println("Topic: " + listing); -} +private int maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) { +final List inferredInternalTopics = allTopics.stream() +.filter(this::isInferredInternalTopic) 
+.collect(Collectors.toList()); +final List specifiedInternalTopics = options.valuesOf(internalTopicsOption); +final List topicsToDelete; + +if (!specifiedInternalTopics.isEmpty()) { +final List notFoundInternalTopics = specifiedInternalTopics.stream() +.filter(topic -> !inferredInternalTopics.contains(topic)) +.collect(Collectors.toList()); +if (!notFoundInternalTopics.isEmpty()) { Review comment: something like `inferredInternalTopics.containsAll(specifiedInternalTopics)` might be easier to understand here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
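The reviewer's suggestion above can be illustrated with a minimal sketch. The method and variable names are hypothetical; only the two checks mirror the diff: filtering for "not found" topics and negating `containsAll` are equivalent, but the latter states the intent directly.

```java
import java.util.List;
import java.util.stream.Collectors;

// Two equivalent ways to detect specified topics that were not inferred as
// internal: the filtered-list version from the diff, and the containsAll
// version suggested in the review.
public class TopicCheckSketch {
    public static boolean anyNotFoundViaFilter(List<String> inferred, List<String> specified) {
        List<String> notFound = specified.stream()
            .filter(topic -> !inferred.contains(topic))
            .collect(Collectors.toList());
        return !notFound.isEmpty();
    }

    public static boolean anyNotFoundViaContainsAll(List<String> inferred, List<String> specified) {
        // Reads directly as "the specified topics are not all inferred".
        return !inferred.containsAll(specified);
    }
}
```

One practical difference: the filtered list also tells you *which* topics were not found, which is useful if the error message should name them; `containsAll` only yields a boolean.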
[GitHub] [kafka] kpatelatwork edited a comment on pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits
kpatelatwork edited a comment on pull request #10528: URL: https://github.com/apache/kafka/pull/10528#issuecomment-820517908 @C0urante overall this looks good, and very good job on the tests. As I am new to the code I tried my best to review, but I recommend also getting a LGTM from one more reviewer who knows this code better than me, to ensure we don't miss something obvious.
[jira] [Updated] (KAFKA-12671) Out of order processing with a transactional producer can lead to a stuck LastStableOffset
[ https://issues.apache.org/jira/browse/KAFKA-12671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Travis Bischel updated KAFKA-12671: --- Description: If there is pathological processing of incoming produce requests and EndTxn requests, then the LastStableOffset can get stuck, which will block consuming in READ_COMMITTED mode. To transactionally produce, the standard flow is to InitProducerId, and then loop AddPartitionsToTxn -> Produce+ -> EndTxn. The AddPartitionsToTxn is responsible for fencing and adding partitions to a transaction, and the end transaction is responsible for finishing the transaction. Producing itself is mostly uninvolved with the proper fencing / ending flow, but produce requests are required to be after AddPartitionsToTxn and before EndTxn. When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager to mildly manage transactions. The ProducerStateManager is completely independent of the TxnCoordinator, and its guarantees are relatively weak. The ProducerStateManager handles two types of "batches" being added: a data batch and a transaction marker. When a data batch is added, a "transaction" is begun and tied to the producer ID that is producing the batch. When a transaction marker is handled, the ProducerStateManager removes the transaction for the producer ID (roughly). EndTxn is what triggers transaction markers to be sent to the ProducerStateManager. In essence, EndTxn is the one part of the transactional producer flow that talks across both the TxnCoordinator and the ProducerStateManager. If a ProduceRequest is issued before EndTxn, but handled internally in Kafka after EndTxn, then the ProduceRequest will begin a new transaction in the ProducerStateManager. If the client was disconnecting, and the EndTxn was the final request issued, the new transaction created in ProducerStateManager is orphaned and nothing can clean it up. The LastStableOffset then hangs based off of this hung transaction. 
This same problem can be triggered by a produce request that is issued with a transactional ID outside of the context of a transaction at all (no AddPartitionsToTxn). This problem cannot be triggered by producing for so long that the transaction expires; the difference here is that the transaction coordinator bumps the epoch for the producer ID, thus producing again with the old epoch does not work. Theoretically, we are supposed to have unlimited retries on produce requests, but in the context of wanting to abort everything and shut down, this is not always feasible. As it currently stands, I'm not sure there's a truly safe way to shut down _without_ flushing and receiving responses for every record produced, even if I want to abort everything and quit. The safest approach I can think of is to actually avoid issuing an EndTxn so that instead we rely on Kafka internally to time things out after a period of time. — For some context, here are my request logs from the client. Note that I write two ProduceRequests, read one, and then issue EndTxn (because I know I want to quit). The second ProduceRequest is read successfully before shutdown, but I ignore the results because I am shutting down.
I've taken out logs related to consuming, but the order of the logs is unchanged: {noformat} [INFO] done waiting for unknown topic, metadata was successful; topic: 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765 [INFO] initializing producer id [DEBUG] wrote FindCoordinator v3; err: [DEBUG] read FindCoordinator v3; err: [DEBUG] wrote InitProducerID v4; err: [DEBUG] read InitProducerID v4; err: [INFO] producer id initialization success; id: 1463, epoch: 0 [DEBUG] wrote AddPartitionsToTxn v2; err: [DEBUG] read AddPartitionsToTxn v2; err: [DEBUG] read Produce v8; err: [DEBUG] produced; to: 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}] [DEBUG] wrote Produce v8; err: [DEBUG] wrote EndTxn v2; err: [DEBUG] read EndTxn v2; err: [DEBUG] read from broker errored, killing connection; addr: localhost:9092, id: 1, successful_reads: 1, err: context canceled [DEBUG] read Produce v8; err: [DEBUG] produced; to: 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{skipped}] {noformat} And then from the broker's point of view. Across two brokers, the second ProduceRequest is completed after EndTxn is handled (and after the WriteTxnMarkers request is handled, which is the important one that hooks into the ProducerStateManager): {noformat} /// Broker 3: init producer ID [2021-04-15 00:56:40,030] DEBUG Completed request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=kgo, correlationId=3) --
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits
kpatelatwork commented on a change in pull request #10528: URL: https://github.com/apache/kafka/pull/10528#discussion_r614162085 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ## @@ -662,20 +666,94 @@ public void testSendRecordsProducerCallbackFail() throws Exception { expectSendRecordProducerCallbackFail(); +EasyMock.expect(offsetWriter.willFlush()).andReturn(true); + PowerMock.replayAll(); Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords")); +assertFalse(workerTask.shouldCommitOffsets()); } @Test -public void testSendRecordsProducerSendFailsImmediately() { -if (!enableTopicCreation) -// should only test with topic creation enabled -return; +public void testSendRecordsProducerCallbackFailInBacklog() throws Exception { +/* +1. A record is sent successfully +2. Flush for offset commit begins +3. Another record is dispatched to the producer and, because of the active offset commit, added to the backlog +4. The producer fails to send that record and notifies the worker via producer callback +5. The first offset commit succeeds as the first record has been sent successfully +6. 
No further offset commits are attempted as the only remaining record has failed with a non-retriable error + */ +createWorkerTask(); createWorkerTask(); +SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); +SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + +expectTopicCreation(TOPIC); + +expectSendRecordOnce(false); +expectSendRecordProducerCallbackFail(); + +expectOffsetFlush(true); +EasyMock.expect(offsetWriter.willFlush()).andReturn(true).anyTimes(); + +PowerMock.replayAll(); + +Whitebox.setInternalState(workerTask, "toSend", Collections.singletonList(record1)); +Whitebox.invokeMethod(workerTask, "sendRecords"); +Whitebox.setInternalState(workerTask, "flushing", true); +Whitebox.setInternalState(workerTask, "toSend", Collections.singletonList(record2)); +Whitebox.invokeMethod(workerTask, "sendRecords"); +assertTrue(workerTask.shouldCommitOffsets()); +assertTrue(workerTask.commitOffsets()); +assertFalse(workerTask.shouldCommitOffsets()); +} + + +@Test +public void testSendRecordsProducerCallbackFailInBacklogWithNonRetriedOffsetCommit() throws Exception { +/* +1. A record is sent successfully +2. Flush for offset commit begins +3. Another record is dispatched to the producer and, because of the active offset commit, added to the backlog +4. The producer fails to send that record and notifies the worker via producer callback +5. The first offset commit fails even though first record has been sent successfully, (possibly from failure to produce offsets to Kafka) +6. No further offset commits are attempted as the new record batch contains the second record, which has failed with a non-retriable error + */ +createWorkerTask(); Review comment: two calls to createWorkerTask -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits
kpatelatwork commented on a change in pull request #10528: URL: https://github.com/apache/kafka/pull/10528#discussion_r614161831 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ## @@ -662,20 +666,94 @@ public void testSendRecordsProducerCallbackFail() throws Exception { expectSendRecordProducerCallbackFail(); +EasyMock.expect(offsetWriter.willFlush()).andReturn(true); + PowerMock.replayAll(); Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords")); +assertFalse(workerTask.shouldCommitOffsets()); } @Test -public void testSendRecordsProducerSendFailsImmediately() { -if (!enableTopicCreation) -// should only test with topic creation enabled -return; +public void testSendRecordsProducerCallbackFailInBacklog() throws Exception { +/* +1. A record is sent successfully +2. Flush for offset commit begins +3. Another record is dispatched to the producer and, because of the active offset commit, added to the backlog +4. The producer fails to send that record and notifies the worker via producer callback +5. The first offset commit succeeds as the first record has been sent successfully +6. No further offset commits are attempted as the only remaining record has failed with a non-retriable error + */ +createWorkerTask(); Review comment: two calls to createWorkerTask , is this a copy paste or I am missing something :)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 opened a new pull request #10542: KAFKA-12313: Streamlining windowed Deserialiser configs.
vamossagar12 opened a new pull request #10542: URL: https://github.com/apache/kafka/pull/10542 This PR aims to streamline the configurations for WindowedDeserialisers. It deprecates the default.windowed.key.serde.inner and default.windowed.value.serde.inner configs in StreamsConfig and adds window.inner.class.deserialiser. Details described here: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177047930
[GitHub] [kafka] dajac merged pull request #10540: MINOR: update default for field in DeleteTopicsRequest
dajac merged pull request #10540: URL: https://github.com/apache/kafka/pull/10540
[GitHub] [kafka] dajac commented on pull request #10540: MINOR: update default for field in DeleteTopicsRequest
dajac commented on pull request #10540: URL: https://github.com/apache/kafka/pull/10540#issuecomment-820497042 Test failures are not related. Merging to trunk.
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits
kpatelatwork commented on a change in pull request #10528: URL: https://github.com/apache/kafka/pull/10528#discussion_r614129645 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java ## @@ -120,6 +120,13 @@ public synchronized boolean beginFlush() { return true; } +/** + * @return whether there's anything to flush right now. + */ +public boolean willFlush() { Review comment: should this method be synchronized? All other access to `data` seems to be in synchronized methods
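The synchronization concern raised above can be shown with a minimal sketch, using hypothetical class and field names rather than the real `OffsetStorageWriter`: a read-only check over shared mutable state should hold the same lock as the mutating methods, otherwise it may observe a stale or torn view of the map.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch: a writer whose mutating methods hold the object lock, so
// the read-only emptiness check should hold it too. Names are illustrative,
// not the real Connect API.
public class SketchOffsetWriter {
    private final Map<String, Object> data = new HashMap<>();
    private Map<String, Object> toFlush = null;

    public synchronized void offset(String partition, Object offset) {
        data.put(partition, offset);
    }

    public synchronized boolean beginFlush() {
        if (data.isEmpty()) {
            return false;
        }
        toFlush = new HashMap<>(data); // snapshot pending offsets
        data.clear();
        return true;
    }

    // Declared synchronized so this check reads `data` under the same lock
    // as offset() and beginFlush(), avoiding an unsynchronized read.
    public synchronized boolean willFlush() {
        return !data.isEmpty();
    }
}
```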
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits
kpatelatwork commented on a change in pull request #10528: URL: https://github.com/apache/kafka/pull/10528#discussion_r614126665 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java ## @@ -105,6 +105,11 @@ public void remove(ConnectorTaskId id) { } private void commit(WorkerSourceTask workerTask) { +if (!workerTask.shouldCommitOffsets()) { Review comment: @C0urante overall the PR looks good, but somehow I feel this `shouldCommitOffsets` is exposing internal details, and maybe `commitOffsets` should encapsulate this internally. How about we move the `shouldCommitOffsets` check to the first line of the `commitOffsets` method, and remove the `log.debug("{} Committing offsets", workerTask);` from this method, as it's redundant and `workerTask.commitOffsets` is already doing it. Your only challenge would be that `return false` appears in a lot of places in `workerTask.commitOffsets`, but we could convert the current method to a private method and add a new `commitOffsets` wrapper method to make that easy. We would also need to move `log.error("{} Failed to commit offsets", workerTask);` into the wrapper method. This way all we need to do is call `workerTask.commitOffsets()` in a try/catch here.
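The wrapper pattern proposed above can be sketched as follows. Names and state are hypothetical; the real `WorkerSourceTask` has many more failure paths, which is exactly why the sketch keeps the private method untouched and centralizes the skip check and failure log in a thin public wrapper.

```java
// Sketch of the suggested refactor: the existing multi-return-point commit
// method becomes private, and a public wrapper owns the skip check and the
// single failure log. State here is a stand-in for "offsets pending flush".
public class WrapperSketch {
    private boolean hasPendingOffsets = true;
    public int commitsAttempted = 0;

    public boolean commitOffsets() {
        if (!hasPendingOffsets) {
            return true; // nothing to do; skip without touching doCommitOffsets()
        }
        boolean success = doCommitOffsets();
        if (!success) {
            // Single place for the failure log, instead of per return-point.
            System.out.println("Failed to commit offsets");
        }
        return success;
    }

    // Previously the public entry point; its many return paths are unchanged.
    private boolean doCommitOffsets() {
        commitsAttempted++;
        hasPendingOffsets = false;
        return true;
    }
}
```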
[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
spena commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614114447 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -60,20 +82,41 @@ } private class KStreamKStreamJoinProcessor extends AbstractProcessor { +private static final boolean DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false; + +private final Predicate>> recordWindowHasClosed = +windowedKey -> windowedKey.window().start() + joinAfterMs + joinGraceMs < maxObservedStreamTime.get(); private WindowStore otherWindow; private StreamsMetricsImpl metrics; private Sensor droppedRecordsSensor; +private Optional, LeftOrRightValue>> outerJoinWindowStore = Optional.empty(); @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); metrics = (StreamsMetricsImpl) context.metrics(); droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); -otherWindow = (WindowStore) context.getStateStore(otherWindowName); +otherWindow = context.getStateStore(otherWindowName); + +if (!internalOuterJoinFixDisabled(context.appConfigs())) { +outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name)); +} } +private boolean internalOuterJoinFixDisabled(final Map configs) { +final Object value = configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX); +if (value == null) { +return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT; Review comment: That constant name is confusing. I just removed the *_DEFAULT constant and return false when the config is not set. But the idea is that the join fix is enabled (disabled = false) if the config is not set. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
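The lookup-with-default pattern described above (the fix is enabled, i.e. disabled = false, whenever the internal config is unset) can be sketched as follows. The config key string is a hypothetical placeholder, not the real `StreamsConfig.InternalConfig` constant, and the parsing is illustrative.

```java
import java.util.Map;

// Sketch of the internal-config check: absent key means the default
// behavior (fix enabled), so the "disabled" predicate returns false.
public class ConfigSketch {
    // Placeholder key; the real constant lives in StreamsConfig.InternalConfig.
    public static final String KEY = "internal.disable.outer.join.spurious.results.fix";

    public static boolean outerJoinFixDisabled(Map<String, Object> configs) {
        Object value = configs.get(KEY);
        if (value == null) {
            return false; // config not set: the fix stays enabled
        }
        return Boolean.parseBoolean(value.toString());
    }
}
```

Returning a literal `false` here, rather than a `*_DEFAULT` constant named after "disabled", avoids the double-negative naming confusion the review discusses.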
[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
spena commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614105673 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ## @@ -118,20 +142,47 @@ final ProcessorGraphNode otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams); builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode); +Optional, LeftOrRightValue>>> outerJoinWindowStore = Optional.empty(); +if (leftOuter || rightOuter) { +final String outerJoinSuffix = "-shared-outer-join-store"; + +// Get the suffix index of the joinThisGeneratedName to build the outer join store name. +final String outerJoinStoreGeneratedName = KStreamImpl.OUTERSHARED_NAME ++ joinThisGeneratedName.substring( +rightOuter +? KStreamImpl.OUTERTHIS_NAME.length() +: KStreamImpl.JOINTHIS_NAME.length()); Review comment: I initially generated a name with a new index for the shared store. However, it seemed this was going to cause incompatibilities in the topology because the new index kept increasing. Instead, now I just get the index from one of the current join stores. Why doesn't that make sense? Is there another way to get an index? Or do I really need to append an index at the end of the shared store name?
[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
spena commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614098535 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ## @@ -118,20 +142,47 @@ final ProcessorGraphNode otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams); builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode); +Optional, LeftOrRightValue>>> outerJoinWindowStore = Optional.empty(); +if (leftOuter || rightOuter) { +final String outerJoinSuffix = "-shared-outer-join-store"; Review comment: There is only one store shared between left/right joins. I can use only one name.
[GitHub] [kafka] satishd commented on pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on pull request #10271: URL: https://github.com/apache/kafka/pull/10271#issuecomment-820438379 @junrao This PR is rebased with the latest trunk and it is ready for review.
[GitHub] [kafka] satishd commented on pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on pull request #10271: URL: https://github.com/apache/kafka/pull/10271#issuecomment-820437818 Thanks @kowshik for the review. Addressed them with the latest [5250b79](https://github.com/apache/kafka/pull/10271/commits/5250b79cef6d21c73d2b2a12ef7cd95a00e9f034).
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r614083593 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serde/RemoteLogMetadataContextSerde.java ## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage.serde; + +import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataContext; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Message; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecordUpdate; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +/** + * Serialization and deserialization for {@link RemoteLogMetadataContext}. This is the root serdes for the messages + * that are stored in internal remote log metadata topic. 
+ */ +public class RemoteLogMetadataContextSerde implements Serde { + +public static final byte REMOTE_LOG_SEGMENT_METADATA_API_KEY = (byte) new RemoteLogSegmentMetadataRecord().apiKey(); +public static final byte REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = (byte) new RemoteLogSegmentMetadataRecordUpdate().apiKey(); +public static final byte REMOTE_PARTITION_DELETE_API_KEY = (byte) new RemotePartitionDeleteMetadataRecord().apiKey(); + +private final Map keyWithSerde; +private final Deserializer rootDeserializer; +private final Serializer rootSerializer; + +public RemoteLogMetadataContextSerde() { +keyWithSerde = createInternalSerde(); +rootSerializer = (topic, data) -> serialize(data); +rootDeserializer = (topic, data) -> deserialize(data); +} + +private Map createInternalSerde() { +Map serdes = new HashMap<>(); +serdes.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new RemoteLogSegmentMetadataSerde()); +serdes.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new RemoteLogSegmentMetadataUpdateSerde()); +serdes.put(REMOTE_PARTITION_DELETE_API_KEY, new RemotePartitionDeleteMetadataSerde()); +return serdes; +} + +private byte[] serialize(RemoteLogMetadataContext remoteLogMetadataContext) { Review comment: `ProducerRecord` key is generally used to route to a partition based on the key or for compaction. But we do not want to route the requests based on the apiKey here and it is not a compacted topic. apiKey and version are used to unmarshal the payload. IMHO, I see that as a complete structure for the record value and it should be part of the value instead of spreading in record key and value. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
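The registry pattern in this hunk — mapping each generated record's apiKey to its serde and failing fast on an unknown key — can be sketched independently of the Kafka classes. This is a minimal illustration; `ApiKeyRegistry` and its method names are stand-ins, not the PR's API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative stand-in for the apiKey -> serde dispatch in RemoteLogMetadataContextSerde.
public class ApiKeyRegistry {
    private final Map<Byte, Function<byte[], Object>> deserializers = new HashMap<>();

    public void register(byte apiKey, Function<byte[], Object> deserializer) {
        deserializers.put(apiKey, deserializer);
    }

    public Object deserialize(byte apiKey, byte[] payload) {
        Function<byte[], Object> d = deserializers.get(apiKey);
        if (d == null) {
            // Fail fast on unknown keys, mirroring the IllegalArgumentException in the quoted hunk.
            throw new IllegalArgumentException("Deserializer for apikey: " + apiKey + " does not exist.");
        }
        return d.apply(payload);
    }
}
```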
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r614071198 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serde/RemoteLogSegmentMetadataSerde.java ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage.serde; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Message; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +public class RemoteLogSegmentMetadataSerde implements RemoteLogMetadataSerdes { + +public Message serialize(RemoteLogSegmentMetadata data) { +RemoteLogSegmentMetadataRecord record = new RemoteLogSegmentMetadataRecord() +.setRemoteLogSegmentId( +new RemoteLogSegmentMetadataRecord.RemoteLogSegmentIdEntry() +.setTopicIdPartition(new RemoteLogSegmentMetadataRecord.TopicIdPartitionEntry() + .setId(data.remoteLogSegmentId().topicIdPartition().topicId()) + .setName(data.remoteLogSegmentId().topicIdPartition().topicPartition() +.topic()) + .setPartition(data.remoteLogSegmentId().topicIdPartition().topicPartition() +.partition())) +.setId(data.remoteLogSegmentId().id())) +.setStartOffset(data.startOffset()) +.setEndOffset(data.endOffset()) +.setBrokerId(data.brokerId()) +.setEventTimestampMs(data.eventTimestampMs()) +.setMaxTimestampMs(data.maxTimestampMs()) +.setSegmentSizeInBytes(data.segmentSizeInBytes()) + .setSegmentLeaderEpochs(data.segmentLeaderEpochs().entrySet().stream() +.map(entry -> new RemoteLogSegmentMetadataRecord.SegmentLeaderEpochEntry() +.setLeaderEpoch(entry.getKey()) + .setOffset(entry.getValue())).collect(Collectors.toList())) 
+.setRemoteLogSegmentState(data.state().id()); + +return record; +} + +@Override +public RemoteLogSegmentMetadata deserialize(byte version, ByteBuffer byteBuffer) { +RemoteLogSegmentMetadataRecord record = new RemoteLogSegmentMetadataRecord( +new ByteBufferAccessor(byteBuffer), version); + +RemoteLogSegmentId remoteLogSegmentId = buildRemoteLogSegmentId(record.remoteLogSegmentId()); +Map segmentLeaderEpochs = new HashMap<>(); +for (RemoteLogSegmentMetadataRecord.SegmentLeaderEpochEntry segmentLeaderEpoch : record.segmentLeaderEpochs()) { +segmentLeaderEpochs.put(segmentLeaderEpoch.leaderEpoch(), segmentLeaderEpoch.offset()); +} +RemoteLogSegmentMetadata remoteLogSegmentMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, +record.startOffset(), record.endOffset(), record.maxTimestampMs(), record.brokerId(), +record.eventTimestampMs(), record.segmentSizeInBytes(), segmentLeaderEpochs); +RemoteLogSegmentMetadataUpdate rlsmUpdate = new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, Review comment: We did not expose `RemoteLogSegmentMetadata` constructor to take state as it is always created with `RemoteLogSegmentState.COPY_SEGMENT_STARTED` and it should always be updated with
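The lifecycle the reviewer describes — metadata always created in `COPY_SEGMENT_STARTED`, with later states applied only through an update object — can be sketched as a create-then-update pattern. Class, field, and state names below are illustrative stand-ins, not the PR's types:

```java
// Illustrative sketch: new metadata is always born in a fixed initial state, the
// constructor exposes no state parameter, and transitions produce a new value.
public class SegmentLifecycle {
    enum State { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED }

    static final class Metadata {
        final String segmentId;
        final State state;

        Metadata(String segmentId) {
            // No state argument: creation always starts the copy.
            this(segmentId, State.COPY_SEGMENT_STARTED);
        }

        private Metadata(String segmentId, State state) {
            this.segmentId = segmentId;
            this.state = state;
        }

        // State changes go through an update, leaving the original untouched.
        Metadata withState(State newState) {
            return new Metadata(segmentId, newState);
        }
    }
}
```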
[GitHub] [kafka] ijuma commented on a change in pull request #10498: MINOR: remove `checksumOrNull` and `isValid` from Record
ijuma commented on a change in pull request #10498: URL: https://github.com/apache/kafka/pull/10498#discussion_r614068123 ## File path: core/src/main/scala/kafka/tools/DumpLogSegments.scala ## @@ -279,8 +279,10 @@ object DumpLogSegments { if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) { print(" sequence: " + record.sequence + " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]")) - } else { -print(s" crc: ${record.checksumOrNull} isvalid: ${record.isValid}") + } + record match { +case r: AbstractLegacyRecordBatch => print(s" crc: ${r.checksum}}") Review comment: Shall we include the `isValid` part here too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r614066999 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serde/RemoteLogSegmentMetadataSerde.java ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage.serde; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Message; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +public class RemoteLogSegmentMetadataSerde implements RemoteLogMetadataSerdes { + +public Message serialize(RemoteLogSegmentMetadata data) { +RemoteLogSegmentMetadataRecord record = new RemoteLogSegmentMetadataRecord() +.setRemoteLogSegmentId( +new RemoteLogSegmentMetadataRecord.RemoteLogSegmentIdEntry() Review comment: +1 to extract them to a different method for better readability, updated in the latest commit. But RemoteLogSegmentIdEntry is created respectively in RemoteLogSegmentRecord and RemoteogSegmentUpdateRecord and they can not be shared now. Is there a way to create a type and import/include it in message definitions? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r614064785 ## File path: storage/src/main/resources/message/RemoteLogSegmentMetadata.json ## @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 0, + "type": "data", + "name": "RemoteLogSegmentMetadataRecord", + "validVersions": "0", + "flexibleVersions": "none", Review comment: Good point, I will add that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
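In Kafka's generated-message schema format, setting `flexibleVersions` to a version range instead of `"none"` enables tagged fields, so optional fields can be added later without bumping the schema version. A hedged sketch of what the follow-up change agreed to above might look like (fields list omitted):

```json
{
  "apiKey": 0,
  "type": "data",
  "name": "RemoteLogSegmentMetadataRecord",
  "validVersions": "0",
  "flexibleVersions": "0+"
}
```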
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r614031843 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serde/RemoteLogMetadataContextSerde.java ## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage.serde; + +import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataContext; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Message; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecordUpdate; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +/** + * Serialization and deserialization for {@link RemoteLogMetadataContext}. This is the root serdes for the messages + * that are stored in internal remote log metadata topic. 
+ */ +public class RemoteLogMetadataContextSerde implements Serde { + +public static final byte REMOTE_LOG_SEGMENT_METADATA_API_KEY = (byte) new RemoteLogSegmentMetadataRecord().apiKey(); +public static final byte REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = (byte) new RemoteLogSegmentMetadataRecordUpdate().apiKey(); +public static final byte REMOTE_PARTITION_DELETE_API_KEY = (byte) new RemotePartitionDeleteMetadataRecord().apiKey(); + +private final Map keyWithSerde; +private final Deserializer rootDeserializer; +private final Serializer rootSerializer; + +public RemoteLogMetadataContextSerde() { +keyWithSerde = createInternalSerde(); +rootSerializer = (topic, data) -> serialize(data); +rootDeserializer = (topic, data) -> deserialize(data); +} + +private Map createInternalSerde() { +Map serdes = new HashMap<>(); +serdes.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new RemoteLogSegmentMetadataSerde()); +serdes.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new RemoteLogSegmentMetadataUpdateSerde()); +serdes.put(REMOTE_PARTITION_DELETE_API_KEY, new RemotePartitionDeleteMetadataSerde()); +return serdes; +} + +private byte[] serialize(RemoteLogMetadataContext remoteLogMetadataContext) { +RemoteLogMetadataSerdes serDe = keyWithSerde.get(remoteLogMetadataContext.apiKey()); +if (serDe == null) { +throw new IllegalArgumentException("Serializer for apikey: " + remoteLogMetadataContext.apiKey() + + " does not exist."); +} + +@SuppressWarnings("unchecked") +Message message = serDe.serialize(remoteLogMetadataContext.payload()); + +return transformToBytes(message, remoteLogMetadataContext.apiKey(), remoteLogMetadataContext.version()); +} + +private RemoteLogMetadataContext deserialize(byte[] data) { +ByteBuffer byteBuffer = ByteBuffer.wrap(data); +byte apiKey = byteBuffer.get(); +byte version = byteBuffer.get(); +RemoteLogMetadataSerdes serDe = keyWithSerde.get(apiKey); +if (serDe == null) { +throw new IllegalArgumentException("Deserializer for apikey: " + apiKey + " does not 
exist."); +} + +Object deserializedObj = serDe.deserialize(version, byteBuffer); +return new RemoteLogMetadataContext(apiKey, version, deserializedObj); +} + +private byte[] transformToBytes(Message message, byte apiKey, byte apiVersion) { Review comment: I am fine with changing `transformToBytes` to `toBytes`. But I prefer to keep `serialize` and `deserialize` as they are the respective implementations of serializer and deserializer. -- This is an automated
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r614004395 ## File path: storage/src/main/resources/message/RemoteLogSegmentMetadataUpdate.json ## @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 1, + "type": "data", + "name": "RemoteLogSegmentMetadataRecordUpdate", + "validVersions": "0", + "flexibleVersions": "none", + "fields": [ +{ + "name": "RemoteLogSegmentId", + "type": "RemoteLogSegmentIdEntry", + "versions": "0+", + "about": "Unique representation of the remote log segment", + "fields": [ +{ + "name": "TopicIdPartition", + "type": "TopicIdPartitionEntry", + "versions": "0+", + "about": "Represents unique topic partition", + "fields": [ +{ + "name": "Name", + "type": "string", + "versions": "0+", + "about": "Topic name" +}, +{ + "name": "Id", + "type": "uuid", + "versions": "0+", + "about": "Unique identifier of the topic" +}, +{ + "name": "Partition", + "type": "int32", + "versions": "0+", + "about": "Partition number" +} + ] +}, +{ + "name": "Id", + "type": "uuid", + "versions": "0+", + "about": "Unique identifier of the remote log segment" +} + ] +}, +{ + "name": "BrokerId", Review comment: Right, we will be updating the KIP with this change. ## File path: storage/src/main/resources/message/RemotePartitionDeleteMetadata.json ## @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 2, + "type": "data", + "name": "RemotePartitionDeleteMetadataRecord", + "validVersions": "0", + "flexibleVersions": "none", + "fields": [ +{ + "name": "TopicIdPartition", + "type": "TopicIdPartitionEntry", + "versions": "0+", + "about": "Represents unique topic partition", + "fields": [ +{ + "name": "Name", + "type": "string", + "versions": "0+", + "about": "Topic name" +}, +{ + "name": "Id", + "type": "uuid", + "versions": "0+", + "about": "Unique identifier of the topic" +}, +{ + "name": "Partition", + "type": "int32", + "versions": "0+", + "about": "Partition number" +} + ] +}, +{ + "name": "BrokerId", Review comment: Right, we will be updating the KIP with this change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r614002154 ## File path: storage/src/main/resources/message/RemoteLogSegmentMetadata.json ## @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 0, + "type": "data", + "name": "RemoteLogSegmentMetadataRecord", + "validVersions": "0", + "flexibleVersions": "none", + "fields": [ +{ + "name": "RemoteLogSegmentId", + "type": "RemoteLogSegmentIdEntry", + "versions": "0+", + "about": "Unique representation of the remote log segment", + "fields": [ +{ + "name": "TopicIdPartition", + "type": "TopicIdPartitionEntry", + "versions": "0+", + "about": "Represents unique topic partition", + "fields": [ +{ + "name": "Name", + "type": "string", + "versions": "0+", + "about": "Topic name" +}, +{ + "name": "Id", + "type": "uuid", + "versions": "0+", + "about": "Unique identifier of the topic" +}, +{ + "name": "Partition", + "type": "int32", + "versions": "0+", + "about": "Partition number" +} + ] +}, +{ + "name": "Id", + "type": "uuid", + "versions": "0+", + "about": "Unique identifier of the remote log segment" +} + ] +}, +{ + "name": "StartOffset", + "type": "int64", + "versions": "0+", + "about": "Start offset of the segment." +}, +{ + "name": "EndOffset", + "type": "int64", + "versions": "0+", + "about": "End offset of the segment." +}, +{ + "name": "BrokerId", Review comment: Right, we will be updating the KIP with this change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #10541: KAFKA-12284: increase request timeout to make tests reliable
showuon commented on pull request #10541: URL: https://github.com/apache/kafka/pull/10541#issuecomment-820346080 Need to rework. Close it first.
[GitHub] [kafka] showuon closed pull request #10541: KAFKA-12284: increase request timeout to make tests reliable
showuon closed pull request #10541: URL: https://github.com/apache/kafka/pull/10541
[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty
[ https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322092#comment-17322092 ] Alexis Polyzos commented on KAFKA-12635: [~yangguo1220] thanks for the reply! We tried your suggestion but unfortunately it didn't work. We created the consumer group with the kafka-console-consumer.sh and offset = 0. However, when the MM2 started the CheckpointConnector the offsets were translated poorly and became negative again. > Mirrormaker 2 offset sync is incorrect if the target partition is empty > --- > > Key: KAFKA-12635 > URL: https://issues.apache.org/jira/browse/KAFKA-12635 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.7.0 >Reporter: Frank Yi >Assignee: Ning Zhang >Priority: Major > > This bug occurs when using Mirrormaker with "sync.group.offsets.enabled = > true". > If a source partition is empty, but the source consumer group's offset for > that partition is non-zero, then Mirrormaker sets the target consumer group's > offset for that partition to the literal, not translated, offset of the > source consumer group. This state can be reached if the source consumer group > consumed some records that were now deleted (like by a retention policy), or > if Mirrormaker replication is set to start at "latest". This bug causes the > target consumer group's lag for that partition to be negative and breaks > offset sync for that partition until lag is positive. > The correct behavior when the source partition is empty would be to set the > target offset to the translated offset, not literal offset, which in this > case would always be 0. > Original email thread on this issue: > https://lists.apache.org/thread.html/r7c54ee5f57227367b911d4abffa72781772d8dd3b72d75eb65ee19f7%40%3Cusers.kafka.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
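The correct behavior the reporter describes — writing the translated offset rather than copying the source group's literal offset — can be sketched as follows. This is a hypothetical illustration of the described rule, not MirrorMaker 2's actual code; all names are invented:

```java
// Hypothetical illustration of the translation rule described in KAFKA-12635:
// a source offset is mapped through the last known offset sync, and the result
// is clamped at the target partition's log start (0 for an empty partition).
public class OffsetTranslation {
    static long translate(long sourceOffset, long sourceSyncOffset, long targetSyncOffset) {
        long delta = sourceOffset - sourceSyncOffset;
        // Copying the literal source offset here is the reported bug: with an
        // empty target partition the target group's lag would go negative.
        return Math.max(0L, targetSyncOffset + delta);
    }
}
```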
[jira] [Assigned] (KAFKA-12662) add unit test for ProducerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-12662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chun-Hao Tang reassigned KAFKA-12662: - Assignee: Chun-Hao Tang > add unit test for ProducerPerformance > - > > Key: KAFKA-12662 > URL: https://issues.apache.org/jira/browse/KAFKA-12662 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chun-Hao Tang >Priority: Major > > ProducerPerformance is a useful tool which offers an official way to test > produce performance. Hence, it would be better to add enough tests for it. > (In fact, it has no unit tests currently).
[GitHub] [kafka] mimaison commented on pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader
mimaison commented on pull request #9549: URL: https://github.com/apache/kafka/pull/9549#issuecomment-820299395 Not sure why the build had failed, I've rekicked it.
[GitHub] [kafka] mimaison commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader
mimaison commented on a change in pull request #9549: URL: https://github.com/apache/kafka/pull/9549#discussion_r613931955 ## File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.transforms.util.NonEmptyListValidator; +import org.apache.kafka.connect.transforms.util.Requirements; +import org.apache.kafka.connect.transforms.util.SchemaUtil; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.lang.String.format; +import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE; + +public abstract class HeaderFrom> implements Transformation { + +public static final String FIELDS_FIELD = "fields"; +public static final String HEADERS_FIELD = "headers"; +public static final String OPERATION_FIELD = "operation"; +private static final String MOVE_OPERATION = "move"; +private static final String COPY_OPERATION = "copy"; + +public static final String OVERVIEW_DOC = +"Moves or copies fields in the key/value of a record into that record's headers. " + +"Corresponding elements of " + FIELDS_FIELD + " and " + +"" + HEADERS_FIELD + " together identify a field and the header it should be " + +"moved or copied to. 
" + +"Use the concrete transformation type designed for the record " + +"key (" + Key.class.getName() + ") or value (" + Value.class.getName() + ")."; + +public static final ConfigDef CONFIG_DEF = new ConfigDef() +.define(FIELDS_FIELD, ConfigDef.Type.LIST, +NO_DEFAULT_VALUE, new NonEmptyListValidator(), +ConfigDef.Importance.HIGH, +"Field names in the record whose values are to be copied or moved to headers.") +.define(HEADERS_FIELD, ConfigDef.Type.LIST, +NO_DEFAULT_VALUE, new NonEmptyListValidator(), +ConfigDef.Importance.HIGH, +"Header names, in the same order as the field names listed in the fields configuration property.") +.define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE, +ConfigDef.ValidString.in(MOVE_OPERATION, COPY_OPERATION), ConfigDef.Importance.HIGH, +"Either move if the fields are to be moved to the headers (removed from the key/value), " + +"or copy if the fields are to be copied to the headers (retained in the key/value)."); + +enum Operation { +MOVE(MOVE_OPERATION), +COPY(COPY_OPERATION); + +private final String name; + +Operation(String name) { +this.name = name; +} + +static Operation fromName(String name) { +switch (name) { +case MOVE_OPERATION: +return MOVE; +case COPY_OPERATION: +return COPY; +default: +throw new IllegalArgumentException(); Review comment: Ok, that's fair enough. Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
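The config keys defined in this hunk (`fields`, `headers`, `operation`) are supplied per transform in a connector configuration, using the concrete `Key` or `Value` subclass mentioned in the overview doc. A hedged example; the transform alias and the field/header names are made up:

```properties
# Illustrative connector config using the HeaderFrom SMT from this PR.
transforms=moveToHeaders
transforms.moveToHeaders.type=org.apache.kafka.connect.transforms.HeaderFrom$Value
transforms.moveToHeaders.fields=origin,timestamp
transforms.moveToHeaders.headers=origin-system,event-time
transforms.moveToHeaders.operation=move
```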
[GitHub] [kafka] tombentley commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader
tombentley commented on a change in pull request #9549: URL: https://github.com/apache/kafka/pull/9549#discussion_r613926107

## File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java ##

The lines under discussion (the full 238-line diff adding the new `HeaderFrom` SMT is quoted in full earlier in this thread):

```java
static Operation fromName(String name) {
    switch (name) {
        case MOVE_OPERATION:
            return MOVE;
        case COPY_OPERATION:
            return COPY;
        default:
            throw new IllegalArgumentException();
    }
}
```

Review comment: It is impossible due to the `ConfigDef.ValidString.in(MOVE_OPERATION, COPY_OPERATION)`, so this is really an assertion failure. The line number in the stack trace would be enough to track it down if it ever did happen due to a later refactoring, so IMHO an error message is of no value. But I'm happy
[GitHub] [kafka] mimaison commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader
mimaison commented on a change in pull request #9549: URL: https://github.com/apache/kafka/pull/9549#discussion_r613913901

## File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java ##

The lines under discussion (the full 238-line diff adding the new `HeaderFrom` SMT is quoted in full earlier in this thread):

```java
static Operation fromName(String name) {
    switch (name) {
        case MOVE_OPERATION:
            return MOVE;
        case COPY_OPERATION:
            return COPY;
        default:
            throw new IllegalArgumentException();
    }
}
```

Review comment: Even though I don't think it's reachable by users, should we have a message here?
[GitHub] [kafka] kowshik commented on a change in pull request #10478: KAFKA-12553: Refactor recovery logic to introduce LogLoader
kowshik commented on a change in pull request #10478: URL: https://github.com/apache/kafka/pull/10478#discussion_r613795734

## File path: core/src/main/scala/kafka/log/Log.scala ##

```diff
@@ -255,19 +261,21 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
 @threadsafe
 class Log(@volatile private var _dir: File,
           @volatile var config: LogConfig,
+          val segments: LogSegments,
```

Review comment: I took a stab at this. Things changed a bit after I did the refactoring that Jun suggested [here](https://github.com/apache/kafka/pull/10478#discussion_r610933966). In the latest version of the PR, `LogComponents` has been renamed to a class called `LoadedLogOffsets`, which now only contains (logStartOffset, recoveryPoint, nextOffsetMetadata). It seemed reasonable to just pass these in as three separate parameters into `Log`: since the `LeaderEpochFileCache` and `ProducerStateManager` are no longer inside the `LoadedLogOffsets` object, they have to be passed in separately anyway.
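The design choice described above, unpacking a small value object into separate constructor parameters, can be sketched with hypothetical Java records (`LoadedLogOffsets` and `Log` here are stand-ins, not the actual Scala classes in the PR):

```java
public class Main {
    // Stand-in for the PR's LoadedLogOffsets: just the three offsets it retains
    // after the refactoring described in the comment above.
    public record LoadedLogOffsets(long logStartOffset, long recoveryPoint, long nextOffset) {}

    // Stand-in for Log: takes the three offsets as separate parameters, since the
    // epoch cache and producer state manager must be passed separately anyway.
    public record Log(long logStartOffset, long recoveryPoint, long nextOffset) {}

    public static void main(String[] args) {
        LoadedLogOffsets loaded = new LoadedLogOffsets(0L, 42L, 100L);
        Log log = new Log(loaded.logStartOffset(), loaded.recoveryPoint(), loaded.nextOffset());
        System.out.println(log.recoveryPoint()); // 42
    }
}
```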
[GitHub] [kafka] kowshik commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
kowshik commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r613882988

## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serde/RemoteLogMetadataContextSerde.java ##
@@ -0,0 +1,115 @@

```java
package org.apache.kafka.server.log.remote.metadata.storage.serde;

import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataContext;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Message;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecordUpdate;
import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/**
 * Serialization and deserialization for {@link RemoteLogMetadataContext}. This is the root serde for the messages
 * that are stored in the internal remote log metadata topic.
 */
public class RemoteLogMetadataContextSerde implements Serde<RemoteLogMetadataContext> {

    public static final byte REMOTE_LOG_SEGMENT_METADATA_API_KEY = (byte) new RemoteLogSegmentMetadataRecord().apiKey();
    public static final byte REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = (byte) new RemoteLogSegmentMetadataRecordUpdate().apiKey();
    public static final byte REMOTE_PARTITION_DELETE_API_KEY = (byte) new RemotePartitionDeleteMetadataRecord().apiKey();

    // Note: the generic type parameters of these fields were lost in archiving.
    private final Map keyWithSerde;
    private final Deserializer<RemoteLogMetadataContext> rootDeserializer;
    private final Serializer<RemoteLogMetadataContext> rootSerializer;

    public RemoteLogMetadataContextSerde() {
        keyWithSerde = createInternalSerde();
        rootSerializer = (topic, data) -> serialize(data);
        rootDeserializer = (topic, data) -> deserialize(data);
    }

    private Map createInternalSerde() {
        Map serdes = new HashMap<>();
        serdes.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new RemoteLogSegmentMetadataSerde());
        serdes.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new RemoteLogSegmentMetadataUpdateSerde());
        serdes.put(REMOTE_PARTITION_DELETE_API_KEY, new RemotePartitionDeleteMetadataSerde());
        return serdes;
    }

    private byte[] serialize(RemoteLogMetadataContext remoteLogMetadataContext) {
```

Review comment: The `ProducerRecord` can hold a key and a value. It seems like we could store the API key in `ProducerRecord.key()` and store the serialized payload in `ProducerRecord.value()`. Why not take that route instead of serializing to a single byte array here containing (apiKey, version, payload)? https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html This is of course assuming that `RemoteLogMetadataContextSerde` will only be used for serializing/deserializing events to/from a Kafka topic (as the class doc suggests).
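The framing questioned in the last comment, a single byte array carrying (apiKey, version, payload), can be sketched without any Kafka classes. `frame` and `apiKeyOf` are hypothetical helpers, and the 1-byte key / 2-byte version layout is an assumption for illustration, not the PR's actual wire format:

```java
import java.nio.ByteBuffer;

public class Main {
    // Pack (apiKey, version, payload) into one value byte array; the alternative the
    // reviewer raises is carrying the API key in ProducerRecord.key() instead.
    public static byte[] frame(byte apiKey, short version, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 2 + payload.length);
        buf.put(apiKey).putShort(version).put(payload);
        return buf.array();
    }

    // Read back the API key so a deserializer can dispatch to the right inner serde.
    public static byte apiKeyOf(byte[] framed) {
        return ByteBuffer.wrap(framed).get();
    }

    public static void main(String[] args) {
        byte[] framed = frame((byte) 0, (short) 1, new byte[] {10, 20});
        System.out.println(framed.length);    // 5
        System.out.println(apiKeyOf(framed)); // 0
    }
}
```

With the key/value route, dispatch reads the record key; with this framing, it reads the first byte of the value, which keeps all routing information inside a single serialized blob.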