[jira] [Assigned] (KAFKA-12158) Consider better return type of RaftClient.scheduleAppend

2021-06-19 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming reassigned KAFKA-12158:
--

Assignee: dengziming

> Consider better return type of RaftClient.scheduleAppend
> 
>
> Key: KAFKA-12158
> URL: https://issues.apache.org/jira/browse/KAFKA-12158
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: dengziming
>Priority: Major
>
> Currently `RaftClient` has the following Append API:
> {code}
> Long scheduleAppend(int epoch, List<T> records);
> {code}
> There are a few possible cases that the single return value is trying to 
> handle:
> 1. The epoch doesn't match or we are not the current leader => return 
> Long.MaxValue
> 2. We failed to allocate memory to write the batch (backpressure case) => 
> return null
> 3. We successfully scheduled the append => return the expected offset
> It might be better to define a richer type so that the cases that must be 
> handled are clearer. At a minimum, it would be better to return 
> `OptionalLong` and get rid of the null case.
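
For illustration, a minimal sketch of the `OptionalLong` variant (the interface below is a simplified stand-in, not the actual `RaftClient`):

{code:java}
import java.util.List;
import java.util.OptionalLong;

interface SimplifiedRaftClient<T> {
    // Hypothetical shape: an empty OptionalLong replaces the null
    // backpressure signal, while Long.MAX_VALUE still marks the
    // "wrong epoch / not the current leader" case.
    OptionalLong scheduleAppend(int epoch, List<T> records);

    default void handleAppend(int epoch, List<T> records) {
        OptionalLong offset = scheduleAppend(epoch, records);
        if (!offset.isPresent()) {
            // Backpressure: no memory available for the batch; retry later.
        } else if (offset.getAsLong() == Long.MAX_VALUE) {
            // Epoch mismatch, or this node is not the current leader.
        } else {
            // Append scheduled; offset.getAsLong() is the expected offset.
        }
    }
}
{code}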



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12908) Load snapshot heuristic

2021-06-19 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366100#comment-17366100
 ] 

dengziming edited comment on KAFKA-12908 at 6/20/21, 4:30 AM:
--

This may depend on KAFKA-12155


was (Author: dengziming):
This may be depend on KAFKA-12155

> Load snapshot heuristic
> ---
>
> Key: KAFKA-12908
> URL: https://issues.apache.org/jira/browse/KAFKA-12908
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: dengziming
>Priority: Minor
>
> The {{KafkaRaftClient}} implementation forces the {{RaftClient.Listener}} 
> to load a snapshot only when the listener's next offset is less than the 
> start offset.
> This is technically correct, but in some cases it may be more efficient to 
> load a snapshot even when the next offset exists in the log. This is clearly 
> true when the latest snapshot has fewer entries than the number of records 
> from the next offset to the latest snapshot.
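
A hedged sketch of that comparison (method and parameter names are illustrative, not the actual KafkaRaftClient API):

{code:java}
final class SnapshotHeuristic {
    // Illustrative only: load the snapshot when the listener is behind the
    // log start offset (required for correctness), or when replaying the
    // snapshot would touch fewer entries than replaying the log from the
    // listener's next offset up to the snapshot's end offset.
    static boolean shouldLoadSnapshot(long nextOffset,
                                      long logStartOffset,
                                      long snapshotEndOffset,
                                      long snapshotEntryCount) {
        if (nextOffset < logStartOffset) {
            return true; // the log no longer contains nextOffset
        }
        long recordsToReplay = snapshotEndOffset - nextOffset;
        return snapshotEntryCount < recordsToReplay;
    }
}
{code}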



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12908) Load snapshot heuristic

2021-06-19 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366100#comment-17366100
 ] 

dengziming commented on KAFKA-12908:


This may depend on KAFKA-12155

> Load snapshot heuristic
> ---
>
> Key: KAFKA-12908
> URL: https://issues.apache.org/jira/browse/KAFKA-12908
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: dengziming
>Priority: Minor
>
> The {{KafkaRaftClient}} implementation forces the {{RaftClient.Listener}} 
> to load a snapshot only when the listener's next offset is less than the 
> start offset.
> This is technically correct, but in some cases it may be more efficient to 
> load a snapshot even when the next offset exists in the log. This is clearly 
> true when the latest snapshot has fewer entries than the number of records 
> from the next offset to the latest snapshot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kpatelatwork commented on pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)

2021-06-19 Thread GitBox


kpatelatwork commented on pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#issuecomment-864493282


   Fired up a local Kafka server and Kafka Connect, and was able to curl the API 
locally.
   
   ➜  kafka git:(KAFKA-4793) ✗ curl http://192.168.1.220:8083/connectors/local-file-source/status
   {"name":"local-file-source","connector":{"state":"RUNNING","worker_id":"192.168.1.220:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"192.168.1.220:8083"}],"type":"source"}%
   ➜  kafka git:(KAFKA-4793) ✗ curl -XPOST http://192.168.1.220:8083/connectors/local-file-source/restart\?includeTasks\=true\&onlyFailed\=false
   {"name":"local-file-source","connector":{"state":"RESTARTING","worker_id":"192.168.1.220:8083"},"tasks":[{"id":0,"state":"RESTARTING","worker_id":"192.168.1.220:8083"}],"type":"source"}%
   ➜  kafka git:(KAFKA-4793) ✗ curl -XPOST http://192.168.1.220:8083/connectors/local-file-source/restart\?includeTasks\=false\&onlyFailed\=false
   ➜  kafka git:(KAFKA-4793) ✗ curl -XPOST http://192.168.1.220:8083/connectors/local-file-source/restart\?includeTasks\=false\&onlyFailed\=true
   {"name":"local-file-source","connector":{"state":"RUNNING","worker_id":"192.168.1.220:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"192.168.1.220:8083"}],"type":"source"}%
   ➜  kafka git:(KAFKA-4793) ✗ curl -XPOST http://192.168.1.220:8083/connectors/local-file-source/restart\?includeTasks\=true\&onlyFailed\=true
   {"name":"local-file-source","connector":{"state":"RUNNING","worker_id":"192.168.1.220:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"192.168.1.220:8083"}],"type":"source"}%
   ➜  kafka git:(KAFKA-4793) ✗ curl -XPOST http://192.168.1.220:8083/connectors/local-file-source/restart\?includeTasks\=true
   {"name":"local-file-source","connector":{"state":"RESTARTING","worker_id":"192.168.1.220:8083"},"tasks":[{"id":0,"state":"RESTARTING","worker_id":"192.168.1.220:8083"}],"type":"source"}%


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: return not_controller error in envelope response itself

2021-06-19 Thread GitBox


showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r654866143



##
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
##
@@ -36,10 +37,16 @@
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
+        Throwable throwableToBeEncode = t;
+        // Future will wrap the original exception with CompletionException, which will return an unexpected UNKNOWN_SERVER_ERROR.
+        if (t instanceof CompletionException) {
+            throwableToBeEncode = t.getCause();

Review comment:
   fix 1: unwrap the `CompletionException` to get the original exception 
inside. Even if we don't want the `NotControllerException` returned back to the 
client, we need to know about it to do some checks.

##
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
##
@@ -36,10 +37,16 @@
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
+        Throwable throwableToBeEncode = t;
+        // Future will wrap the original exception with CompletionException, which will return an unexpected UNKNOWN_SERVER_ERROR.
+        if (t instanceof CompletionException) {
+            throwableToBeEncode = t.getCause();

Review comment:
   fix 1: unwrap the CompletionException to get the original exception 
inside.
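
   A self-contained sketch of the unwrapping idea (the class name is hypothetical; only the quoted diff above reflects the actual PR):

   import java.util.concurrent.CompletionException;

   final class ErrorUnwrapExample {
       // CompletableFuture wraps exceptions thrown in async stages in a
       // CompletionException; encoding the cause preserves the original
       // error code (e.g. NOT_CONTROLLER) instead of falling back to
       // UNKNOWN_SERVER_ERROR.
       static Throwable unwrap(Throwable t) {
           if (t instanceof CompletionException && t.getCause() != null) {
               return t.getCause();
           }
           return t;
       }
   }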




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12889) log cleaner grouping should consider empty log segments to avoid leaving empty logs

2021-06-19 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366024#comment-17366024
 ] 

Guozhang Wang commented on KAFKA-12889:
---

Thanks [~iamgd67] for reporting the issue and for the fix too!

> log cleaner grouping should consider empty log segments to avoid leaving empty logs
> --
>
> Key: KAFKA-12889
> URL: https://issues.apache.org/jira/browse/KAFKA-12889
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 0.10.1.1, 2.8.0, 3.1.0
>Reporter: qiang Liu
>Priority: Trivial
> Fix For: 3.0.0
>
>
> To avoid overflowing the log index's 4-byte relative offsets, the log cleaner's 
> segment grouping checks segment offsets to make sure a group's offset range does 
> not exceed Int.MaxValue.
> This offset check currently does not consider whether the next log segment is 
> empty, so an empty log file is left behind roughly every 2^31 messages.
> The leftover empty logs are reprocessed every clean cycle, which rewrites them 
> with the same empty content, causing a small amount of unnecessary IO.
> For the __consumer_offsets topic, we can normally set cleanup.policy to 
> compact,delete to get rid of this.
> My cluster is 0.10.1.1, but after analyzing the trunk code, it should have the 
> same problem too.
>  
> Some of my leftover empty logs (from ls -l):
> -rw-r- 1 u g 0 Dec 16 2017 .index
> -rw-r- 1 u g 0 Dec 16 2017 .log
> -rw-r- 1 u g 0 Dec 16 2017 .timeindex
> -rw-r- 1 u g 0 Jan  15 2018 002148249632.index
> -rw-r- 1 u g 0 Jan  15 2018 002148249632.log
> -rw-r- 1 u g 0 Jan  15 2018 002148249632.timeindex
> -rw-r- 1 u g 0 Jan  27 2018 004295766494.index
> -rw-r- 1 u g 0 Jan  27 2018 004295766494.log
> -rw-r- 1 u g 0 Jan  27 2018 004295766494.timeindex
>  
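
A hedged sketch of the adjusted grouping rule (the real logic is Scala in the LogCleaner; the names here are illustrative):

{code:java}
final class GroupingRule {
    // Relative offsets within a cleaned group are stored in 4-byte index
    // entries, so the offset span must stay within Int.MaxValue. An empty
    // segment contributes no index entries, so letting it join the group
    // regardless of its offsets is what prevents zero-byte segments from
    // being left behind and rewritten on every clean cycle.
    static boolean canJoinGroup(long groupBaseOffset,
                                long segmentLastOffset,
                                boolean segmentIsEmpty) {
        if (segmentIsEmpty) {
            return true;
        }
        return segmentLastOffset - groupBaseOffset <= Integer.MAX_VALUE;
    }
}
{code}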



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang merged pull request #10818: KAFKA-12889: log cleaner's relative index range check should consider empty log segments to avoid leaving too many empty segments

2021-06-19 Thread GitBox


guozhangwang merged pull request #10818:
URL: https://github.com/apache/kafka/pull/10818


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #10818: KAFKA-12889: log cleaner's relative index range check should consider empty log segments to avoid leaving too many empty segments

2021-06-19 Thread GitBox


guozhangwang commented on pull request #10818:
URL: https://github.com/apache/kafka/pull/10818#issuecomment-864471492


   @iamgd67 Thanks for the PR! And thanks to @showuon for the reviews. I made 
a quick pass and it LGTM. Merging to trunk now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12971) Kafka 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id

2021-06-19 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366015#comment-17366015
 ] 

Ismael Juma commented on KAFKA-12971:
-

Thanks for the report. 1.1.1 is no longer supported. Any reason why you cannot 
upgrade to 2.x?

> Kafka 1.1.x clients cache broker hostnames,  client stuck when host is 
> swapped for the same broker.id
> -
>
> Key: KAFKA-12971
> URL: https://issues.apache.org/jira/browse/KAFKA-12971
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0, 1.1.1, 1.1.2
>Reporter: GEORGE LI
>Priority: Major
> Fix For: 2.1.2
>
>
> There was an upgrade of the kafka-client version from 0.11 to 1.1.x to fix a bug 
> in 0.11 with too frequent consumer offset commits. Due to the Flink version, it 
> cannot directly use the latest 2.x kafka-client version. 
> {code}
> Error sending fetch request (sessionId=178328175, epoch=INITIAL) to node 425: 
> org.apache.kafka.common.errors.DisconnectException.
> {code}
> Some consumers were stuck with the above messages after the host for broker.id 
> 425 had a hardware failure and was swapped with a different hostname. 
> Comparing the {{ClusterConnectionStates.connecting()}} of the 3 versions: 
> 0.11.0.3: 
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
>         this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> 1.1.x: 
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     if (nodeState.containsKey(id)) {
>         NodeConnectionState connectionState = nodeState.get(id);
>         connectionState.lastConnectAttemptMs = now;
>         connectionState.state = ConnectionState.CONNECTING;
>         // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
>         connectionState.moveToNextAddress();
>     } else {
>         nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
>             this.reconnectBackoffInitMs, host, clientDnsLookup));
>     }
> }
> {code}
> 2.2.x: 
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     NodeConnectionState connectionState = nodeState.get(id);
>     if (connectionState != null && connectionState.host().equals(host)) {
>         connectionState.lastConnectAttemptMs = now;
>         connectionState.state = ConnectionState.CONNECTING;
>         // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
>         connectionState.moveToNextAddress();
>         return;
>     } else if (connectionState != null) {
>         log.info("Hostname for node {} changed from {} to {}.", id, connectionState.host(), host);
>     }
> 
>     // Create a new NodeConnectionState if nodeState does not already contain one
>     // for the specified id or if the hostname associated with the node id changed.
>     nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
>         this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> From the above, {{0.11.0.3}} simply puts the node into the nodeState HashMap, so 
> a retry picks up the updated host.
> In {{1.1.x}}, caching logic was added: {{if (nodeState.containsKey(id))}}. 
> However, if the HOSTNAME of the broker.id is swapped/changed, it never reaches 
> the else block to update the NodeConnectionState with the new hostname.
> In {{2.2.x}}, an additional check was added, {{if (connectionState != null && 
> connectionState.host().equals(host))}}; if the hostname changed, 
> {{nodeState.put()}} is called to update the host.
> So it looks like the 1.1.x caching logic introduced a bug of not updating the 
> nodeState entry's host when it changes (e.g. host failure, swap to a different 
> hostname, but reusing the same broker.id).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12971) Kafka 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id

2021-06-19 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366011#comment-17366011
 ] 

GEORGE LI commented on KAFKA-12971:
---

This issue is fixed in the {{1.1.x}} kafka client by backporting the fix in 
KAFKA-7890 from 2.x to invalidate the cache when the hostname is changed. 

> Kafka 1.1.x clients cache broker hostnames,  client stuck when host is 
> swapped for the same broker.id
> -
>
> Key: KAFKA-12971
> URL: https://issues.apache.org/jira/browse/KAFKA-12971
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0, 1.1.1, 1.1.2
>Reporter: GEORGE LI
>Priority: Major
> Fix For: 2.1.2
>
>
> There was an upgrade of the kafka-client version from 0.11 to 1.1.x to fix a bug 
> in 0.11 with too frequent consumer offset commits. Due to the Flink version, it 
> cannot directly use the latest 2.x kafka-client version. 
> {code}
> Error sending fetch request (sessionId=178328175, epoch=INITIAL) to node 425: 
> org.apache.kafka.common.errors.DisconnectException.
> {code}
> Some consumers were stuck with the above messages after the host for broker.id 
> 425 had a hardware failure and was swapped with a different hostname. 
> Comparing the {{ClusterConnectionStates.connecting()}} of the 3 versions: 
> 0.11.0.3: 
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
>         this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> 1.1.x: 
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     if (nodeState.containsKey(id)) {
>         NodeConnectionState connectionState = nodeState.get(id);
>         connectionState.lastConnectAttemptMs = now;
>         connectionState.state = ConnectionState.CONNECTING;
>         // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
>         connectionState.moveToNextAddress();
>     } else {
>         nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
>             this.reconnectBackoffInitMs, host, clientDnsLookup));
>     }
> }
> {code}
> 2.2.x: 
> {code}
> public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
>     NodeConnectionState connectionState = nodeState.get(id);
>     if (connectionState != null && connectionState.host().equals(host)) {
>         connectionState.lastConnectAttemptMs = now;
>         connectionState.state = ConnectionState.CONNECTING;
>         // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
>         connectionState.moveToNextAddress();
>         return;
>     } else if (connectionState != null) {
>         log.info("Hostname for node {} changed from {} to {}.", id, connectionState.host(), host);
>     }
> 
>     // Create a new NodeConnectionState if nodeState does not already contain one
>     // for the specified id or if the hostname associated with the node id changed.
>     nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
>         this.reconnectBackoffInitMs, host, clientDnsLookup));
> }
> {code}
> From the above, {{0.11.0.3}} simply puts the node into the nodeState HashMap, so 
> a retry picks up the updated host.
> In {{1.1.x}}, caching logic was added: {{if (nodeState.containsKey(id))}}. 
> However, if the HOSTNAME of the broker.id is swapped/changed, it never reaches 
> the else block to update the NodeConnectionState with the new hostname.
> In {{2.2.x}}, an additional check was added, {{if (connectionState != null && 
> connectionState.host().equals(host))}}; if the hostname changed, 
> {{nodeState.put()}} is called to update the host.
> So it looks like the 1.1.x caching logic introduced a bug of not updating the 
> nodeState entry's host when it changes (e.g. host failure, swap to a different 
> hostname, but reusing the same broker.id).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12971) Kafka 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id

2021-06-19 Thread GEORGE LI (Jira)
GEORGE LI created KAFKA-12971:
-

 Summary: Kafka 1.1.x clients cache broker hostnames, client stuck 
when host is swapped for the same broker.id
 Key: KAFKA-12971
 URL: https://issues.apache.org/jira/browse/KAFKA-12971
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 1.1.1, 1.1.0, 1.1.2
Reporter: GEORGE LI
 Fix For: 2.1.2


There was an upgrade of the kafka-client version from 0.11 to 1.1.x to fix a bug in 
0.11 with too frequent consumer offset commits. Due to the Flink version, it 
cannot directly use the latest 2.x kafka-client version. 

{code}
Error sending fetch request (sessionId=178328175, epoch=INITIAL) to node 425: 
org.apache.kafka.common.errors.DisconnectException.
{code}

Some consumers were stuck with the above messages after the host for broker.id 425 
had a hardware failure and was swapped with a different hostname. 

Comparing the {{ClusterConnectionStates.connecting()}} of the 3 versions: 

0.11.0.3: 

{code}
public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
    nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
        this.reconnectBackoffInitMs, host, clientDnsLookup));
}
{code}


1.1.x: 

{code}
public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
    if (nodeState.containsKey(id)) {
        NodeConnectionState connectionState = nodeState.get(id);
        connectionState.lastConnectAttemptMs = now;
        connectionState.state = ConnectionState.CONNECTING;
        // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
        connectionState.moveToNextAddress();
    } else {
        nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
            this.reconnectBackoffInitMs, host, clientDnsLookup));
    }
}
{code}


2.2.x: 

{code}
public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
    NodeConnectionState connectionState = nodeState.get(id);
    if (connectionState != null && connectionState.host().equals(host)) {
        connectionState.lastConnectAttemptMs = now;
        connectionState.state = ConnectionState.CONNECTING;
        // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
        connectionState.moveToNextAddress();
        return;
    } else if (connectionState != null) {
        log.info("Hostname for node {} changed from {} to {}.", id, connectionState.host(), host);
    }

    // Create a new NodeConnectionState if nodeState does not already contain one
    // for the specified id or if the hostname associated with the node id changed.
    nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
        this.reconnectBackoffInitMs, host, clientDnsLookup));
}
{code}


From the above, {{0.11.0.3}} simply puts the node into the nodeState HashMap, so a 
retry picks up the updated host.

In {{1.1.x}}, caching logic was added: {{if (nodeState.containsKey(id))}}. However, 
if the HOSTNAME of the broker.id is swapped/changed, it never reaches the else 
block to update the NodeConnectionState with the new hostname.

In {{2.2.x}}, an additional check was added, {{if (connectionState != null && 
connectionState.host().equals(host))}}; if the hostname changed, 
{{nodeState.put()}} is called to update the host.

So it looks like the 1.1.x caching logic introduced a bug of not updating the 
nodeState entry's host when it changes (e.g. host failure, swap to a different 
hostname, but reusing the same broker.id).
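
To make the failure mode concrete, a minimal stand-alone model of the 1.1.x behavior (a hypothetical demo class, not the real client code):

{code:java}
import java.util.HashMap;
import java.util.Map;

final class HostCacheDemo {
    static final Map<String, String> nodeHost = new HashMap<>();

    // 1.1.x behavior, reduced to its essence: an existing entry is reused
    // unconditionally, so a changed hostname for the same broker.id is
    // silently ignored.
    static void connecting(String id, String host) {
        if (!nodeHost.containsKey(id)) {
            nodeHost.put(id, host);
        }
    }

    public static void main(String[] args) {
        connecting("425", "old-host");
        connecting("425", "new-host"); // host swapped after hardware failure
        System.out.println(nodeHost.get("425")); // still prints "old-host"
    }
}
{code}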





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #10869: KAFKA-10546: Deprecate old PAPI

2021-06-19 Thread GitBox


guozhangwang commented on pull request #10869:
URL: https://github.com/apache/kafka/pull/10869#issuecomment-864424835


   @vvcephei I took a quick look at the more recent commits (for `KStream.java`, 
the part you wanted me to take another look at is the javadoc copy-paste 
errors, right?), and it still looks good to me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #10905: MINOR Addressed minor typos in READMEs.

2021-06-19 Thread GitBox


chia7712 merged pull request #10905:
URL: https://github.com/apache/kafka/pull/10905


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #10908: MINOR: fix round_trip_fault_test.py - don't assign replicas to nonexi…

2021-06-19 Thread GitBox


chia7712 merged pull request #10908:
URL: https://github.com/apache/kafka/pull/10908


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12908) Load snapshot heuristic

2021-06-19 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming reassigned KAFKA-12908:
--

Assignee: dengziming

> Load snapshot heuristic
> ---
>
> Key: KAFKA-12908
> URL: https://issues.apache.org/jira/browse/KAFKA-12908
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: dengziming
>Priority: Minor
>
> The {{KafkaRaftCient}} implementation only forces the {{RaftClient.Listener}} 
> to load a snapshot only when the listener's next offset is less than the 
> start offset.
> This is technically correct but in some cases it may be more efficient to 
> load a snapshot even when the next offset exists in the log. This is clearly 
> true when the latest snapshot has less entries than the number of records 
> from the next offset to the latest snapshot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 opened a new pull request #10908: MINOR: fix round_trip_fault_test.py - don't assign replicas to nonexi…

2021-06-19 Thread GitBox


chia7712 opened a new pull request #10908:
URL: https://github.com/apache/kafka/pull/10908


   The broker id starts at 1 
(https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/kafka/kafka.py#L207),
 so `round_trip_fault_test.py` was assigning a replica to a nonexistent broker.
   
   The interesting story is that it fails on KRaft only. KRaft mode checks that 
the assigned broker ids exist 
(https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L950).
 By contrast, ZK mode has no such check, and `min.insync.replicas` is set to 
`1`, so the test can pass in ZK mode even though one replica is offline.
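   
   The KRaft-side guard amounts to something like the following (a hedged Java sketch; the actual check lives in ReplicationControlManager and differs in detail):
   
   import java.util.List;
   import java.util.Set;

   final class ReplicaAssignmentCheck {
       // Reject manual assignments that reference broker ids the controller
       // has never registered; ZK mode historically skipped this validation.
       static void validate(List<Integer> replicas, Set<Integer> registeredBrokerIds) {
           for (int replica : replicas) {
               if (!registeredBrokerIds.contains(replica)) {
                   throw new IllegalArgumentException(
                       "Replica assignment references unknown broker id " + replica);
               }
           }
       }
   }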
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12243) Add toString methods to some of the classes introduced by this Epic

2021-06-19 Thread loboxu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loboxu reassigned KAFKA-12243:
--

Assignee: loboxu

> Add toString methods to some of the classes introduced by this Epic
> ---
>
> Key: KAFKA-12243
> URL: https://issues.apache.org/jira/browse/KAFKA-12243
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: loboxu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12243) Add toString methods to some of the classes introduced by this Epic

2021-06-19 Thread loboxu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loboxu reassigned KAFKA-12243:
--

Assignee: (was: loboxu)

> Add toString methods to some of the classes introduced by this Epic
> ---
>
> Key: KAFKA-12243
> URL: https://issues.apache.org/jira/browse/KAFKA-12243
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12333) KafkaMetadataLog and MockLog should validate that appended epochs are monotonically increasing

2021-06-19 Thread loboxu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loboxu reassigned KAFKA-12333:
--

Assignee: (was: loboxu)

> KafkaMetadataLog and MockLog should validate that appended epochs are 
> monotonically increasing
> 
>
> Key: KAFKA-12333
> URL: https://issues.apache.org/jira/browse/KAFKA-12333
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Priority: Major
>
> Both the MockLog and KafkaMetadataLog should only allow appendAsLeader and 
> appendAsFollower with monotonically increasing epochs. In other words, the 
> following test in KafkaMetadataLogTest should fail:
> {code:java}
>   @Test
>   def testOutOfOrderEpoch(): Unit = {
>     val topicPartition = new TopicPartition("cluster-metadata", 0)
>     val log = buildMetadataLog(tempDir, mockTime, topicPartition)
>     val recordFoo = new SimpleRecord("foo".getBytes())
>     val currentEpoch = 3
>     val initialOffset = log.endOffset().offset
> 
>     log.appendAsLeader(
>       MemoryRecords.withRecords(initialOffset, CompressionType.NONE, currentEpoch, recordFoo),
>       currentEpoch
>     )
> 
>     // Out of order epoch should throw an exception
>     log.appendAsLeader(
>       MemoryRecords.withRecords(
>         initialOffset + 1, CompressionType.NONE, currentEpoch - 1, recordFoo
>       ),
>       currentEpoch - 1
>     )
> 
>     log.appendAsFollower(
>       MemoryRecords.withRecords(
>         initialOffset + 2, CompressionType.NONE, currentEpoch - 2, recordFoo
>       )
>     )
>   }
> {code}
> The same for MockLogTest.
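
A hedged sketch of the guard such validation would add (illustrative names, not the actual MockLog/KafkaMetadataLog internals):

{code:java}
final class EpochValidation {
    // Appends must carry an epoch at least as large as the last appended
    // epoch; a smaller epoch is an out-of-order append and should fail.
    static void validateMonotonicEpoch(int lastEpoch, int appendEpoch) {
        if (appendEpoch < lastEpoch) {
            throw new IllegalArgumentException(
                "Append epoch " + appendEpoch + " is older than last epoch " + lastEpoch);
        }
    }
}
{code}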



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12851) Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable

2021-06-19 Thread loboxu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loboxu reassigned KAFKA-12851:
--

Assignee: (was: loboxu)

> Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable
> ---
>
> Key: KAFKA-12851
> URL: https://issues.apache.org/jira/browse/KAFKA-12851
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0
>
>
> Failed twice on a [PR 
> build|https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10755/6/testReport/]
> h3. Stacktrace
> org.opentest4j.AssertionFailedError: expected: <true> but was: <false> at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35) at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162) at 
> org.apache.kafka.raft.RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable(RaftEventSimulationTest.java:263)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-10900) Add metrics enumerated in KIP-630

2021-06-19 Thread loboxu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loboxu updated KAFKA-10900:
---
Comment: was deleted

(was: [~jagsancio] Can you give me some guidance on where the metrics should be 
added in the code? I have looked at the code and the logic is not very clear.

Here is my understanding of the positions:
 * GenSnapshotLatencyMs: ReplicatedCounter.handleCommit()
 * LoadSnapshotLatencyMs: ReplicatedCounter.handleSnapshot()
 * SnapshotSizeBytes: ReplicatedCounter.handleCommit()
 * SnapshotLag: ReplicatedCounter.handleCommit())

> Add metrics enumerated in KIP-630
> -
>
> Key: KAFKA-10900
> URL: https://issues.apache.org/jira/browse/KAFKA-10900
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: loboxu
>Priority: Major
>
> KIP-630 enumerates a few metrics. Make sure that those metrics are 
> implemented.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12689) Remove deprecated EOS configs

2021-06-19 Thread loboxu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365914#comment-17365914
 ] 

loboxu commented on KAFKA-12689:


Thank you very much for reminding me. I know what I should do.

> Remove deprecated EOS configs
> -
>
> Key: KAFKA-12689
> URL: https://issues.apache.org/jira/browse/KAFKA-12689
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: loboxu
>Priority: Blocker
> Fix For: 4.0.0
>
>
> In 
> [KIP-732|https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2]
>  we deprecated the EXACTLY_ONCE and EXACTLY_ONCE_BETA configs in 
> StreamsConfig, to be removed in 4.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vamossagar12 commented on pull request #10877: KAFKA-12925: adding prefixScan operation for missed implementations

2021-06-19 Thread GitBox


vamossagar12 commented on pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#issuecomment-864382529


   @mjsax , I have added the tests for the combinations that you mentioned.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10907: KAFKA-10000: Exactly-once support for source connectors

2021-06-19 Thread GitBox


C0urante commented on pull request #10907:
URL: https://github.com/apache/kafka/pull/10907#issuecomment-864380630


   @gharris1727 you've been tremendously helpful so far in the design process; 
if you have time, would you like to take a look at the implementation as well?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante opened a new pull request #10907: KAFKA-10000: Exactly-once support for source connectors

2021-06-19 Thread GitBox


C0urante opened a new pull request #10907:
URL: https://github.com/apache/kafka/pull/10907


   Implements 
[KIP-618](https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors#KIP618:ExactlyOnceSupportforSourceConnectors-Newmetrics).
   
   There are several changes here that can be reviewed fairly independently of 
each other:
   - Support for transactional source tasks, which is largely implemented in 
the `ExactlyOnceWorkerSourceTask` class, its newly-introduced 
`AbstractWorkerSourceTask` superclass, the `Worker` class (whose API for task 
starts has been split up from the existing `startTask` method into separate 
`startSinkTask`, `startSourceTask`, and `startExactlyOnceSourceTask` methods), 
and the `WorkerTransactionContext` class (which is used to allow connectors to 
define their own transaction boundaries)
   - Zombie fencing logic and the use of a transactional producer for some 
writes to the config topic, which are done by the leader of the cluster and are 
largely implemented in the `DistributedHerder`, `ConfigBackingStore`, 
`KafkaConfigBackingStore`, and `ClusterConfigState` classes
   - A new method in the `Admin` API for fencing out transactional producers by 
ID, which is done with changes to the `Admin` interface (unsurprisingly) and 
the `KafkaAdminClient` class
   - Support for per-connector offsets topics, which touches on the `Worker`, 
`OffsetStorageReaderImpl`, and `OffsetStorageWriter` classes
   - A few new `SourceConnector` methods for communicating support for 
exactly-once guarantees and connector-defined transactions; these take place in 
the `SourceConnector` class (also unsurprisingly) and the `AbstractHerder` class
   
   Existing unit tests are expanded where applicable, and new ones have been 
introduced where necessary.
   
   Eight new integration tests are added, which cover scenarios including 
preflight validation checks, all three types of transaction boundary, graceful 
recovery of the leader when fenced out from the config topic by another worker, 
ensuring that the correct number of task producers are fenced out across 
generations, accurate reporting of failure to bring up tasks when fencing does 
not succeed (this includes an ACL-secured embedded Kafka cluster to simulate one 
of the most likely potential causes of this issue: insufficient permissions on 
the targeted Kafka cluster), and the use of a custom offsets topic.
   
   Many but not all existing system tests are modified to add cases involving 
exactly-once source support, which helps give us reasonable confidence that the 
feature is agnostic with regard to the rebalance protocol. A new test is added 
that is based on the existing bounce test, but with no sink connector and with 
stricter expectations for delivery guarantees (no duplicates are permitted).
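   
   For orientation, the connector-facing opt-in looks roughly like this (a hedged sketch; the enum and method are modeled on the KIP-618 proposal using stand-in types, not necessarily the merged API):
   
   import java.util.Map;

   // Stand-in for the KIP-618 enum; the real type lives in the Connect API.
   enum ExactlyOnceSupport { SUPPORTED, UNSUPPORTED }

   class EosCapableConnectorSketch {
       // Mirrors the new SourceConnector hook: during preflight validation
       // the framework asks whether the connector's offsets are precise
       // enough to honor exactly-once delivery with the given configuration.
       public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
           return ExactlyOnceSupport.SUPPORTED;
       }
   }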
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-06-19 Thread GitBox


showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-864374754


   @hachikuji , I checked and there's no race condition there. The reason we 
didn't put `NOT_CONTROLLER` in the EnvelopeResponse itself is that we build the 
envelope response **with no error** as long as we got a response from the 
handler (the createTopics handler in this case) 
[here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L124).
 So, in `ControllerApis#handleCreateTopics`, we sendResponse when the future 
completes, then in `RequestHandlerHelper#sendResponseMaybeThrottle` we 
`buildResponseSend` and reach the location linked above. 
   
   So, to fix this issue, I need to build the EnvelopeResponse with 
`NotControllerError` when the response from the handler carries a 
`NotControllerError` (fix 2). But without fix 1, we can only get an 
`Unknown_Server_Error` in the response from the handler, because the 
`NotControllerError` is wrapped in a `CompletionException`.
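   
   Put together, the requester-side effect is simple (a hedged sketch with simplified stand-in types; the real requester is BrokerToControllerRequestThread):
   
   enum Errors { NONE, NOT_CONTROLLER }

   final class EnvelopeRetrySketch {
       // With fix 2, NOT_CONTROLLER is surfaced on the envelope itself, so
       // the requester can rediscover the active controller without parsing
       // the embedded response.
       static boolean shouldRefreshActiveController(Errors envelopeError) {
           return envelopeError == Errors.NOT_CONTROLLER;
       }
   }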
   
   That's my solution. Please help review. Thank you very much!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-06-19 Thread GitBox


showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r654763984



##
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##
@@ -124,8 +124,15 @@ object RequestChannel extends Logging {
     def buildResponseSend(abstractResponse: AbstractResponse): Send = {
       envelope match {
         case Some(request) =>
-          val responseBytes = context.buildResponseEnvelopePayload(abstractResponse)
-          val envelopeResponse = new EnvelopeResponse(responseBytes, Errors.NONE)
+          val envelopeResponse = if (abstractResponse.errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+            // Since it's a NOT_CONTROLLER error response, we need to build the envelope response
+            // with the NOT_CONTROLLER error to notify the requester
+            // (i.e. BrokerToControllerRequestThread) to update the active controller
+            new EnvelopeResponse(new EnvelopeResponseData()
+              .setErrorCode(Errors.NOT_CONTROLLER.code()))

Review comment:
   fix 2: make the envelope response return `NotControllerException` if the 
controller response has `NotControllerException`, so that we can catch the 
`NotControllerException` on the envelope response and update the active controller.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org