Repository: cassandra
Updated Branches:
  refs/heads/trunk 2bad5d5b6 -> 5db822b71
Internode messaging handshake sends wrong messaging version number patch by jasobrown; reviewed by Dinesh Joshi for CASSANDRA-14540 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5db822b7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5db822b7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5db822b7 Branch: refs/heads/trunk Commit: 5db822b71ad7278ca6443455d029dd79e22388d8 Parents: 2bad5d5 Author: Jason Brown <jasedbr...@gmail.com> Authored: Fri Jun 22 13:56:17 2018 -0700 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Mon Jun 25 06:36:59 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../net/async/OutboundHandshakeHandler.java | 2 +- .../net/async/OutboundHandshakeHandlerTest.java | 59 ++++++++++++++++++-- 3 files changed, 55 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db822b7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 047689e..fb14e40 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Internode messaging handshake sends wrong messaging version number (CASSANDRA-14540) * Add a virtual table to expose active client connections (CASSANDRA-14458) * Clean up and refactor client metrics (CASSANDRA-14524) * Nodetool import row cache invalidation races with adding sstables to tracker (CASSANDRA-14529) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db822b7/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java b/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java index c555bed..3ccbf49 100644 --- 
a/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java +++ b/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java @@ -177,7 +177,7 @@ public class OutboundHandshakeHandler extends ByteToMessageDecoder try { - ctx.writeAndFlush(new ThirdHandshakeMessage(MessagingService.current_version, connectionId.local()).encode(ctx.alloc())); + ctx.writeAndFlush(new ThirdHandshakeMessage(peerMessagingVersion, connectionId.local()).encode(ctx.alloc())); ChannelWriter channelWriter = setupPipeline(ctx.channel(), peerMessagingVersion); callback.accept(HandshakeResult.success(channelWriter, peerMessagingVersion)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5db822b7/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java b/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java index be71fd4..2d377af 100644 --- a/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java +++ b/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.net.async; -import java.net.InetSocketAddress; import java.util.LinkedList; import java.util.List; import java.util.Optional; @@ -42,6 +41,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.async.HandshakeProtocol.SecondHandshakeMessage; +import org.apache.cassandra.net.async.HandshakeProtocol.ThirdHandshakeMessage; import org.apache.cassandra.net.async.OutboundHandshakeHandler.HandshakeResult; import static org.apache.cassandra.net.async.OutboundHandshakeHandler.HandshakeResult.UNKNOWN_PROTOCOL_VERSION; @@ -102,20 +102,31 @@ public class OutboundHandshakeHandlerTest } @Test - public void 
decode_HappyPath() throws Exception + public void decode_HappyPath() { buf = new SecondHandshakeMessage(MESSAGING_VERSION).encode(PooledByteBufAllocator.DEFAULT); channel.writeInbound(buf); Assert.assertEquals(1, channel.outboundMessages().size()); Assert.assertTrue(channel.isOpen()); - Assert.assertTrue(channel.releaseOutbound()); // throw away any responses from decode() Assert.assertEquals(MESSAGING_VERSION, callbackHandler.result.negotiatedMessagingVersion); Assert.assertEquals(HandshakeResult.Outcome.SUCCESS, callbackHandler.result.outcome); + Assert.assertFalse(channel.outboundMessages().isEmpty()); + + ByteBuf thridMsgBuf = (ByteBuf) channel.outboundMessages().poll(); + try + { + ThirdHandshakeMessage thirdHandshakeMessage = ThirdHandshakeMessage.maybeDecode(thridMsgBuf); + Assert.assertEquals(MESSAGING_VERSION, thirdHandshakeMessage.messagingVersion); + } + finally + { + thridMsgBuf.release(); + } } @Test - public void decode_HappyPathThrowsException() throws Exception + public void decode_HappyPathThrowsException() { callbackHandler.failOnCallback = true; buf = new SecondHandshakeMessage(MESSAGING_VERSION).encode(PooledByteBufAllocator.DEFAULT); @@ -129,7 +140,7 @@ public class OutboundHandshakeHandlerTest } @Test - public void decode_ReceivedLowerMsgVersion() throws Exception + public void decode_ReceivedUnexpectedLowerMsgVersion() { int msgVersion = MESSAGING_VERSION - 1; buf = new SecondHandshakeMessage(msgVersion).encode(PooledByteBufAllocator.DEFAULT); @@ -143,7 +154,43 @@ public class OutboundHandshakeHandlerTest } @Test - public void decode_ReceivedHigherMsgVersion() throws Exception + public void decode_ReceivedExpectedLowerMsgVersion() + { + int msgVersion = MESSAGING_VERSION - 1; + channel.pipeline().remove(HANDLER_NAME); + params = OutboundConnectionParams.builder() + .connectionId(connectionId) + .callback(handshakeResult -> callbackHandler.receive(handshakeResult)) + .mode(NettyFactory.Mode.MESSAGING) + .protocolVersion(msgVersion) + 
.coalescingStrategy(Optional.empty()) + .build(); + handler = new OutboundHandshakeHandler(params); + channel.pipeline().addFirst(HANDLER_NAME, handler); + + buf = new SecondHandshakeMessage(msgVersion).encode(PooledByteBufAllocator.DEFAULT); + channel.writeInbound(buf); + Assert.assertTrue(channel.inboundMessages().isEmpty()); + + Assert.assertEquals(msgVersion, callbackHandler.result.negotiatedMessagingVersion); + Assert.assertEquals(HandshakeResult.Outcome.SUCCESS, callbackHandler.result.outcome); + Assert.assertTrue(channel.isOpen()); + Assert.assertFalse(channel.outboundMessages().isEmpty()); + + ByteBuf thridMsgBuf = (ByteBuf) channel.outboundMessages().poll(); + try + { + ThirdHandshakeMessage thirdHandshakeMessage = ThirdHandshakeMessage.maybeDecode(thridMsgBuf); + Assert.assertEquals(msgVersion, thirdHandshakeMessage.messagingVersion); + } + finally + { + thridMsgBuf.release(); + } + } + + @Test + public void decode_ReceivedHigherMsgVersion() { int msgVersion = MESSAGING_VERSION - 1; channel.pipeline().remove(HANDLER_NAME); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org