Repository: incubator-geode Updated Branches: refs/heads/feature/GEODE-77 b8f40f4c1 -> f894238b5
GEODE-77 adding test coverage for JGroupsMessenger A couple of unit tests to make sure that request routing is working as expected and that message payload fragmentation is being performed. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f894238b Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f894238b Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f894238b Branch: refs/heads/feature/GEODE-77 Commit: f894238b5f9f54665728c69145a6c1c72dba3c21 Parents: b8f40f4 Author: Bruce Schuchardt <bschucha...@pivotal.io> Authored: Fri Aug 21 14:10:10 2015 -0700 Committer: Bruce Schuchardt <bschucha...@pivotal.io> Committed: Fri Aug 21 14:10:10 2015 -0700 ---------------------------------------------------------------------- .../membership/gms/messenger/InterceptUDP.java | 79 +++++++++ .../messenger/JGroupsMessengerJUnitTest.java | 167 +++++++++++++++++++ 2 files changed, 246 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f894238b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/InterceptUDP.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/InterceptUDP.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/InterceptUDP.java new file mode 100755 index 0000000..aafb466 --- /dev/null +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/InterceptUDP.java @@ -0,0 +1,79 @@ +package com.gemstone.gemfire.distributed.internal.membership.gms.messenger; + +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; + +import org.jgroups.Address; +import org.jgroups.Event; +import org.jgroups.Message; +import org.jgroups.conf.ClassConfigurator; +import org.jgroups.protocols.UNICAST3; +import org.jgroups.protocols.pbcast.NAKACK2; +import org.jgroups.stack.IpAddress; +import org.jgroups.stack.Protocol; +import org.jgroups.util.UUID; + +/** + * FakeUDP replaces the regular UDP JGroups messaging protocol + * for unit testing. It does not create a datagram socket + * and is only set up to record message counts and respond + * to Unicast to keep it from retransmitting + */ +public class InterceptUDP extends Protocol { + + static final int MEMBERSHIP_PORT = 12345; + + private final short nakackHeaderId = ClassConfigurator.getProtocolId(NAKACK2.class); + private final short unicastHeaderId = ClassConfigurator.getProtocolId(UNICAST3.class); + + UUID uuid; +// IpAddress addr; +// Map<UUID, IpAddress> addressMap; + int unicastSentDataMessages; + + public InterceptUDP() { +// uuid = new UUID(); +// try { +// addr = new IpAddress("localhost", MEMBERSHIP_PORT); +// } catch (UnknownHostException e) { +// throw new RuntimeException("unexpected exception", e); +// } +// addressMap = new HashMap<>(); +// addressMap.put(uuid, addr); + } + + @Override + public Object up(Event evt) { + return up_prot.up(evt); + } + + @Override + public Object down(Event evt) { + switch (evt.getType()) { + case Event.MSG: + handleMessage((Message)evt.getArg()); + return null; + case Event.SET_LOCAL_ADDRESS: + uuid=(UUID)evt.getArg(); + break; + } + return down_prot.down(evt); + } + + private void handleMessage(Message msg) { + Object o = msg.getHeader(nakackHeaderId); + o = msg.getHeader(unicastHeaderId); + if (o != null) { + UNICAST3.Header hdr = (UNICAST3.Header)o; + switch (hdr.type()) { + case UNICAST3.Header.DATA: + unicastSentDataMessages++; + Message response = new Message(uuid, msg.getDest(), null); + response.putHeader(unicastHeaderId, UNICAST3.Header.createAckHeader(hdr.seqno(), hdr.connId(), System.currentTimeMillis())); + up_prot.up(new Event(Event.MSG, response)); + break; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f894238b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java new file mode 100755 index 0000000..515c115 --- /dev/null +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java @@ -0,0 +1,167 @@ +package com.gemstone.gemfire.distributed.internal.membership.gms.messenger; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Properties; + +import org.jgroups.Event; +import org.jgroups.Message; +import org.jgroups.util.UUID; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.distributed.internal.DMStats; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl; +import com.gemstone.gemfire.distributed.internal.DistributionManager; +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; +import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember; +import com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig; +import com.gemstone.gemfire.distributed.internal.membership.gms.Services; +import com.gemstone.gemfire.distributed.internal.membership.gms.Services.Stopper; +import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave; +import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager; +import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage; +import com.gemstone.gemfire.internal.Version; +import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig; +import com.gemstone.gemfire.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class JGroupsMessengerJUnitTest { + private Services services; + private JGroupsMessenger messenger; + private JoinLeave joinLeave; + private Manager manager; + private Stopper stopper; + private InterceptUDP interceptor; + + + /** + * Create stub and mock objects + */ + @Before + public void initMocks() throws Exception { + Properties nonDefault = new Properties(); + nonDefault.put(DistributionConfig.DISABLE_TCP_NAME, "true"); + nonDefault.put(DistributionConfig.MCAST_PORT_NAME, "0"); + nonDefault.put(DistributionConfig.LOG_FILE_NAME, ""); + nonDefault.put(DistributionConfig.LOG_LEVEL_NAME, "fine"); + nonDefault.put(DistributionConfig.LOCATORS_NAME, "localhost[10344]"); + DistributionConfigImpl config = new DistributionConfigImpl(nonDefault); + RemoteTransportConfig tconfig = new RemoteTransportConfig(config, + DistributionManager.NORMAL_DM_TYPE); + + stopper = mock(Stopper.class); + when(stopper.isCancelInProgress()).thenReturn(false); + + manager = mock(Manager.class); + + joinLeave = mock(JoinLeave.class); + + ServiceConfig serviceConfig = new ServiceConfig(tconfig, config); + + services = mock(Services.class); + when(services.getConfig()).thenReturn(serviceConfig); + when(services.getCancelCriterion()).thenReturn(stopper); + when(services.getManager()).thenReturn(manager); + when(services.getJoinLeave()).thenReturn(joinLeave); + when(services.getStatistics()).thenReturn(mock(DMStats.class)); + + messenger = new JGroupsMessenger(); + messenger.init(services); + + String jgroupsConfig = messenger.getJGroupsStackConfig(); + int startIdx = jgroupsConfig.indexOf("<UDP"); + int insertIdx = jgroupsConfig.indexOf('>', startIdx+4) + 1; + jgroupsConfig = jgroupsConfig.substring(0, insertIdx) + + "<"+InterceptUDP.class.getName()+"/>" + + jgroupsConfig.substring(insertIdx); + messenger.setJGroupsStackConfigForTesting(jgroupsConfig); + System.out.println("jgroups config: " + jgroupsConfig); + + messenger.start(); + messenger.started(); + + interceptor = (InterceptUDP)messenger.myChannel + .getProtocolStack().getTransport().getUpProtocol(); + + } + + @After + public void stopMessenger() { + if (messenger != null && messenger.myChannel != null) { + messenger.stop(); + } + } + + + @Test + public void testMessageDeliveredToHandler() throws Exception { + MessageHandler mh = mock(MessageHandler.class); + messenger.addHandler(JoinRequestMessage.class, mh); + + InternalDistributedMember sender = createAddress(8888); + JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null); + + Message jmsg = messenger.createJGMessage(msg, messenger.jgAddress, Version.CURRENT_ORDINAL); + interceptor.up(new Event(Event.MSG, jmsg)); + + verify(mh, times(1)).processMessage(any(JoinRequestMessage.class)); + + LeaveRequestMessage lmsg = new LeaveRequestMessage(messenger.localAddress, sender, "testing"); + jmsg = messenger.createJGMessage(lmsg, messenger.jgAddress, Version.CURRENT_ORDINAL); + interceptor.up(new Event(Event.MSG, jmsg)); + + verify(manager).processMessage(any(LeaveRequestMessage.class)); + + } + + + @Test + public void testBigMessageIsFragmented() throws Exception { + MessageHandler mh = mock(MessageHandler.class); + messenger.addHandler(JoinRequestMessage.class, mh); + + InternalDistributedMember sender = createAddress(8888); + JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null); + + messenger.send(msg); + assertTrue("expected 1 message to be sent but found "+ interceptor.unicastSentDataMessages, + interceptor.unicastSentDataMessages == 1); + + // send a big message and expect fragmentation + msg = new JoinRequestMessage(messenger.localAddress, sender, new byte[(int)(services.getConfig().getDistributionConfig().getUdpFragmentSize()*(1.5))]); + + interceptor.unicastSentDataMessages = 0; + messenger.send(msg); + assertTrue("expected 2 messages to be sent but found "+ interceptor.unicastSentDataMessages, + interceptor.unicastSentDataMessages == 2); + + } + + + /** + * creates an InternalDistributedMember address that can be used + * with the doctored JGroups channel. This includes a logical + * (UUID) address and a physical (IpAddress) address. + * + * @param port the UDP port to use for the new address + */ + private InternalDistributedMember createAddress(int port) { + GMSMember gms = new GMSMember("localhost", 8888); + gms.setUUID(UUID.randomUUID()); + gms.setVmKind(DistributionManager.NORMAL_DM_TYPE); + gms.setVersionOrdinal(Version.CURRENT_ORDINAL); + return new InternalDistributedMember(gms); + } + +}