[jira] [Assigned] (KAFKA-12158) Consider better return type of RaftClient.scheduleAppend
[ https://issues.apache.org/jira/browse/KAFKA-12158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dengziming reassigned KAFKA-12158: -- Assignee: dengziming > Consider better return type of RaftClient.scheduleAppend > > > Key: KAFKA-12158 > URL: https://issues.apache.org/jira/browse/KAFKA-12158 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: dengziming >Priority: Major > > Currently `RaftClient` has the following Append API: > {code} > Long scheduleAppend(int epoch, List records); > {code} > There are a few possible cases that the single return value is trying to > handle: > 1. The epoch doesn't match or we are not the current leader => return > Long.MaxValue > 2. We failed to allocate memory to write the batch (backpressure case) => > return null > 3. We successfully scheduled the append => return the expected offset > It might be better to define a richer type so that the cases that must be > handled are clearer. At a minimum, it would be better to return > `OptionalLong` and get rid of the null case. -- This message was sent by Atlassian Jira (v8.3.4#803005)
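As an editorial aside, the three cases above could be made explicit with a small result type. The sketch below is purely illustrative (the names `AppendResult`, `Status`, and `scheduledOffset` are assumptions, not Kafka's actual API); it also shows the minimal `OptionalLong` variant the ticket suggests.

```java
import java.util.OptionalLong;

// Hypothetical sketch of a richer return type for scheduleAppend, making the
// three outcomes explicit instead of encoding them as Long.MAX_VALUE, null,
// and a real offset. All names here are illustrative assumptions.
public class AppendResultSketch {

    enum Status { SCHEDULED, NOT_LEADER, OUT_OF_MEMORY }

    static final class AppendResult {
        final Status status;
        private final long offset; // only meaningful when status == SCHEDULED

        private AppendResult(Status status, long offset) {
            this.status = status;
            this.offset = offset;
        }

        static AppendResult scheduled(long offset) { return new AppendResult(Status.SCHEDULED, offset); }
        static AppendResult notLeader() { return new AppendResult(Status.NOT_LEADER, -1L); }
        static AppendResult outOfMemory() { return new AppendResult(Status.OUT_OF_MEMORY, -1L); }

        // The minimal improvement suggested in the ticket: expose the offset as
        // OptionalLong so the null case disappears from the API surface.
        OptionalLong scheduledOffset() {
            return status == Status.SCHEDULED ? OptionalLong.of(offset) : OptionalLong.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(AppendResult.scheduled(42L).scheduledOffset());  // OptionalLong[42]
        System.out.println(AppendResult.outOfMemory().scheduledOffset());   // OptionalLong.empty
    }
}
```

With such a type, callers are forced to branch on the status rather than remember the meaning of `Long.MaxValue` and `null`.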
[jira] [Comment Edited] (KAFKA-12908) Load snapshot heuristic
[ https://issues.apache.org/jira/browse/KAFKA-12908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366100#comment-17366100 ] dengziming edited comment on KAFKA-12908 at 6/20/21, 4:30 AM: -- This may depend on KAFKA-12155 was (Author: dengziming): This may be depend on KAFKA-12155 > Load snapshot heuristic > --- > > Key: KAFKA-12908 > URL: https://issues.apache.org/jira/browse/KAFKA-12908 > Project: Kafka > Issue Type: Sub-task >Reporter: Jose Armando Garcia Sancio >Assignee: dengziming >Priority: Minor > > The {{KafkaRaftClient}} implementation only forces the {{RaftClient.Listener}} > to load a snapshot when the listener's next offset is less than the > start offset. > This is technically correct, but in some cases it may be more efficient to > load a snapshot even when the next offset exists in the log. This is clearly > true when the latest snapshot has fewer entries than the number of records > from the next offset to the latest snapshot. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12908) Load snapshot heuristic
[ https://issues.apache.org/jira/browse/KAFKA-12908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366100#comment-17366100 ] dengziming commented on KAFKA-12908: This may depend on KAFKA-12155 > Load snapshot heuristic > --- > > Key: KAFKA-12908 > URL: https://issues.apache.org/jira/browse/KAFKA-12908 > Project: Kafka > Issue Type: Sub-task >Reporter: Jose Armando Garcia Sancio >Assignee: dengziming >Priority: Minor > > The {{KafkaRaftClient}} implementation only forces the {{RaftClient.Listener}} > to load a snapshot when the listener's next offset is less than the > start offset. > This is technically correct, but in some cases it may be more efficient to > load a snapshot even when the next offset exists in the log. This is clearly > true when the latest snapshot has fewer entries than the number of records > from the next offset to the latest snapshot. -- This message was sent by Atlassian Jira (v8.3.4#803005)
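For readers, the heuristic the ticket describes can be sketched as a simple cost comparison. This is an editorial illustration only; the signature and parameter names are assumptions, not KafkaRaftClient's actual code.

```java
// Hypothetical sketch of the proposed heuristic: loading the snapshot is
// mandatory when the listener's next offset was truncated out of the log;
// otherwise it is merely preferable when applying the snapshot touches fewer
// entries than replaying the log from the next offset up to the snapshot.
public class SnapshotHeuristicSketch {

    static boolean shouldLoadSnapshot(long nextOffset,
                                      long logStartOffset,
                                      long snapshotEndOffset,
                                      long snapshotEntryCount) {
        if (nextOffset < logStartOffset) {
            return true; // mandatory: the next offset no longer exists in the log
        }
        long recordsToReplay = snapshotEndOffset - nextOffset;
        // optional: the snapshot is cheaper than replaying the remaining records
        return snapshotEntryCount < recordsToReplay;
    }

    public static void main(String[] args) {
        // Replaying 1,000,000 records is costlier than loading a 10-entry snapshot.
        System.out.println(shouldLoadSnapshot(0, 0, 1_000_000, 10));        // true
        // Only 10 records remain, and the snapshot has 500 entries; keep replaying.
        System.out.println(shouldLoadSnapshot(999_990, 0, 1_000_000, 500)); // false
    }
}
```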
[GitHub] [kafka] kpatelatwork commented on pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on pull request #10822: URL: https://github.com/apache/kafka/pull/10822#issuecomment-864493282
Fired up a local Kafka server and Kafka Connect and was able to curl the API locally.
> ➜ kafka git:(KAFKA-4793) ✗ curl http://192.168.1.220:8083/connectors/local-file-source/status
> {"name":"local-file-source","connector":{"state":"RUNNING","worker_id":"192.168.1.220:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"192.168.1.220:8083"}],"type":"source"}
> ➜ kafka git:(KAFKA-4793) ✗ curl -XPOST http://192.168.1.220:8083/connectors/local-file-source/restart\?includeTasks\=true\&onlyFailed\=false
> {"name":"local-file-source","connector":{"state":"RESTARTING","worker_id":"192.168.1.220:8083"},"tasks":[{"id":0,"state":"RESTARTING","worker_id":"192.168.1.220:8083"}],"type":"source"}
> ➜ kafka git:(KAFKA-4793) ✗ curl -XPOST http://192.168.1.220:8083/connectors/local-file-source/restart\?includeTasks\=false\&onlyFailed\=false
> ➜ kafka git:(KAFKA-4793) ✗ curl -XPOST http://192.168.1.220:8083/connectors/local-file-source/restart\?includeTasks\=false\&onlyFailed\=true
> {"name":"local-file-source","connector":{"state":"RUNNING","worker_id":"192.168.1.220:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"192.168.1.220:8083"}],"type":"source"}
> ➜ kafka git:(KAFKA-4793) ✗ curl -XPOST http://192.168.1.220:8083/connectors/local-file-source/restart\?includeTasks\=true\&onlyFailed\=true
> {"name":"local-file-source","connector":{"state":"RUNNING","worker_id":"192.168.1.220:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"192.168.1.220:8083"}],"type":"source"}
> ➜ kafka git:(KAFKA-4793) ✗ curl -XPOST http://192.168.1.220:8083/connectors/local-file-source/restart\?includeTasks\=true
> {"name":"local-file-source","connector":{"state":"RESTARTING","worker_id":"192.168.1.220:8083"},"tasks":[{"id":0,"state":"RESTARTING","worker_id":"192.168.1.220:8083"}],"type":"source"}
-- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: return not_controller error in envelope response itself
showuon commented on a change in pull request #10794: URL: https://github.com/apache/kafka/pull/10794#discussion_r654866143
## File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
## @@ -36,10 +37,16 @@ private final String message;
 public static ApiError fromThrowable(Throwable t) {
+    Throwable throwableToBeEncode = t;
+    // Future will wrap the original exception with CompletionException, which will return unexpected UNKNOWN_SERVER_ERROR.
+    if (t instanceof CompletionException) {
+        throwableToBeEncode = t.getCause();
Review comment: fix 1: unwrap the `CompletionException` to get the original exception inside. Even if we don't want the `NotControllerException` returned to the client, we need to know it to do some checks.
## File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
## @@ -36,10 +37,16 @@ private final String message;
 public static ApiError fromThrowable(Throwable t) {
+    Throwable throwableToBeEncode = t;
+    // Future will wrap the original exception with CompletionException, which will return unexpected UNKNOWN_SERVER_ERROR.
+    if (t instanceof CompletionException) {
+        throwableToBeEncode = t.getCause();
Review comment: fix 1: unwrap the CompletionException to get the original exception inside. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
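The behavior the review comment relies on can be demonstrated standalone: an exception thrown inside a `CompletableFuture` stage surfaces wrapped in `CompletionException`, so encoding the outer throwable directly would map to UNKNOWN_SERVER_ERROR rather than the real error. This sketch (the helper name `maybeUnwrap` is an assumption, not the PR's code) shows the unwrapping step:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Standalone illustration of the fix: peel CompletionException off before
// mapping the throwable to an error code, so the original exception is used.
public class UnwrapSketch {

    static Throwable maybeUnwrap(Throwable t) {
        if (t instanceof CompletionException && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            // Stand-in for the controller's NotControllerException.
            throw new IllegalStateException("simulated NotControllerException");
        });
        try {
            future.join();
        } catch (CompletionException e) {
            // Without unwrapping we would encode CompletionException itself.
            System.out.println(maybeUnwrap(e).getClass().getSimpleName()); // IllegalStateException
        }
    }
}
```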
[jira] [Commented] (KAFKA-12889) log clean group consider empty log segment to avoid empty log left
[ https://issues.apache.org/jira/browse/KAFKA-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366024#comment-17366024 ] Guozhang Wang commented on KAFKA-12889: --- Thanks [~iamgd67] for reporting the issue and for the fix too! > log clean group consider empty log segment to avoid empty log left > -- > > Key: KAFKA-12889 > URL: https://issues.apache.org/jira/browse/KAFKA-12889 > Project: Kafka > Issue Type: Bug > Components: log cleaner >Affects Versions: 0.10.1.1, 2.8.0, 3.1.0 >Reporter: qiang Liu >Priority: Trivial > Fix For: 3.0.0 > > > To avoid 4-byte relative offset overflow in the log index, the log cleaner's > segment grouping checks segment offsets to make sure a group's offset range does > not exceed Int.MaxValue. > This offset check currently does not consider whether the next log segment is > empty, so an empty log file is left behind roughly every 2^31 messages. > The leftover empty logs are reprocessed in every clean cycle, which rewrites > them with the same empty content, causing a small amount of unnecessary I/O. > For the __consumer_offsets topic, normally we can set cleanup.policy to > compact,delete to get rid of this. > My cluster is 0.10.1.1, but after analyzing the trunk code, it should have the > same problem too. > > some of my left empty logs,(run ls -l) > -rw-r- 1 u g 0 Dec 16 2017 .index > -rw-r- 1 u g 0 Dec 16 2017 .log > -rw-r- 1 u g 0 Dec 16 2017 .timeindex > -rw-r- 1 u g 0 Jan 15 2018 002148249632.index > -rw-r- 1 u g 0 Jan 15 2018 002148249632.log > -rw-r- 1 u g 0 Jan 15 2018 002148249632.timeindex > -rw-r- 1 u g 0 Jan 27 2018 004295766494.index > -rw-r- 1 u g 0 Jan 27 2018 004295766494.log > -rw-r- 1 u g 0 Jan 27 2018 004295766494.timeindex > -- This message was sent by Atlassian Jira (v8.3.4#803005)
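The overflow constraint behind this bug can be modeled in a few lines. The following is an editorial simplification (the method and parameter names are assumptions, not the actual LogCleaner code): index entries store each record's offset relative to the segment's base offset in 4 bytes, so a cleaning group may not span more than `Integer.MAX_VALUE` offsets.

```java
// Simplified model of the log cleaner's grouping constraint. A trailing empty
// segment whose base offset falls just outside the range of the preceding
// group is never merged away, which is how a zero-byte file gets left behind
// roughly every 2^31 messages.
public class CleanerGroupSketch {

    // Can a segment whose last offset is candidateLastOffset join a group
    // whose first segment starts at groupBaseOffset?
    static boolean fitsInGroup(long groupBaseOffset, long candidateLastOffset) {
        return candidateLastOffset - groupBaseOffset <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        long base = 0L;
        // Within the 4-byte relative offset range: can be grouped.
        System.out.println(fitsInGroup(base, Integer.MAX_VALUE));      // true
        // One past the range: starts a new group, even if the segment is empty.
        System.out.println(fitsInGroup(base, Integer.MAX_VALUE + 1L)); // false
    }
}
```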
[GitHub] [kafka] guozhangwang merged pull request #10818: KAFKA-12889: log clean relative index range check of group consider empty log segment to avoid too many empty log segment left
guozhangwang merged pull request #10818: URL: https://github.com/apache/kafka/pull/10818 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #10818: KAFKA-12889: log clean relative index range check of group consider empty log segment to avoid too many empty log segment left
guozhangwang commented on pull request #10818: URL: https://github.com/apache/kafka/pull/10818#issuecomment-864471492 @iamgd67 Thanks for the PR! and also thank @showuon for the reviews. I made a quick pass and it LGTM. Merging to trunk now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12971) Kafka 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id
[ https://issues.apache.org/jira/browse/KAFKA-12971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366015#comment-17366015 ] Ismael Juma commented on KAFKA-12971: - Thanks for the report. 1.1.1 is no longer supported. Any reason why you cannot upgrade to 2.x? > Kafka 1.1.x clients cache broker hostnames, client stuck when host is > swapped for the same broker.id > - > > Key: KAFKA-12971 > URL: https://issues.apache.org/jira/browse/KAFKA-12971 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.1.0, 1.1.1, 1.1.2 >Reporter: GEORGE LI >Priority: Major > Fix For: 2.1.2 > > > There was an upgrade of the kafka-client version from 0.11 to 1.1.x to fix a bug > in 0.11 with too frequent consumer offset commits. Due to the Flink version, > it could not directly use the latest 2.x kafka-client version. > {code} > Error sending fetch request (sessionId=178328175, epoch=INITIAL) to node 425: > org.apache.kafka.common.errors.DisconnectException. > {code} > Some consumers were stuck with the above messages after broker.id 425 had hardware > failures and got swapped with a different hostname. 
> Comparing the {{ClusterConnectionStates.connecting()}} of the 3 versions:
> 0.11.0.3:
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now, this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> 1.1.x:
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     if (nodeState.containsKey(id)) {
>         NodeConnectionState connectionState = nodeState.get(id);
>         connectionState.lastConnectAttemptMs = now;
>         connectionState.state = ConnectionState.CONNECTING;
>         // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
>         connectionState.moveToNextAddress();
>     } else {
>         nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now, this.reconnectBackoffInitMs, host, clientDnsLookup));
>     }
> }
> {code}
> 2.2.x:
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     NodeConnectionState connectionState = nodeState.get(id);
>     if (connectionState != null && connectionState.host().equals(host)) {
>         connectionState.lastConnectAttemptMs = now;
>         connectionState.state = ConnectionState.CONNECTING;
>         // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
>         connectionState.moveToNextAddress();
>         return;
>     } else if (connectionState != null) {
>         log.info("Hostname for node {} changed from {} to {}.", id, connectionState.host(), host);
>     }
>     // Create a new NodeConnectionState if nodeState does not already contain one
>     // for the specified id or if the hostname associated with the node id changed.
>     nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now, this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> From above, {{0.11.0.3}} simply puts the node into the NodeState HashMap to retry with the updated host. 
> In {{1.1.x}}, caching logic was added: {{if (nodeState.containsKey(id))}}. However, if the HOSTNAME of the broker.id is swapped/changed, execution never reaches the else block to update the NodeState with the new hostname.
> In {{2.2.x}}, an additional check was added, {{if (connectionState != null && connectionState.host().equals(host))}}; if the hostname changed, {{nodeState.put()}} is called to update the host.
> So from above, it looks like the 1.1.x caching logic introduced a bug of not updating the nodeState's host when it is changed (e.g. host failure, swapped with a different hostname, but using the same broker.id). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12971) Kafka 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id
[ https://issues.apache.org/jira/browse/KAFKA-12971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366011#comment-17366011 ] GEORGE LI commented on KAFKA-12971: --- This issue is fixed in the {{1.1.x}} kafka client by backporting the fix in KAFKA-7890 from 2.x to invalidate the cache when the hostname is changed. > Kafka 1.1.x clients cache broker hostnames, client stuck when host is > swapped for the same broker.id > - > > Key: KAFKA-12971 > URL: https://issues.apache.org/jira/browse/KAFKA-12971 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.1.0, 1.1.1, 1.1.2 >Reporter: GEORGE LI >Priority: Major > Fix For: 2.1.2 > > > There was an upgrade of the kafka-client version from 0.11 to 1.1.x to fix a bug > in 0.11 with too frequent consumer offset commits. Due to the Flink version, > it could not directly use the latest 2.x kafka-client version. > {code} > Error sending fetch request (sessionId=178328175, epoch=INITIAL) to node 425: > org.apache.kafka.common.errors.DisconnectException. > {code} > Some consumers were stuck with the above messages after broker.id 425 had hardware > failures and got swapped with a different hostname. 
> Comparing the {{ClusterConnectionStates.connecting()}} of the 3 versions:
> 0.11.0.3:
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now, this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> 1.1.x:
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     if (nodeState.containsKey(id)) {
>         NodeConnectionState connectionState = nodeState.get(id);
>         connectionState.lastConnectAttemptMs = now;
>         connectionState.state = ConnectionState.CONNECTING;
>         // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
>         connectionState.moveToNextAddress();
>     } else {
>         nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now, this.reconnectBackoffInitMs, host, clientDnsLookup));
>     }
> }
> {code}
> 2.2.x:
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     NodeConnectionState connectionState = nodeState.get(id);
>     if (connectionState != null && connectionState.host().equals(host)) {
>         connectionState.lastConnectAttemptMs = now;
>         connectionState.state = ConnectionState.CONNECTING;
>         // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
>         connectionState.moveToNextAddress();
>         return;
>     } else if (connectionState != null) {
>         log.info("Hostname for node {} changed from {} to {}.", id, connectionState.host(), host);
>     }
>     // Create a new NodeConnectionState if nodeState does not already contain one
>     // for the specified id or if the hostname associated with the node id changed.
>     nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now, this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> From above, {{0.11.0.3}} simply puts the node into the NodeState HashMap to retry with the updated host. 
> In {{1.1.x}}, caching logic was added: {{if (nodeState.containsKey(id))}}. However, if the HOSTNAME of the broker.id is swapped/changed, execution never reaches the else block to update the NodeState with the new hostname.
> In {{2.2.x}}, an additional check was added, {{if (connectionState != null && connectionState.host().equals(host))}}; if the hostname changed, {{nodeState.put()}} is called to update the host.
> So from above, it looks like the 1.1.x caching logic introduced a bug of not updating the nodeState's host when it is changed (e.g. host failure, swapped with a different hostname, but using the same broker.id). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12971) Kafka 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id
GEORGE LI created KAFKA-12971: - Summary: Kafka 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id Key: KAFKA-12971 URL: https://issues.apache.org/jira/browse/KAFKA-12971 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 1.1.1, 1.1.0, 1.1.2 Reporter: GEORGE LI Fix For: 2.1.2
There was an upgrade of the kafka-client version from 0.11 to 1.1.x to fix a bug in 0.11 with too frequent consumer offset commits. Due to the Flink version, it could not directly use the latest 2.x kafka-client version.
{code}
Error sending fetch request (sessionId=178328175, epoch=INITIAL) to node 425: org.apache.kafka.common.errors.DisconnectException.
{code}
Some consumers were stuck with the above messages after broker.id 425 had hardware failures and got swapped with a different hostname.
Comparing the {{ClusterConnectionStates.connecting()}} of the 3 versions:
0.11.0.3:
{code}
public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
    nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now, this.reconnectBackoffInitMs, host, clientDnsLookup));
}
{code}
1.1.x:
{code}
public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
    if (nodeState.containsKey(id)) {
        NodeConnectionState connectionState = nodeState.get(id);
        connectionState.lastConnectAttemptMs = now;
        connectionState.state = ConnectionState.CONNECTING;
        // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
        connectionState.moveToNextAddress();
    } else {
        nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now, this.reconnectBackoffInitMs, host, clientDnsLookup));
    }
}
{code}
2.2.x:
{code}
public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
    NodeConnectionState connectionState = nodeState.get(id);
    if (connectionState != null && connectionState.host().equals(host)) {
        connectionState.lastConnectAttemptMs = now;
        connectionState.state = ConnectionState.CONNECTING;
        // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
        connectionState.moveToNextAddress();
        return;
    } else if (connectionState != null) {
        log.info("Hostname for node {} changed from {} to {}.", id, connectionState.host(), host);
    }
    // Create a new NodeConnectionState if nodeState does not already contain one
    // for the specified id or if the hostname associated with the node id changed.
    nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now, this.reconnectBackoffInitMs, host, clientDnsLookup));
}
{code}
From above, {{0.11.0.3}} simply puts the node into the NodeState HashMap to retry with the updated host.
In {{1.1.x}}, caching logic was added: {{if (nodeState.containsKey(id))}}. However, if the HOSTNAME of the broker.id is swapped/changed, execution never reaches the else block to update the NodeState with the new hostname.
In {{2.2.x}}, an additional check was added, {{if (connectionState != null && connectionState.host().equals(host))}}; if the hostname changed, {{nodeState.put()}} is called to update the host.
So from above, it looks like the 1.1.x caching logic introduced a bug of not updating the nodeState's host when it is changed (e.g. host failure, swapped with a different hostname, but using the same broker.id). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on pull request #10869: KAFKA-10546: Deprecate old PAPI
guozhangwang commented on pull request #10869: URL: https://github.com/apache/kafka/pull/10869#issuecomment-864424835 @vvcephei I took a quick look at the more recent commits (for `KStream.java` the part you wanted me to take another look is around the javadoc copy-paste errors, right?), still looks good to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 merged pull request #10905: MINOR: Addressed minor typos in READMEs.
chia7712 merged pull request #10905: URL: https://github.com/apache/kafka/pull/10905 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 merged pull request #10908: MINOR: fix round_trip_fault_test.py - don't assign replicas to nonexi…
chia7712 merged pull request #10908: URL: https://github.com/apache/kafka/pull/10908 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12908) Load snapshot heuristic
[ https://issues.apache.org/jira/browse/KAFKA-12908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dengziming reassigned KAFKA-12908: -- Assignee: dengziming > Load snapshot heuristic > --- > > Key: KAFKA-12908 > URL: https://issues.apache.org/jira/browse/KAFKA-12908 > Project: Kafka > Issue Type: Sub-task >Reporter: Jose Armando Garcia Sancio >Assignee: dengziming >Priority: Minor > > The {{KafkaRaftClient}} implementation only forces the {{RaftClient.Listener}} > to load a snapshot when the listener's next offset is less than the > start offset. > This is technically correct, but in some cases it may be more efficient to > load a snapshot even when the next offset exists in the log. This is clearly > true when the latest snapshot has fewer entries than the number of records > from the next offset to the latest snapshot. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 opened a new pull request #10908: MINOR: fix round_trip_fault_test.py - don't assign replicas to nonexi…
chia7712 opened a new pull request #10908: URL: https://github.com/apache/kafka/pull/10908 The broker id starts at 1 (https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/kafka/kafka.py#L207), so `round_trip_fault_test.py` does assign replicas to a nonexistent broker. The interesting part is that it fails on KRaft only: KRaft mode checks for existent ids (https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L950). By contrast, zk mode has no such check, and `min.insync.replicas` is set to `1`, so it can work in zk mode even though one replica is off-line.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12243) Add toString methods to some of the classes introduced by this Epic
[ https://issues.apache.org/jira/browse/KAFKA-12243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loboxu reassigned KAFKA-12243: -- Assignee: loboxu > Add toString methods to some of the classes introduced by this Epic > --- > > Key: KAFKA-12243 > URL: https://issues.apache.org/jira/browse/KAFKA-12243 > Project: Kafka > Issue Type: Sub-task >Reporter: Jose Armando Garcia Sancio >Assignee: loboxu >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12243) Add toString methods to some of the classes introduced by this Epic
[ https://issues.apache.org/jira/browse/KAFKA-12243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loboxu reassigned KAFKA-12243: -- Assignee: (was: loboxu) > Add toString methods to some of the classes introduced by this Epic > --- > > Key: KAFKA-12243 > URL: https://issues.apache.org/jira/browse/KAFKA-12243 > Project: Kafka > Issue Type: Sub-task >Reporter: Jose Armando Garcia Sancio >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12333) KafkaMetadataLog and MockLog should validate that appended epochs are monotonically increasing
[ https://issues.apache.org/jira/browse/KAFKA-12333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loboxu reassigned KAFKA-12333: -- Assignee: (was: loboxu) > KafkaMetadataLog and MockLog should validate that appended epochs are > monotonically increasing > > > Key: KAFKA-12333 > URL: https://issues.apache.org/jira/browse/KAFKA-12333 > Project: Kafka > Issue Type: Sub-task > Components: replication >Reporter: Jose Armando Garcia Sancio >Priority: Major > > Both the MockLog and KafkaMetadataLog should only allow appendAsLeader and > appendAsFollower with monotonically increasing epochs. In other words the > following test in KafkaMetadataLogTest should fail:
> {code:java}
> @Test
> def testOutOfOrderEpoch(): Unit = {
>   val topicPartition = new TopicPartition("cluster-metadata", 0)
>   val log = buildMetadataLog(tempDir, mockTime, topicPartition)
>
>   val recordFoo = new SimpleRecord("foo".getBytes())
>   val currentEpoch = 3
>   val initialOffset = log.endOffset().offset
>
>   log.appendAsLeader(
>     MemoryRecords.withRecords(initialOffset, CompressionType.NONE, currentEpoch, recordFoo),
>     currentEpoch
>   )
>
>   // Out of order epoch should throw an exception
>   log.appendAsLeader(
>     MemoryRecords.withRecords(
>       initialOffset + 1, CompressionType.NONE, currentEpoch - 1, recordFoo
>     ),
>     currentEpoch - 1
>   )
>
>   log.appendAsFollower(
>     MemoryRecords.withRecords(
>       initialOffset + 2, CompressionType.NONE, currentEpoch - 2, recordFoo
>     )
>   )
> } {code}
> The same for MockLogTest. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12851) Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable
[ https://issues.apache.org/jira/browse/KAFKA-12851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loboxu reassigned KAFKA-12851: -- Assignee: (was: loboxu) > Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable > --- > > Key: KAFKA-12851 > URL: https://issues.apache.org/jira/browse/KAFKA-12851 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: A. Sophie Blee-Goldman >Priority: Blocker > Fix For: 3.0.0 > > > Failed twice on a [PR > build|https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10755/6/testReport/] > h3. Stacktrace > org.opentest4j.AssertionFailedError: expected: but was: at > org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162) at > org.apache.kafka.raft.RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable(RaftEventSimulationTest.java:263) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (KAFKA-10900) Add metrics enumerated in KIP-630
[ https://issues.apache.org/jira/browse/KAFKA-10900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loboxu updated KAFKA-10900: --- Comment: was deleted (was: [~jagsancio] Can you give me some guidance? Where are the metrics added in the code? I have looked at the code and the logic is not very clear to me. Here is my understanding of the positions:
- GenSnapshotLatencyMs: ReplicatedCounter.handleCommit()
- LoadSnapshotLatencyMs: ReplicatedCounter.handleSnapshot()
- SnapshotSizeBytes: ReplicatedCounter.handleCommit()
- SnapshotLag: ReplicatedCounter.handleCommit())
> Add metrics enumerated in KIP-630 > - > > Key: KAFKA-10900 > URL: https://issues.apache.org/jira/browse/KAFKA-10900 > Project: Kafka > Issue Type: Sub-task > Components: replication >Reporter: Jose Armando Garcia Sancio >Assignee: loboxu >Priority: Major > > KIP-630 enumerates a few metrics. Make sure that those metrics are > implemented. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12689) Remove deprecated EOS configs
[ https://issues.apache.org/jira/browse/KAFKA-12689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365914#comment-17365914 ] loboxu commented on KAFKA-12689: Thank you very much for reminding me. I know what I should do. > Remove deprecated EOS configs > - > > Key: KAFKA-12689 > URL: https://issues.apache.org/jira/browse/KAFKA-12689 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: loboxu >Priority: Blocker > Fix For: 4.0.0 > > > In > [KIP-732|https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2] > we deprecated the EXACTLY_ONCE and EXACTLY_ONCE_BETA configs in > StreamsConfig, to be removed in 4.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vamossagar12 commented on pull request #10877: KAFKA-12925: adding prefixScan operation for missed implementations
vamossagar12 commented on pull request #10877: URL: https://github.com/apache/kafka/pull/10877#issuecomment-864382529 @mjsax , have added the tests for the combinations that you have mentioned. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #10907: KAFKA-10000: Exactly-once support for source connectors
C0urante commented on pull request #10907: URL: https://github.com/apache/kafka/pull/10907#issuecomment-864380630 @gharris1727 you've been tremendously helpful so far in the design process; if you have time, would you like to take a look at the implementation as well?
[GitHub] [kafka] C0urante opened a new pull request #10907: KAFKA-10000: Exactly-once support for source connectors
C0urante opened a new pull request #10907: URL: https://github.com/apache/kafka/pull/10907 Implements [KIP-618](https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors#KIP618:ExactlyOnceSupportforSourceConnectors-Newmetrics). There are several changes here that can be reviewed fairly independently of each other:
- Support for transactional source tasks, which is largely implemented in the `ExactlyOnceWorkerSourceTask` class, its newly-introduced `AbstractWorkerSourceTask` superclass, the `Worker` class (whose API for task starts has been split up from the existing `startTask` method into separate `startSinkTask`, `startSourceTask`, and `startExactlyOnceSourceTask` methods), and the `WorkerTransactionContext` class (which is used to allow connectors to define their own transaction boundaries)
- Zombie fencing logic and the use of a transactional producer for some writes to the config topic, which are done by the leader of the cluster and are largely implemented in the `DistributedHerder`, `ConfigBackingStore`, `KafkaConfigBackingStore`, and `ClusterConfigState` classes
- A new method in the `Admin` API for fencing out transactional producers by ID, which is done with changes to the `Admin` interface (unsurprisingly) and the `KafkaAdminClient` class
- Support for per-connector offsets topics, which touches on the `Worker`, `OffsetStorageReaderImpl`, and `OffsetStorageWriter` classes
- A few new `SourceConnector` methods for communicating support for exactly-once guarantees and connector-defined transactions; these take place in the `SourceConnector` class (also unsurprisingly) and the `AbstractHerder` class

Existing unit tests are expanded where applicable, and new ones have been introduced where necessary.
Eight new integration tests are added, covering scenarios including preflight validation checks, all three types of transaction boundary, graceful recovery of the leader when fenced out from the config topic by another worker, ensuring that the correct number of task producers are fenced out across generations, accurate reporting of failure to bring up tasks when fencing does not succeed (this includes an ACL-secured embedded Kafka cluster to simulate one of the most likely causes of the issue: insufficient permissions on the targeted Kafka cluster), and the use of a custom offsets topic. Many but not all existing system tests are modified to add cases involving exactly-once source support, which gives us reasonable confidence that the feature is agnostic with regard to rebalance protocol. A new test is added that is based on the existing bounce test, but with no sink connector and with stricter expectations for delivery guarantees (no duplicates are permitted).
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
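The zombie-fencing change described in this pull request rests on the idea that each transactional id has a current epoch, a fencing round bumps it, and any producer instance still holding the old epoch is rejected on its next write. The sketch below is a toy in-memory model of that idea only; `FencingRegistry` and its methods are hypothetical, not Connect's or the broker's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of epoch-based zombie fencing; names are illustrative.
class FencingRegistry {
    private final Map<String, Integer> currentEpoch = new HashMap<>();

    // A new producer instance registers and receives the latest epoch,
    // implicitly fencing any older instance with the same transactional id.
    int register(String transactionalId) {
        return currentEpoch.merge(transactionalId, 0, (old, ignored) -> old + 1);
    }

    // A write is accepted only if the caller still holds the current epoch;
    // a fenced ("zombie") instance with a stale epoch is rejected.
    boolean tryWrite(String transactionalId, int epoch) {
        return currentEpoch.getOrDefault(transactionalId, -1) == epoch;
    }
}
```

In the real feature, the epoch bump is performed broker-side when the leader fences task producers before starting a new generation of tasks.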
[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed
showuon commented on pull request #10794: URL: https://github.com/apache/kafka/pull/10794#issuecomment-864374754 @hachikuji, I checked and there's no race condition there. The reason we didn't put `NOT_CONTROLLER` in the `EnvelopeResponse` itself is that we build the envelope response **with no error** as long as we get a response from the handler (the createTopics handler in this case) [here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L124). So, in `ControllerApis#handleCreateTopics`, we send the response when the future completes; then in `RequestHandlerHelper#sendResponseMaybeThrottle` we call `buildResponseSend` and reach the location linked above. To fix this issue, I need to build the `EnvelopeResponse` with `NOT_CONTROLLER` when the response from the handler contains that error (fix 2). But without fix 1, we can only get `UNKNOWN_SERVER_ERROR` in the response from the handler, because the `NotControllerException` is wrapped in a `CompletionException`. That's my solution. Please help review. Thank you very much!
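The wrapping behavior this comment describes can be reproduced with plain `java.util.concurrent`: an exception thrown inside a `CompletableFuture` stage surfaces to callers as a `CompletionException`, so code that matches on the concrete error type must unwrap `getCause()` first. The `NotControllerException` below is a local stand-in, not Kafka's class, and `classify` is only a sketch of the unwrapping step, not the actual fix.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Local stand-in for Kafka's NotControllerException.
class NotControllerException extends RuntimeException {
    NotControllerException(String msg) { super(msg); }
}

class EnvelopeErrorDemo {
    // Unwrap CompletionException to reach the real cause before classifying;
    // without this step the concrete type is invisible and everything falls
    // through to the generic error bucket.
    static String classify(Throwable t) {
        Throwable cause = (t instanceof CompletionException && t.getCause() != null)
                ? t.getCause() : t;
        return (cause instanceof NotControllerException)
                ? "NOT_CONTROLLER" : "UNKNOWN_SERVER_ERROR";
    }

    // Throw inside a future's stage and return whatever join() surfaces.
    static Throwable failInStage() {
        CompletableFuture<Void> f = CompletableFuture.completedFuture((Void) null)
                .thenRun(() -> { throw new NotControllerException("controller moved"); });
        try {
            f.join();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }
}
```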
[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed
showuon commented on a change in pull request #10794: URL: https://github.com/apache/kafka/pull/10794#discussion_r654763984 ## File path: core/src/main/scala/kafka/network/RequestChannel.scala ## @@ -124,8 +124,15 @@ object RequestChannel extends Logging {
   def buildResponseSend(abstractResponse: AbstractResponse): Send = {
     envelope match {
       case Some(request) =>
-        val responseBytes = context.buildResponseEnvelopePayload(abstractResponse)
-        val envelopeResponse = new EnvelopeResponse(responseBytes, Errors.NONE)
+        val envelopeResponse = if (abstractResponse.errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+          // Since it's a NOT_CONTROLLER error response, we need to make the envelope response with the
+          // NOT_CONTROLLER error to notify the requester (i.e. BrokerToControllerRequestThread) to update
+          // the active controller
+          new EnvelopeResponse(new EnvelopeResponseData()
+            .setErrorCode(Errors.NOT_CONTROLLER.code()))
Review comment: fix 2: Make the envelope response return `NOT_CONTROLLER` if the controller response has a `NotControllerException`, so that we can catch the `NotControllerException` on the envelope response and update the active controller.
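A minimal model of what fix 2 does: inspect the inner response's error counts and propagate NOT_CONTROLLER to the envelope instead of always reporting NONE. The `Errors` enum and `Envelope` class below are simplified local stand-ins for Kafka's `EnvelopeResponse` machinery, shown only to make the control flow of the patch explicit.

```java
import java.util.Map;

// Simplified stand-in for Kafka's Errors enum.
enum Errors { NONE, NOT_CONTROLLER, UNKNOWN_SERVER_ERROR }

class Envelope {
    final Errors error;

    Envelope(Errors error) { this.error = error; }

    // Mirror of the patched buildResponseSend branch: surface NOT_CONTROLLER
    // on the envelope so the forwarding broker knows to refresh its view of
    // the active controller, instead of unconditionally reporting NONE.
    static Envelope build(Map<Errors, Integer> innerErrorCounts) {
        return innerErrorCounts.containsKey(Errors.NOT_CONTROLLER)
                ? new Envelope(Errors.NOT_CONTROLLER)
                : new Envelope(Errors.NONE);
    }
}
```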