[GitHub] [kafka] iprithv commented on pull request #9204: KAFKA-6181 Examining log messages with {{--deep-iteration}} should show superset of fields

2020-11-08 Thread GitBox


iprithv commented on pull request #9204:
URL: https://github.com/apache/kafka/pull/9204#issuecomment-723764315


   Hi @chia7712  @hachikuji @cmccabe please review this. Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming closed pull request #9575: MINOR: optimize lock operation

2020-11-08 Thread GitBox


dengziming closed pull request #9575:
URL: https://github.com/apache/kafka/pull/9575


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #9575: MINOR: optimize lock operation

2020-11-08 Thread GitBox


dengziming commented on a change in pull request #9575:
URL: https://github.com/apache/kafka/pull/9575#discussion_r519552611



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
##
@@ -112,12 +112,11 @@ public ByteBuffer allocate(int size, long 
maxTimeToBlockMs) throws InterruptedEx
+ " on memory allocations.");
 
 ByteBuffer buffer = null;
-this.lock.lock();

Review comment:
   good catch!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

2020-11-08 Thread GitBox


guozhangwang commented on a change in pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#discussion_r519551394



##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -21,18 +21,21 @@
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * This class is responsible for managing the current state of this node and 
ensuring only
- * valid state transitions.
+ * This class is responsible for managing the current state of this node and 
ensuring
+ * only valid state transitions. Below we define the possible state 
transitions and
+ * how they are triggered:
  *
- * Unattached =>
+ * Unattached|Resigned =>

Review comment:
   Makes me thinking: if we have an even number sized quorum (say 2N), and 
the leader is resigning. Then before the leader shutdown we need N+1 votes, 
while after the leader shutdown, the quorum size shrink to 2N-1 and we would 
only need N votes. So if the resigning leader gives it a vote to a candidate 
and then shutdown, while the candidates thinks they only need N votes, would 
that potentially result in two candidates claiming victory --- somehow this 
sounds quite close to the real world :P --- each with N votes while one of them 
has the vote from the resigned leader?

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1601,7 +1618,12 @@ private long pollFollower(long currentTimeMs) throws 
IOException {
 }
 
 private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) 
throws IOException {
-if (state.hasFetchTimeoutExpired(currentTimeMs)) {
+GracefulShutdown shutdown = this.shutdown.get();
+if (shutdown != null) {
+// If we are a follower, then we can shutdown immediately. We want 
to
+// skip the transition to candidate in any case.
+return 0;

Review comment:
   Why the behavior of `pollFollowerAsVoter` and `pollVoted` are different 
when shutting down? Could the former case still help in casting and completing 
a vote as well?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

2020-11-08 Thread GitBox


guozhangwang commented on pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#issuecomment-723747753


   > One thing I am strongly considering, however, is changing this state 
machine so that the resigned state is only for leaders. That would definitely 
simplify the logic. The optimization mentioned above in response to @dengziming 
's question seems unlikely to have much benefit in practice. We could always 
reconsider it in the future of course.
   
   Sounds good!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9558: KAFKA-10342: migrate remaining RPCs to forwarding

2020-11-08 Thread GitBox


abbccdda commented on pull request #9558:
URL: https://github.com/apache/kafka/pull/9558#issuecomment-723717007


   @guozhangwang for a review



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10699) Add system test coverage for group coordinator emigration

2020-11-08 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10699:
---

 Summary: Add system test coverage for group coordinator emigration
 Key: KAFKA-10699
 URL: https://issues.apache.org/jira/browse/KAFKA-10699
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


After merging the fix https://issues.apache.org/jira/browse/KAFKA-10284, we 
believe that it is important to add system test coverage for the group 
coordinator migration to verify consumer behaviors are correct.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-11-08 Thread GitBox


ijuma commented on pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#issuecomment-723675299


   Can we summarize the regression here for a real world workload?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9575: MINOR: optimize lock operation

2020-11-08 Thread GitBox


chia7712 commented on a change in pull request #9575:
URL: https://github.com/apache/kafka/pull/9575#discussion_r519441272



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
##
@@ -112,12 +112,11 @@ public ByteBuffer allocate(int size, long 
maxTimeToBlockMs) throws InterruptedEx
+ " on memory allocations.");
 
 ByteBuffer buffer = null;
-this.lock.lock();

Review comment:
   The read/write of ```closed``` must be in lock so this improvement is 
not acceptable to ```BufferPool```.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-08 Thread GitBox


chia7712 commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r519343434



##
File path: 
generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##
@@ -1579,58 +1566,58 @@ private void generateVariableLengthFieldSize(FieldSpec 
field,
 }
 if (tagged) {
 
headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS);
+buffer.printf("int _arraySize = _size.totalSize() - 
_sizeBeforeArray;%n");
 buffer.printf("_cache.setArraySizeInBytes(%s, 
_arraySize);%n",
 field.camelCaseName());
-buffer.printf("_size += _arraySize + 
ByteUtils.sizeOfUnsignedVarint(_arraySize);%n");
-} else {
-buffer.printf("_size += _arraySize;%n");
+
buffer.printf("_size.addBytes(ByteUtils.sizeOfUnsignedVarint(_arraySize));%n");
 }
 } else if (field.type().isBytes()) {
+buffer.printf("int _sizeBeforeBytes = 
_size.totalSize();%n");

Review comment:
   ```java
   if (tagged) {
 buffer.printf("int _sizeBeforeBytes = _size.totalSize();%n");
   }
   ```

##
File path: clients/src/main/java/org/apache/kafka/common/protocol/Message.java
##
@@ -47,7 +47,20 @@
  *  If the specified version is too new to be supported
  *  by this software.
  */
-int size(ObjectSerializationCache cache, short version);
+default int size(ObjectSerializationCache cache, short version) {
+MessageSizeAccumulator size = new MessageSizeAccumulator();
+addSize(size, cache, version);
+return size.totalSize();
+}
+
+/**
+ * Add the size of this message to an accumulator.
+ *
+ * @param size  The size accumulator to add to
+ * @param cache The serialization size cache to populate.
+ * @param version   The version to use.
+ */
+void addSize(MessageSizeAccumulator size, ObjectSerializationCache cache, 
short version);

Review comment:
   I'm thinking about how to simplify this process.
   
   Could we reuse the method ```void write(Writable writable, 
ObjectSerializationCache cache, short version)``` ? Maybe we can create a 
```Writable``` instance but it does not write data to any output. Instead, it 
calculate the size of buffer according to input data.

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/EnvelopeRequest.java
##
@@ -91,4 +91,14 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
 public static EnvelopeRequest parse(ByteBuffer buffer, short version) {
 return new EnvelopeRequest(ApiKeys.ENVELOPE.parseRequest(version, 
buffer), version);
 }
+
+public EnvelopeRequestData data() {
+return data;
+}
+
+@Override
+public Send toSend(String destination, RequestHeader header) {

Review comment:
   If all requests are using auto-generated data, should this be default 
implementation of ```AbstractRequest```?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org