[GitHub] [kafka] iprithv commented on pull request #9204: KAFKA-6181 Examining log messages with {{--deep-iteration}} should show superset of fields
iprithv commented on pull request #9204: URL: https://github.com/apache/kafka/pull/9204#issuecomment-723764315 Hi @chia7712 @hachikuji @cmccabe please review this. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming closed pull request #9575: MINOR: optimize lock operation
dengziming closed pull request #9575: URL: https://github.com/apache/kafka/pull/9575 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on a change in pull request #9575: MINOR: optimize lock operation
dengziming commented on a change in pull request #9575: URL: https://github.com/apache/kafka/pull/9575#discussion_r519552611 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java ## @@ -112,12 +112,11 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx + " on memory allocations."); ByteBuffer buffer = null; -this.lock.lock(); Review comment: good catch! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization
guozhangwang commented on a change in pull request #9531: URL: https://github.com/apache/kafka/pull/9531#discussion_r519551394 ## File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java ## @@ -21,18 +21,21 @@ import org.slf4j.Logger; import java.io.IOException; +import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Optional; import java.util.OptionalInt; import java.util.Random; import java.util.Set; import java.util.stream.Collectors; /** - * This class is responsible for managing the current state of this node and ensuring only - * valid state transitions. + * This class is responsible for managing the current state of this node and ensuring + * only valid state transitions. Below we define the possible state transitions and + * how they are triggered: * - * Unattached => + * Unattached|Resigned => Review comment: Makes me thinking: if we have an even number sized quorum (say 2N), and the leader is resigning. Then before the leader shutdown we need N+1 votes, while after the leader shutdown, the quorum size shrink to 2N-1 and we would only need N votes. So if the resigning leader gives it a vote to a candidate and then shutdown, while the candidates thinks they only need N votes, would that potentially result in two candidates claiming victory --- somehow this sounds quite close to the real world :P --- each with N votes while one of them has the vote from the resigned leader? ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1601,7 +1618,12 @@ private long pollFollower(long currentTimeMs) throws IOException { } private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) throws IOException { -if (state.hasFetchTimeoutExpired(currentTimeMs)) { +GracefulShutdown shutdown = this.shutdown.get(); +if (shutdown != null) { +// If we are a follower, then we can shutdown immediately. We want to +// skip the transition to candidate in any case. +return 0; Review comment: Why the behavior of `pollFollowerAsVoter` and `pollVoted` are different when shutting down? Could the former case still help in casting and completing a vote as well? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization
guozhangwang commented on pull request #9531: URL: https://github.com/apache/kafka/pull/9531#issuecomment-723747753 > One thing I am strongly considering, however, is changing this state machine so that the resigned state is only for leaders. That would definitely simplify the logic. The optimization mentioned above in response to @dengziming 's question seems unlikely to have much benefit in practice. We could always reconsider it in the future of course. Sounds good! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on pull request #9558: KAFKA-10342: migrate remaining RPCs to forwarding
abbccdda commented on pull request #9558: URL: https://github.com/apache/kafka/pull/9558#issuecomment-723717007 @guozhangwang for a review This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10699) Add system test coverage for group coordinator emigration
Boyang Chen created KAFKA-10699: --- Summary: Add system test coverage for group coordinator emigration Key: KAFKA-10699 URL: https://issues.apache.org/jira/browse/KAFKA-10699 Project: Kafka Issue Type: Improvement Reporter: Boyang Chen After merging the fix https://issues.apache.org/jira/browse/KAFKA-10284, we believe that it is important to add system test coverage for the group coordinator migration to verify consumer behaviors are correct. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma commented on pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol
ijuma commented on pull request #9401: URL: https://github.com/apache/kafka/pull/9401#issuecomment-723675299 Can we summarize the regression here for a real world workload? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #9575: MINOR: optimize lock operation
chia7712 commented on a change in pull request #9575: URL: https://github.com/apache/kafka/pull/9575#discussion_r519441272 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java ## @@ -112,12 +112,11 @@ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedEx + " on memory allocations."); ByteBuffer buffer = null; -this.lock.lock(); Review comment: The read/write of ```closed``` must be in lock so this improvement is not acceptable to ```BufferPool```. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission
chia7712 commented on a change in pull request #9563: URL: https://github.com/apache/kafka/pull/9563#discussion_r519343434 ## File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java ## @@ -1579,58 +1566,58 @@ private void generateVariableLengthFieldSize(FieldSpec field, } if (tagged) { headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS); +buffer.printf("int _arraySize = _size.totalSize() - _sizeBeforeArray;%n"); buffer.printf("_cache.setArraySizeInBytes(%s, _arraySize);%n", field.camelCaseName()); -buffer.printf("_size += _arraySize + ByteUtils.sizeOfUnsignedVarint(_arraySize);%n"); -} else { -buffer.printf("_size += _arraySize;%n"); + buffer.printf("_size.addBytes(ByteUtils.sizeOfUnsignedVarint(_arraySize));%n"); } } else if (field.type().isBytes()) { +buffer.printf("int _sizeBeforeBytes = _size.totalSize();%n"); Review comment: ```java if (tagged) { buffer.printf("int _sizeBeforeBytes = _size.totalSize();%n"); } ``` ## File path: clients/src/main/java/org/apache/kafka/common/protocol/Message.java ## @@ -47,7 +47,20 @@ * If the specified version is too new to be supported * by this software. */ -int size(ObjectSerializationCache cache, short version); +default int size(ObjectSerializationCache cache, short version) { +MessageSizeAccumulator size = new MessageSizeAccumulator(); +addSize(size, cache, version); +return size.totalSize(); +} + +/** + * Add the size of this message to an accumulator. + * + * @param size The size accumulator to add to + * @param cache The serialization size cache to populate. + * @param version The version to use. + */ +void addSize(MessageSizeAccumulator size, ObjectSerializationCache cache, short version); Review comment: I'm thinking about how to simplify this process. Could we reuse the method ```void write(Writable writable, ObjectSerializationCache cache, short version)``` ? Maybe we can create a ```Writable``` instance but it does not write data to any output. Instead, it calculate the size of buffer according to input data. ## File path: clients/src/main/java/org/apache/kafka/common/requests/EnvelopeRequest.java ## @@ -91,4 +91,14 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { public static EnvelopeRequest parse(ByteBuffer buffer, short version) { return new EnvelopeRequest(ApiKeys.ENVELOPE.parseRequest(version, buffer), version); } + +public EnvelopeRequestData data() { +return data; +} + +@Override +public Send toSend(String destination, RequestHeader header) { Review comment: If all requests are using auto-generated data, should this be default implementation of ```AbstractRequest```? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org