Repository: kafka
Updated Branches:
  refs/heads/trunk 24a4e6146 -> 27107ee34


MINOR: JoinGroupRequest V0 invalid rebalance timeout

A JoinGroupRequest V0 built with the Builder had a rebalance
timeout of -1 rather than one equal to the session timeout, which is
the value it would have had if it had come from the wire and been
deserialized from a V0 Struct.

Fix developed with mimaison.

Author: Edoardo Comar <eco...@uk.ibm.com>

Reviewers: Rajini Sivaram

Closes #2936 from edoardocomar/MINOR-JoinGroupRequestV0


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/27107ee3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/27107ee3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/27107ee3

Branch: refs/heads/trunk
Commit: 27107ee34d1df89035eb9b9b4e11036fca6cf723
Parents: 24a4e61
Author: Edoardo Comar <eco...@uk.ibm.com>
Authored: Thu May 11 16:17:34 2017 -0400
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Thu May 11 16:17:34 2017 -0400

----------------------------------------------------------------------
 .../apache/kafka/common/requests/JoinGroupRequest.java   |  3 ++-
 .../kafka/common/requests/RequestResponseTest.java       | 11 +++++++++--
 2 files changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/27107ee3/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 1080fe7..ff07d13 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -90,7 +90,8 @@ public class JoinGroupRequest extends AbstractRequest {
         @Override
         public JoinGroupRequest build(short version) {
             if (version < 1) {
-                rebalanceTimeout = -1;
+                // v0 had no rebalance timeout but used session timeout implicitly
+                rebalanceTimeout = sessionTimeout;
             }
             return new JoinGroupRequest(version, groupId, sessionTimeout,
                     rebalanceTimeout, memberId, protocolType, groupProtocols);

http://git-wip-us.apache.org/repos/asf/kafka/blob/27107ee3/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 6443e4d..b1e83bf 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -511,7 +511,15 @@ public class RequestResponseTest {
         deserialized = (FetchRequest) deserialize(request, struct, request.version());
         assertEquals(request.isolationLevel(), deserialized.isolationLevel());
     }
-    
+
+    @Test
+    public void testJoinGroupRequestVersion0RebalanceTimeout() throws Exception {
+        final short version = 0;
+        JoinGroupRequest jgr = createJoinGroupRequest(version);
+        JoinGroupRequest jgr2 = new JoinGroupRequest(jgr.toStruct(), version);
+        assertEquals(jgr2.rebalanceTimeout(), jgr.rebalanceTimeout());
+    }
+
     private RequestHeader createRequestHeader() {
         return new RequestHeader((short) 10, (short) 1, "", 10);
     }
@@ -565,7 +573,6 @@ public class RequestResponseTest {
         return new HeartbeatResponse(Errors.NONE);
     }
 
-    @SuppressWarnings("deprecation")
     private JoinGroupRequest createJoinGroupRequest(int version) {
         ByteBuffer metadata = ByteBuffer.wrap(new byte[] {});
         List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>();

Reply via email to