[kafka] branch trunk updated (4efd9bf -> 6fbea57)

2021-08-01 Thread boyang
This is an automated email from the ASF dual-hosted git repository.

boyang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 4efd9bf  KAFKA-13114; Revert state and reregister raft listener 
(#11116)
 add 6fbea57  update batch.size doc (#11160)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/kafka/clients/producer/ProducerConfig.java   | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)


[kafka] branch 3.0 updated: KAFKA-13114; Revert state and reregister raft listener (#11116)

2021-08-01 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
 new a2cb4b5  KAFKA-13114; Revert state and reregister raft listener 
(#11116)
a2cb4b5 is described below

commit a2cb4b5b3a6fca4bbe7f5fb9679d1c80f2811ceb
Author: José Armando García Sancio 
AuthorDate: Sun Aug 1 15:26:04 2021 -0700

KAFKA-13114; Revert state and reregister raft listener (#11116)

RaftClient's scheduleAppend may split the list of records into multiple
batches. This means that it is possible for the active controller to
see a committed offset for which it doesn't have an in-memory snapshot.

If the active controller needs to renounce and it is missing an
in-memory snapshot, then revert the state and reregister the Raft
listener. This will cause the controller to replay the entire metadata
partition.

Reviewers: Jason Gustafson 
---
 core/src/test/java/kafka/test/MockController.java  |   2 +-
 .../org/apache/kafka/controller/Controller.java |   2 +-
 .../apache/kafka/controller/QuorumController.java  |  46 +++--
 .../kafka/controller/QuorumControllerTest.java | 186 +++--
 .../kafka/controller/QuorumControllerTestEnv.java  |  23 +--
 .../org/apache/kafka/metalog/LocalLogManager.java  |  66 +++-
 .../kafka/metalog/LocalLogManagerTestEnv.java  |   4 +
 7 files changed, 286 insertions(+), 43 deletions(-)

diff --git a/core/src/test/java/kafka/test/MockController.java 
b/core/src/test/java/kafka/test/MockController.java
index 68e73fd..f56e2cb 100644
--- a/core/src/test/java/kafka/test/MockController.java
+++ b/core/src/test/java/kafka/test/MockController.java
@@ -346,7 +346,7 @@ public class MockController implements Controller {
 }
 
 @Override
-public long curClaimEpoch() {
+public int curClaimEpoch() {
 return active ? 1 : -1;
 }
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java 
b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
index 3cb0d26..f06b108 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
@@ -263,7 +263,7 @@ public interface Controller extends AutoCloseable {
  * If this controller is active, this is the non-negative controller epoch.
  * Otherwise, this is -1.
  */
-long curClaimEpoch();
+int curClaimEpoch();
 
 /**
  * Returns true if this controller is currently active.
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index d62f2f8..dba8817 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -580,12 +580,17 @@ public final class QuorumController implements Controller 
{
 // written before we can return our result to the user.  Here, 
we hand off
 // the batch of records to the raft client.  They will be 
written out
 // asynchronously.
-final long offset;
+final Long offset;
 if (result.isAtomic()) {
 offset = raftClient.scheduleAtomicAppend(controllerEpoch, 
result.records());
 } else {
 offset = raftClient.scheduleAppend(controllerEpoch, 
result.records());
 }
+if (offset == null) {
+throw new IllegalStateException("The raft client was 
unable to allocate a buffer for an append");
+} else if (offset == Long.MAX_VALUE) {
+throw new IllegalStateException("Unable to append records 
since this is not the leader");
+}
 op.processBatchEndOffset(offset);
 writeOffset = offset;
 resultAndOffset = ControllerResultAndOffset.of(offset, result);
@@ -640,7 +645,7 @@ public final class QuorumController implements Controller {
 
 @Override
 public void handleCommit(BatchReader reader) {
-appendControlEvent("handleCommits[baseOffset=" + 
reader.baseOffset() + "]", () -> {
+appendRaftEvent("handleCommit[baseOffset=" + reader.baseOffset() + 
"]", () -> {
 try {
 boolean isActiveController = curClaimEpoch != -1;
 long processedRecordsSize = 0;
@@ -698,7 +703,7 @@ public final class QuorumController implements Controller {
 
 @Override
 public void handleSnapshot(SnapshotReader 
reader) {
-appendControlEvent(String.format("handleSnapshot[snapshotId=%s]", 
reader.snapshotId()), () -> {
+

[kafka] branch trunk updated (8a1fcee -> 4efd9bf)

2021-08-01 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 8a1fcee  MINOR: Use time constant algorithms when comparing passwords 
or keys (#10978)
 add 4efd9bf  KAFKA-13114; Revert state and reregister raft listener 
(#11116)

No new revisions were added by this update.

Summary of changes:
 core/src/test/java/kafka/test/MockController.java  |   2 +-
 .../org/apache/kafka/controller/Controller.java |   2 +-
 .../apache/kafka/controller/QuorumController.java  |  46 +++--
 .../kafka/controller/QuorumControllerTest.java | 186 +++--
 .../kafka/controller/QuorumControllerTestEnv.java  |  23 +--
 .../org/apache/kafka/metalog/LocalLogManager.java  |  66 +++-
 .../kafka/metalog/LocalLogManagerTestEnv.java  |   4 +
 7 files changed, 286 insertions(+), 43 deletions(-)


[kafka] branch 2.2 updated: MINOR: Use time constant algorithms when comparing passwords or keys (#10978)

2021-08-01 Thread rhauch
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
 new ffa2812  MINOR: Use time constant algorithms when comparing passwords 
or keys (#10978)
ffa2812 is described below

commit ffa2812bf0c83c924fb8960131f40c39e441f3b2
Author: Randall Hauch 
AuthorDate: Fri Jul 30 17:48:03 2021 -0500

MINOR: Use time constant algorithms when comparing passwords or keys 
(#10978)

Author: Randall Hauch 
Reviewers: Manikumar Reddy, Rajini Sivaram, Mickael Maison, Ismael Juma
---
 .../internals/PlainServerCallbackHandler.java  |  4 +-
 .../security/scram/internals/ScramSaslClient.java  |  3 +-
 .../security/scram/internals/ScramSaslServer.java  |  3 +-
 .../security/token/delegation/DelegationToken.java |  3 +-
 .../java/org/apache/kafka/common/utils/Utils.java  | 36 ++
 .../org/apache/kafka/common/utils/UtilsTest.java   | 44 ++
 6 files changed, 88 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java
index 842f986..10f5817 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java
@@ -22,9 +22,9 @@ import 
org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;
 import org.apache.kafka.common.security.plain.PlainLoginModule;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -65,7 +65,7 @@ public class PlainServerCallbackHandler implements 
AuthenticateCallbackHandler {
 String expectedPassword = 
JaasContext.configEntryOption(jaasConfigEntries,
 JAAS_USER_PREFIX + username,
 PlainLoginModule.class.getName());
-return expectedPassword != null && Arrays.equals(password, 
expectedPassword.toCharArray());
+return expectedPassword != null && 
Utils.isEqualConstantTime(password, expectedPassword.toCharArray());
 }
 }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java
 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java
index c21a52e..2e6191b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.security.scram.internals;
 
 import java.nio.charset.StandardCharsets;
 import java.security.InvalidKeyException;
+import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -204,7 +205,7 @@ public class ScramSaslClient implements SaslClient {
 try {
 byte[] serverKey = formatter.serverKey(saltedPassword);
 byte[] serverSignature = formatter.serverSignature(serverKey, 
clientFirstMessage, serverFirstMessage, clientFinalMessage);
-if (!Arrays.equals(signature, serverSignature))
+if (!MessageDigest.isEqual(signature, serverSignature))
 throw new SaslException("Invalid server signature in server 
final message");
 } catch (InvalidKeyException e) {
 throw new SaslException("Sasl server signature verification 
failed", e);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java
 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java
index f6286a6..3cc8ff0 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.security.scram.internals;
 
 import java.security.InvalidKeyException;
+import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -226,7 +227,7 @@ public class ScramSaslServer implements SaslServer {
 byte[] expectedStoredKey = scramCredential.storedKey();
 byte[] clientSignature = 
formatter.clientSignature(expectedStoredKey, clientFirstMessage, 
serverFirstMessage, clientFinalMessage);
 byte[] computedStoredKey =