[kafka] branch trunk updated (4efd9bf -> 6fbea57)
This is an automated email from the ASF dual-hosted git repository. boyang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 4efd9bf KAFKA-13114; Revert state and reregister raft listener (#11116) add 6fbea57 update batch.size doc (#11160) No new revisions were added by this update. Summary of changes: .../java/org/apache/kafka/clients/producer/ProducerConfig.java | 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-)
[kafka] branch 3.0 updated: KAFKA-13114; Revert state and reregister raft listener (#11116)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.0 by this push: new a2cb4b5 KAFKA-13114; Revert state and reregister raft listener (#6) a2cb4b5 is described below commit a2cb4b5b3a6fca4bbe7f5fb9679d1c80f2811ceb Author: José Armando García Sancio AuthorDate: Sun Aug 1 15:26:04 2021 -0700 KAFKA-13114; Revert state and reregister raft listener (#6) RaftClient's scheduleAppend may split the list of records into multiple batches. This means that it is possible for the active controller to see a committed offset for which it doesn't have an in-memory snapshot. If the active controller needs to renounce and it is missing an in-memory snapshot, then revert the state and reregister the Raft listener. This will cause the controller to replay the entire metadata partition. Reviewers: Jason Gustafson --- core/src/test/java/kafka/test/MockController.java | 2 +- .../org/apache/kafka/controller/Controller.java| 2 +- .../apache/kafka/controller/QuorumController.java | 46 +++-- .../kafka/controller/QuorumControllerTest.java | 186 +++-- .../kafka/controller/QuorumControllerTestEnv.java | 23 +-- .../org/apache/kafka/metalog/LocalLogManager.java | 66 +++- .../kafka/metalog/LocalLogManagerTestEnv.java | 4 + 7 files changed, 286 insertions(+), 43 deletions(-) diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java index 68e73fd..f56e2cb 100644 --- a/core/src/test/java/kafka/test/MockController.java +++ b/core/src/test/java/kafka/test/MockController.java @@ -346,7 +346,7 @@ public class MockController implements Controller { } @Override -public long curClaimEpoch() { +public int curClaimEpoch() { return active ? 
1 : -1; } diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java index 3cb0d26..f06b108 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java +++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java @@ -263,7 +263,7 @@ public interface Controller extends AutoCloseable { * If this controller is active, this is the non-negative controller epoch. * Otherwise, this is -1. */ -long curClaimEpoch(); +int curClaimEpoch(); /** * Returns true if this controller is currently active. diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index d62f2f8..dba8817 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -580,12 +580,17 @@ public final class QuorumController implements Controller { // written before we can return our result to the user. Here, we hand off // the batch of records to the raft client. They will be written out // asynchronously. 
-final long offset; +final Long offset; if (result.isAtomic()) { offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records()); } else { offset = raftClient.scheduleAppend(controllerEpoch, result.records()); } +if (offset == null) { +throw new IllegalStateException("The raft client was unable to allocate a buffer for an append"); +} else if (offset == Long.MAX_VALUE) { +throw new IllegalStateException("Unable to append records since this is not the leader"); +} op.processBatchEndOffset(offset); writeOffset = offset; resultAndOffset = ControllerResultAndOffset.of(offset, result); @@ -640,7 +645,7 @@ public final class QuorumController implements Controller { @Override public void handleCommit(BatchReader reader) { -appendControlEvent("handleCommits[baseOffset=" + reader.baseOffset() + "]", () -> { +appendRaftEvent("handleCommit[baseOffset=" + reader.baseOffset() + "]", () -> { try { boolean isActiveController = curClaimEpoch != -1; long processedRecordsSize = 0; @@ -698,7 +703,7 @@ public final class QuorumController implements Controller { @Override public void handleSnapshot(SnapshotReader reader) { -appendControlEvent(String.format("handleSnapshot[snapshotId=%s]", reader.snapshotId()), () -> { +
[kafka] branch trunk updated (8a1fcee -> 4efd9bf)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 8a1fcee MINOR: Use time constant algorithms when comparing passwords or keys (#10978) add 4efd9bf KAFKA-13114; Revert state and reregister raft listener (#11116) No new revisions were added by this update. Summary of changes: core/src/test/java/kafka/test/MockController.java | 2 +- .../org/apache/kafka/controller/Controller.java| 2 +- .../apache/kafka/controller/QuorumController.java | 46 +++-- .../kafka/controller/QuorumControllerTest.java | 186 +++-- .../kafka/controller/QuorumControllerTestEnv.java | 23 +-- .../org/apache/kafka/metalog/LocalLogManager.java | 66 +++- .../kafka/metalog/LocalLogManagerTestEnv.java | 4 + 7 files changed, 286 insertions(+), 43 deletions(-)
[kafka] branch 2.2 updated: MINOR: Use time constant algorithms when comparing passwords or keys (#10978)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.2 by this push: new ffa2812 MINOR: Use time constant algorithms when comparing passwords or keys (#10978) ffa2812 is described below commit ffa2812bf0c83c924fb8960131f40c39e441f3b2 Author: Randall Hauch AuthorDate: Fri Jul 30 17:48:03 2021 -0500 MINOR: Use time constant algorithms when comparing passwords or keys (#10978) Author: Randall Hauch Reviewers: Manikumar Reddy , Rajini Sivaram , Mickael Maison , Ismael Juma --- .../internals/PlainServerCallbackHandler.java | 4 +- .../security/scram/internals/ScramSaslClient.java | 3 +- .../security/scram/internals/ScramSaslServer.java | 3 +- .../security/token/delegation/DelegationToken.java | 3 +- .../java/org/apache/kafka/common/utils/Utils.java | 36 ++ .../org/apache/kafka/common/utils/UtilsTest.java | 44 ++ 6 files changed, 88 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java index 842f986..10f5817 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java @@ -22,9 +22,9 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.security.plain.PlainAuthenticateCallback; import org.apache.kafka.common.security.plain.PlainLoginModule; +import org.apache.kafka.common.utils.Utils; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -65,7 +65,7 @@ public class PlainServerCallbackHandler implements 
AuthenticateCallbackHandler { String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries, JAAS_USER_PREFIX + username, PlainLoginModule.class.getName()); -return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray()); +return expectedPassword != null && Utils.isEqualConstantTime(password, expectedPassword.toCharArray()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java index c21a52e..2e6191b 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.security.scram.internals; import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -204,7 +205,7 @@ public class ScramSaslClient implements SaslClient { try { byte[] serverKey = formatter.serverKey(saltedPassword); byte[] serverSignature = formatter.serverSignature(serverKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); -if (!Arrays.equals(signature, serverSignature)) +if (!MessageDigest.isEqual(signature, serverSignature)) throw new SaslException("Invalid server signature in server final message"); } catch (InvalidKeyException e) { throw new SaslException("Sasl server signature verification failed", e); diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java index f6286a6..3cc8ff0 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java +++ 
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.security.scram.internals; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -226,7 +227,7 @@ public class ScramSaslServer implements SaslServer { byte[] expectedStoredKey = scramCredential.storedKey(); byte[] clientSignature = formatter.clientSignature(expectedStoredKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); byte[] computedStoredKey =