This is an automated email from the ASF dual-hosted git repository. bschuchardt pushed a commit to branch feature/membership-only-test-wip in repository https://gitbox.apache.org/repos/asf/geode.git
commit 052e092461b617f4a80d70dcde738af1d5e964a2 Author: Bruce Schuchardt <bschucha...@pivotal.io> AuthorDate: Mon Dec 23 16:30:15 2019 -0800 test multiple membership managers and a locator with minimal geode-core dependencies --- .../membership/gms/GMSMembershipJUnitTest.java | 9 - .../membership/gms/MembershipOnlyJUnitTest.java | 722 +++++++++++++++++++++ .../gms/fd/GMSHealthMonitorJUnitTest.java | 73 +-- .../locator/GMSLocatorRecoveryIntegrationTest.java | 3 +- .../internal/ClusterOperationExecutors.java | 51 +- .../membership/adapter/GMSLocatorAdapter.java | 3 +- .../membership/adapter/LocalViewMessage.java | 84 --- .../internal/membership/gms/GMSMembership.java | 21 +- .../membership/gms/locator/GMSLocator.java | 10 +- .../MembershipDependenciesJUnitTest.java | 5 +- .../serialization/StaticSerialization.java | 1 + .../distributed/internal/tcpserver/TcpServer.java | 16 +- 12 files changed, 787 insertions(+), 211 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java index 44ff1f0..796f148 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java @@ -56,7 +56,6 @@ import org.apache.geode.distributed.internal.DistributionConfigImpl; import org.apache.geode.distributed.internal.DistributionMessage; import org.apache.geode.distributed.internal.HighPriorityAckedMessage; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.distributed.internal.membership.adapter.LocalViewMessage; import org.apache.geode.distributed.internal.membership.adapter.ServiceConfig; import org.apache.geode.distributed.internal.membership.gms.GMSMembership.StartupEvent; import org.apache.geode.distributed.internal.membership.gms.Services.Stopper; @@ -270,14 +269,6 @@ public class GMSMembershipJUnitTest { // supriseMember should have been rejected (old view ID) verify(listener, never()).newMemberConnected(surpriseMember); - // for code coverage also install a view after we finish joining but before - // event processing has started. This should notify the distribution manager - // with a LocalViewMessage to process the view - reset(listener); - manager.handleOrDeferViewEvent(new MembershipView(myMemberId, 5, viewmembers)); - assertEquals(0, manager.getStartupEvents().size()); - verify(messageListener).messageReceived(isA(LocalViewMessage.class)); - // process a suspect now - it will be passed to the listener reset(listener); suspectMember = mockMembers[1]; diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipOnlyJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipOnlyJUnitTest.java new file mode 100644 index 0000000..5b45b4f --- /dev/null +++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipOnlyJUnitTest.java @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.distributed.internal.membership.gms; + +import static org.apache.geode.distributed.ConfigurationProperties.DISABLE_TCP; +import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; +import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE; +import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; +import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.commons.lang3.tuple.Pair; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import org.apache.geode.distributed.internal.DistributionConfigImpl; +import org.apache.geode.distributed.internal.LocatorStats; +import org.apache.geode.distributed.internal.membership.adapter.ServiceConfig; +import org.apache.geode.distributed.internal.membership.gms.api.Authenticator; +import org.apache.geode.distributed.internal.membership.gms.api.LifecycleListener; +import org.apache.geode.distributed.internal.membership.gms.api.MemberData; +import org.apache.geode.distributed.internal.membership.gms.api.MemberDataBuilder; +import org.apache.geode.distributed.internal.membership.gms.api.MemberIdentifier; +import org.apache.geode.distributed.internal.membership.gms.api.MemberIdentifierFactory; +import org.apache.geode.distributed.internal.membership.gms.api.MemberShunnedException; +import org.apache.geode.distributed.internal.membership.gms.api.MemberStartupException; +import org.apache.geode.distributed.internal.membership.gms.api.Membership; +import org.apache.geode.distributed.internal.membership.gms.api.MembershipBuilder; +import org.apache.geode.distributed.internal.membership.gms.api.MembershipListener; +import org.apache.geode.distributed.internal.membership.gms.api.MembershipStatistics; +import org.apache.geode.distributed.internal.membership.gms.api.MessageListener; +import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave; +import org.apache.geode.distributed.internal.membership.gms.locator.GMSLocator; +import org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave; +import org.apache.geode.distributed.internal.membership.gms.messages.AbstractGMSMessage; +import org.apache.geode.distributed.internal.tcpserver.ConnectionWatcher; +import org.apache.geode.distributed.internal.tcpserver.ProtocolChecker; +import org.apache.geode.distributed.internal.tcpserver.TcpClient; +import org.apache.geode.distributed.internal.tcpserver.TcpServer; +import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreator; +import org.apache.geode.internal.AvailablePortHelper; +import org.apache.geode.internal.admin.remote.RemoteTransportConfig; +import org.apache.geode.internal.serialization.DSFIDSerializer; +import org.apache.geode.internal.serialization.DSFIDSerializerFactory; +import org.apache.geode.internal.serialization.DeserializationContext; +import org.apache.geode.internal.serialization.SerializationContext; +import org.apache.geode.internal.serialization.StaticSerialization; +import org.apache.geode.internal.serialization.Version; + +@Category({MembershipOnlyJUnitTest.class}) +public class MembershipOnlyJUnitTest { + + public static final int MemberID_DSFID = 2002; + public static final int SerialMessage_DSFID = 2001; + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + private TcpClient locatorClient; + + @Test + public void testMultipleManagersInSameProcess() throws Exception { + Membership<MemberID> m1 = null, m2 = null; + TcpServer tcpServer = null; + + final MemberIdentifierFactory memberFactory = mock(MemberIdentifierFactory.class); + when(memberFactory.create(isA(GMSMemberData.class))).thenAnswer(new Answer<MemberIdentifier>() { + @Override + public MemberIdentifier answer(InvocationOnMock invocation) throws Throwable { + return new MemberID((GMSMemberData) invocation.getArgument(0)); + } + }); + + try { + final DSFIDSerializer dsfidSerializer = createDSFIDSerializer(); + TcpSocketCreator socketCreator = new TestTcpSocketCreator(); + locatorClient = new TcpClient(socketCreator, dsfidSerializer.getObjectSerializer(), + dsfidSerializer.getObjectDeserializer()); + String locatorsString = ""; + GMSLocator locator = new GMSLocator(InetAddress.getLocalHost(), locatorsString, + true, false, mock( + LocatorStats.class), + "", new File("").toPath(), locatorClient, + dsfidSerializer.getObjectSerializer(), dsfidSerializer.getObjectDeserializer()); + int locationServicePort = AvailablePortHelper.getRandomAvailableTCPPort(); + tcpServer = new TcpServer(locationServicePort, InetAddress.getLocalHost(), locator, + "testTcpServerThread", mock(ProtocolChecker.class), + System::nanoTime, MembershipOnlyJUnitTest::newThreadPool, socketCreator, + dsfidSerializer.getObjectSerializer(), + dsfidSerializer.getObjectDeserializer(), + "scooby_scooby_doo", + "where_are_you"); + System.out.println("Test is starting a locator"); + tcpServer.start(); + + // boot up a locator + InetAddress localHost = InetAddress.getLocalHost(); + String locators = localHost.getHostName() + '[' + locationServicePort + ']'; + + + System.out.println("Test is creating the first membership manager"); + try { + System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true"); + m1 = createMembershipManager(locators, memberFactory).getLeft(); + } finally { + System.getProperties().remove(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY); + } + locator.setMembership(m1); + + System.out.println("Test is creating the second membership manager"); + final Pair<Membership, MessageListener> pair = + createMembershipManager(locators, memberFactory); + m2 = pair.getLeft(); + final MessageListener listener2 = pair.getRight(); + + // we have to check the views with JoinLeave because the membership + // manager queues new views for processing through the DM listener, + // which is a mock object in this test + System.out.println("waiting for views to stabilize"); + JoinLeave jl1 = ((GMSMembership) m1).getServices().getJoinLeave(); + JoinLeave jl2 = ((GMSMembership) m2).getServices().getJoinLeave(); + long giveUp = System.currentTimeMillis() + 15000; + for (;;) { + try { + assertTrue("view = " + jl2.getView(), jl2.getView().size() == 2); + assertTrue("view = " + jl1.getView(), jl1.getView().size() == 2); + assertTrue(jl1.getView().getCreator().equals(jl2.getView().getCreator())); + assertTrue(jl1.getView().getViewId() == jl2.getView().getViewId()); + break; + } catch (AssertionError e) { + if (System.currentTimeMillis() > giveUp) { + throw e; + } + } + } + + GMSMembershipView<MemberID> view = jl1.getView(); + MemberIdentifier notCreator; + if (view.getCreator().equals(jl1.getMemberID())) { + notCreator = view.getMembers().get(1); + } else { + notCreator = view.getMembers().get(0); + } + List<String> result = notCreator.getGroups(); + + System.out.println("sending SerialMessage from m1 to m2"); + SerialMessage msg = new SerialMessage(); + msg.setRecipient(m2.getLocalMember()); + msg.setMulticast(false); + m1.send(new MemberID[] {m2.getLocalMember()}, msg); + giveUp = System.currentTimeMillis() + 15000; + boolean verified = false; + Throwable problem = null; + while (giveUp > System.currentTimeMillis()) { + try { + verify(listener2).messageReceived(isA(SerialMessage.class)); + verified = true; + break; + } catch (Error e) { + problem = e; + Thread.sleep(500); + } + } + if (!verified) { + AssertionError error = new AssertionError("Expected a message to be received"); + if (problem != null) { + error.initCause(problem); + } + throw error; + } + + // let the managers idle for a while and get used to each other + // Thread.sleep(4000l); + + m2.disconnect(false); + assertTrue(!m2.isConnected()); + + System.out.println("view is " + m1.getView()); + final Membership<MemberID> waitingMember = m1; + await().untilAsserted(() -> assertTrue(waitingMember.getView().size() == 1)); + } finally { + + if (m2 != null) { + m2.shutdown(); + } + if (m1 != null) { + m1.shutdown(); + } + if (tcpServer != null) { + tcpServer.requestShutdown(); + tcpServer.join(300_000); + } + } + } + + private DSFIDSerializer createDSFIDSerializer() { + final DSFIDSerializer dsfidSerializer = new DSFIDSerializerFactory().create(); + Services.registerSerializables(dsfidSerializer); + dsfidSerializer.registerDSFID(SerialMessage_DSFID, SerialMessage.class); + dsfidSerializer.registerDSFID(MemberID_DSFID, MemberID.class); + return dsfidSerializer; + } + + private Pair<Membership, MessageListener> createMembershipManager(String locators, + MemberIdentifierFactory memberIdentifierFactory) + throws MemberStartupException, MemberShunnedException { + // create configuration objects + Properties nonDefault = new Properties(); + nonDefault.put(DISABLE_TCP, "true"); + nonDefault.put(MCAST_PORT, "0"); + nonDefault.put(LOG_FILE, ""); + nonDefault.put(LOG_LEVEL, "fine"); + nonDefault.put(MEMBER_TIMEOUT, "2000"); + nonDefault.put(LOCATORS, locators); + DistributionConfigImpl config = new DistributionConfigImpl(nonDefault); + RemoteTransportConfig transport = + new RemoteTransportConfig(config, GMSMemberData.NORMAL_DM_TYPE); + + final MembershipListener listener = mock(MembershipListener.class); + final MessageListener messageListener = mock(MessageListener.class); + DSFIDSerializer serializer = createDSFIDSerializer(); + + LifecycleListener lifeCycleListener = mock(LifecycleListener.class); + final Membership m1 = + MembershipBuilder.<MemberID>newMembershipBuilder() + .setAuthenticator(mock(Authenticator.class)) + .setStatistics(mock(MembershipStatistics.class)) + .setMessageListener(messageListener) + .setMembershipListener(listener) + .setConfig(new ServiceConfig(transport, config)) + .setSerializer(serializer) + .setLifecycleListener(lifeCycleListener) + .setMemberIDFactory(memberIdentifierFactory) + .setLocatorClient(locatorClient) + .setSocketCreator(new TestTcpSocketCreator()) + .create(); + // doAnswer(invocation -> { + // DistributionImpl.connectLocatorToServices(m1); + // return null; + // }).when(lifeCycleListener).started(); + m1.start(); + m1.startEventProcessing(); + return Pair.of(m1, messageListener); + } + + + public static class MemberID implements MemberIdentifier { + MemberData memberData; + private boolean isPartialIdentifier; + + public MemberID() {} // constructor for deserialization + + public MemberID(GMSMemberData data) { + memberData = data; + } + + @Override + public MemberData getMemberData() { + return memberData; + } + + public boolean equals(Object o) { + return compareTo(o) == 0; + } + + public int compareTo(Object o) { + return compareTo(o, true, false); + } + + public int compareTo(Object o, boolean compareViewIds, boolean compareMemberData) { + if (this == o) { + return 0; + } + // obligatory type check + if (!(o instanceof MemberID)) + throw new ClassCastException( + "MemberID.compareTo(): comparison between different classes"); + MemberID other = (MemberID) o; + + int myPort = getMembershipPort(); + int otherPort = other.getMembershipPort(); + if (myPort < otherPort) + return -1; + if (myPort > otherPort) + return 1; + + + InetAddress myAddr = getInetAddress(); + InetAddress otherAddr = other.getInetAddress(); + + // Discard null cases + if (myAddr == null && otherAddr == null) { + return 0; + } else if (myAddr == null) { + return -1; + } else if (otherAddr == null) + return 1; + + byte[] myBytes = myAddr.getAddress(); + byte[] otherBytes = otherAddr.getAddress(); + + if (myBytes != otherBytes) { + for (int i = 0; i < myBytes.length; i++) { + if (i >= otherBytes.length) + return -1; // same as far as they go, but shorter... + if (myBytes[i] < otherBytes[i]) + return -1; + if (myBytes[i] > otherBytes[i]) + return 1; + } + if (myBytes.length > otherBytes.length) + return 1; // same as far as they go, but longer... + } + if (compareViewIds) { + // not loners, so look at P2P view ID + int thisViewId = getVmViewId(); + int otherViewId = other.getVmViewId(); + if (thisViewId >= 0 && otherViewId >= 0) { + if (thisViewId < otherViewId) { + return -1; + } else if (thisViewId > otherViewId) { + return 1; + } // else they're the same, so continue + } + } + + if (compareMemberData && this.memberData != null && other.memberData != null) { + return this.memberData.compareAdditionalData(other.memberData); + } else { + return 0; + } + } + + @Override + public int hashCode() { + int result = 0; + result = result + memberData.getInetAddress().hashCode(); + result = result + getMembershipPort(); + return result; + } + + @Override + public String getHostName() { + return memberData.getHostName(); + } + + @Override + public InetAddress getInetAddress() { + return memberData.getInetAddress(); + } + + @Override + public int getMembershipPort() { + return memberData.getMembershipPort(); + } + + @Override + public short getVersionOrdinal() { + return memberData.getVersionOrdinal(); + } + + @Override + public int getVmViewId() { + return memberData.getVmViewId(); + } + + @Override + public boolean preferredForCoordinator() { + return memberData.isPreferredForCoordinator(); + } + + @Override + public int getVmKind() { + return memberData.getVmKind(); + } + + @Override + public int getMemberWeight() { + return memberData.getMemberWeight(); + } + + @Override + public List<String> getGroups() { + return Arrays.asList(memberData.getGroups()); + } + + @Override + public void setVmViewId(int viewNumber) { + memberData.setVmViewId(viewNumber); + } + + @Override + public void setPreferredForCoordinator(boolean preferred) { + memberData.setPreferredForCoordinator(preferred); + } + + @Override + public void setDirectChannelPort(int dcPort) { + memberData.setDirectChannelPort(dcPort); + } + + @Override + public void setVmKind(int dmType) { + memberData.setVmKind(dmType); + } + + @Override + public Version getVersionObject() { + return Version.fromOrdinalNoThrow(memberData.getVersionOrdinal(), false); + } + + @Override + public void setMemberData(MemberData memberData) { + this.memberData = memberData; + } + + @Override + public void setIsPartial(boolean b) { + isPartialIdentifier = true; + } + + @Override + public boolean isPartial() { + return isPartialIdentifier; + } + + @Override + public int getDSFID() { + return MemberID_DSFID; + } + + @Override + public void toData(DataOutput out, SerializationContext context) throws IOException { + StaticSerialization.writeInetAddress(getInetAddress(), out); + out.writeInt(getMembershipPort()); + + StaticSerialization.writeString(memberData.getHostName(), out); + out.writeBoolean(memberData.isNetworkPartitionDetectionEnabled()); + out.writeBoolean(memberData.isPreferredForCoordinator()); + out.writeBoolean(memberData.isPartial()); + + out.writeInt(memberData.getDirectChannelPort()); + out.writeInt(memberData.getProcessId()); + int vmKind = memberData.getVmKind(); + out.writeByte(vmKind); + StaticSerialization.writeStringArray(memberData.getGroups(), out); + + StaticSerialization.writeString(memberData.getName(), out); + out.writeInt(memberData.getVmViewId()); + short version = memberData.getVersionOrdinal(); + out.writeInt(version); + memberData.writeAdditionalData(out); + } + + @Override + public void fromData(DataInput in, DeserializationContext context) + throws IOException, ClassNotFoundException { + InetAddress inetAddr = StaticSerialization.readInetAddress(in); + int port = in.readInt(); + + String hostName = StaticSerialization.readString(in); + + boolean sbEnabled = in.readBoolean(); + boolean elCoord = in.readBoolean(); + isPartialIdentifier = in.readBoolean(); + + int dcPort = in.readInt(); + int vmPid = in.readInt(); + int vmKind = in.readUnsignedByte(); + String[] groups = StaticSerialization.readStringArray(in); + + String name = StaticSerialization.readString(in); + int vmViewId = in.readInt(); + + int version = in.readInt(); + + memberData = MemberDataBuilder.newBuilder(inetAddr, hostName) + .setMembershipPort(port) + .setDirectChannelPort(dcPort) + .setName(name) + .setNetworkPartitionDetectionEnabled(sbEnabled) + .setPreferredForCoordinator(elCoord) + .setVersionOrdinal((short) version) + .setVmPid(vmPid) + .setVmKind(vmKind) + .setVmViewId(vmViewId) + .setGroups(groups) + .build(); + + memberData.readAdditionalData(in); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(); + addFixedToString(sb); + + // add version if not current + short version = memberData.getVersionOrdinal(); + if (version != Version.CURRENT.ordinal()) { + sb.append("(version:").append(Version.toString(version)).append(')'); + } + + // leave out Roles on purpose + + return sb.toString(); + } + + public void addFixedToString(StringBuilder sb) { + // Note: This method is used to generate the HARegion name. If it is changed, memory and GII + // issues will occur in the case of clients with subscriptions during rolling upgrade. + String host; + + InetAddress add = getInetAddress(); + if (add.isMulticastAddress()) + host = add.getHostAddress(); + else { + String hostName = memberData.getHostName(); + host = hostName; + } + + sb.append(host); + sb.append('/').append(memberData.getInetAddress().getHostAddress()); + + int vmPid = memberData.getProcessId(); + int vmKind = memberData.getVmKind(); + if (vmPid > 0 || vmKind != GMSMemberData.NORMAL_DM_TYPE) { + sb.append("("); + + if (vmPid > 0) + sb.append(vmPid); + + String vmStr = ""; + switch (vmKind) { + case GMSMemberData.NORMAL_DM_TYPE: + // vmStr = ":local"; // let this be silent + break; + case GMSMemberData.LOCATOR_DM_TYPE: + vmStr = ":locator"; + break; + case GMSMemberData.ADMIN_ONLY_DM_TYPE: + vmStr = ":admin"; + break; + case GMSMemberData.LONER_DM_TYPE: + vmStr = ":loner"; + break; + default: + vmStr = ":<unknown:" + vmKind + ">"; + break; + } + sb.append(vmStr); + sb.append(")"); + } + if (vmKind != GMSMemberData.LONER_DM_TYPE + && memberData.isPreferredForCoordinator()) { + sb.append("<ec>"); + } + int vmViewId = getVmViewId(); + if (vmViewId >= 0) { + sb.append("<v" + vmViewId + ">"); + } + sb.append(":"); + sb.append(getMembershipPort()); + } + + @Override + public Version[] getSerializationVersions() { + return new Version[0]; + } + } + + public static class LocationService { + TcpServer tcpServer; + + public LocationService(TcpServer server) { + this.tcpServer = server; + } + } + + public static class TestTcpSocketCreator implements TcpSocketCreator { + + public TestTcpSocketCreator() {} + + @Override + public boolean useSSL() { + return false; + } + + @Override + public ServerSocket createServerSocket(int nport, int backlog) throws IOException { + return new ServerSocket(nport, backlog); + } + + @Override + public ServerSocket createServerSocket(int nport, int backlog, InetAddress bindAddr) + throws IOException { + ServerSocket socket = new ServerSocket(); + socket.bind(new InetSocketAddress(bindAddr, nport), backlog); + return socket; + } + + @Override + public InetAddress getLocalHost() throws UnknownHostException { + return InetAddress.getLocalHost(); + } + + @Override + public String getHostName(InetAddress addr) { + return addr.getCanonicalHostName(); + } + + @Override + public ServerSocket createServerSocketUsingPortRange(InetAddress ba, int backlog, + boolean isBindAddress, boolean useNIO, + int tcpBufferSize, int[] tcpPortRange, + boolean sslConnection) throws IOException { + for (int port = tcpPortRange[0]; port < tcpPortRange[1]; port++) { + try { + return createServerSocket(port, backlog, ba); + } catch (IOException e) { + // port in use - continue + } + } + throw new IOException("unable to allocate a server port in the specified range: " + + Arrays.toString(tcpPortRange)); + } + + + @Override + public Socket connect(InetAddress inetadd, int port, int timeout, + ConnectionWatcher optionalWatcher, boolean clientSide) + throws IOException { + Socket socket = new Socket(); + socket.setSoTimeout(timeout); + // ignore optionalWatcher for now + // ignore clientSide for now - used for geode client/server comms and SSL handshakes + socket.connect(new InetSocketAddress(inetadd, port)); + return socket; + } + + @Override + public Socket connect(InetAddress inetadd, int port, int timeout, + ConnectionWatcher optionalWatcher, boolean clientSide, + int socketBufferSize, boolean sslConnection) throws IOException { + return connect(inetadd, port, timeout, optionalWatcher, clientSide); + } + + @Override + public void handshakeIfSocketIsSSL(Socket socket, int timeout) throws IOException { + // see useSSL() + } + + @Override + public boolean resolveDns() { + return true; + } + } + + public static class SerialMessage extends AbstractGMSMessage { + + @Override + public int getDSFID() { + return SerialMessage_DSFID; + } + + @Override + public void toData(DataOutput out, SerializationContext context) throws IOException { + + } + + @Override + public void fromData(DataInput in, DeserializationContext context) + throws IOException, ClassNotFoundException { + + } + + @Override + public Version[] getSerializationVersions() { + return null; + } + } + + public static ExecutorService newThreadPool() { + return Executors.newCachedThreadPool(); + } + +} diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java index c643152..74e6c33 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java @@ -191,17 +191,13 @@ public class GMSHealthMonitorJUnitTest { } @Test - public void testHMServiceStarted() throws IOException { + public void testHMServiceStarted() throws IOException, MemberStartupException { MemberIdentifier mbr = new InternalDistributedMember("localhost", 12345); mbr.setVmViewId(1); when(messenger.getMemberID()).thenReturn(mbr); - try { - gmsHealthMonitor.started(); - } catch (MemberStartupException e) { - e.printStackTrace(); - } + gmsHealthMonitor.started(); GMSMembershipView v = new GMSMembershipView(mbr, 1, mockMembers); @@ -212,16 +208,12 @@ public class GMSHealthMonitorJUnitTest { } @Test - public void testHMServiceHandlesShutdownRace() throws IOException { + public void testHMServiceHandlesShutdownRace() throws Exception { // The health monitor starts a thread to monitor the tcp socket, both that thread and the // stopServices call will attempt to shut down the socket during a normal close. This test tries // to create a problematic ordering to make sure we still shutdown properly. ((GMSHealthMonitorTest) gmsHealthMonitor).useBlockingSocket = true; - try { - gmsHealthMonitor.started(); - } catch (MemberStartupException e) { - e.printStackTrace(); - } + gmsHealthMonitor.started(); gmsHealthMonitor.stop(); } @@ -229,7 +221,7 @@ public class GMSHealthMonitorJUnitTest { * checks who is next neighbor */ @Test - public void testHMNextNeighborVerify() throws IOException { + public void testHMNextNeighborVerify() throws Exception { installAView(); assertEquals(mockMembers.get(myAddressIndex + 1), gmsHealthMonitor.getNextNeighbor()); } @@ -260,7 +252,7 @@ public class GMSHealthMonitorJUnitTest { */ @Test - public void testHMNextNeighborBeforeTimeout() throws IOException { + public void testHMNextNeighborBeforeTimeout() throws Exception { long startTime = System.currentTimeMillis(); installAView(); final MemberIdentifier neighbor = gmsHealthMonitor.getNextNeighbor(); @@ -293,17 +285,12 @@ public class GMSHealthMonitorJUnitTest { System.out.println("testSuspectMembersCalledThroughMemberCheckThread ending"); } - private GMSMembershipView installAView() { + private GMSMembershipView installAView() throws Exception { GMSMembershipView v = new GMSMembershipView(mockMembers.get(0), 2, mockMembers); // 3rd is current member when(messenger.getMemberID()).thenReturn(mockMembers.get(myAddressIndex)); - try { - gmsHealthMonitor.started(); - } catch (MemberStartupException e) { - e.printStackTrace(); - } - + gmsHealthMonitor.started(); gmsHealthMonitor.installView(v); return v; @@ -322,7 +309,7 @@ public class GMSHealthMonitorJUnitTest { * checks ping thread didn't sends suspectMembers message before timeout */ @Test - public void testSuspectMembersNotCalledThroughPingThreadBeforeTimeout() { + public void testSuspectMembersNotCalledThroughPingThreadBeforeTimeout() throws Exception { long startTime = System.currentTimeMillis(); installAView(); MemberIdentifier neighbor = gmsHealthMonitor.getNextNeighbor(); @@ -354,7 +341,7 @@ public class GMSHealthMonitorJUnitTest { * Checks suspect thread doesn't sends suspectMembers message before timeout */ @Test - public void testSuspectMembersNotCalledThroughSuspectThreadBeforeTimeout() { + public void testSuspectMembersNotCalledThroughSuspectThreadBeforeTimeout() throws Exception { installAView(); gmsHealthMonitor.suspect(mockMembers.get(1), "Not responding"); @@ -399,18 +386,14 @@ public class GMSHealthMonitorJUnitTest { * Shouldn't send remove member message before doing final check, or before ping Timeout */ @Test - public void testRemoveMemberNotCalledBeforeTimeout() { + public void testRemoveMemberNotCalledBeforeTimeout() throws Exception { System.out.println("testRemoveMemberNotCalledBeforeTimeout starting"); GMSMembershipView v = new GMSMembershipView(mockMembers.get(0), 2, mockMembers); // 3rd is current member when(messenger.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member when(joinLeave.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member - try { - gmsHealthMonitor.started(); - } catch (MemberStartupException e) { - e.printStackTrace(); - } + gmsHealthMonitor.started(); gmsHealthMonitor.installView(v); @@ -470,7 +453,7 @@ public class GMSHealthMonitorJUnitTest { } @Test - public void testCheckIfAvailableWithSimulatedHeartBeat() { + public void testCheckIfAvailableWithSimulatedHeartBeat() throws Exception { GMSMembershipView v = installAView(); MemberIdentifier memberToCheck = mockMembers.get(1); @@ -489,7 +472,7 @@ public class GMSHealthMonitorJUnitTest { } @Test - public void testCheckIfAvailableWithSimulatedHeartBeatWithTcpCheck() { + public void testCheckIfAvailableWithSimulatedHeartBeatWithTcpCheck() throws Exception { System.out.println("testCheckIfAvailableWithSimulatedHeartBeatWithTcpCheck"); useGMSHealthMonitorTestClass = true; @@ -508,7 +491,7 @@ public class GMSHealthMonitorJUnitTest { } @Test - public void testMemberIsExaminedAgainAfterPassingAvailabilityCheck() { + public void testMemberIsExaminedAgainAfterPassingAvailabilityCheck() throws Exception { // use the test health monitor's availability check for the first round of suspect processing // but then turn it off so that a subsequent round is performed and fails to get a heartbeat useGMSHealthMonitorTestClass = true; @@ -534,7 +517,7 @@ public class GMSHealthMonitorJUnitTest { } @Test - public void testNeighborRemainsSameAfterSuccessfulFinalCheck() { + public void testNeighborRemainsSameAfterSuccessfulFinalCheck() throws Exception { useGMSHealthMonitorTestClass = true; try { @@ -560,7 +543,7 @@ public class GMSHealthMonitorJUnitTest { @Test - public void testNeighborChangesAfterFailedFinalCheck() { + public void testNeighborChangesAfterFailedFinalCheck() throws Exception { useGMSHealthMonitorTestClass = true; simulateHeartbeatInGMSHealthMonitorTestClass = false; @@ -588,7 +571,7 @@ public class GMSHealthMonitorJUnitTest { @Test - public void testExonerationMessageIsSentAfterSuccessfulFinalCheck() { + public void testExonerationMessageIsSentAfterSuccessfulFinalCheck() throws Exception { useGMSHealthMonitorTestClass = true; try { @@ -613,7 +596,7 @@ public class GMSHealthMonitorJUnitTest { } @Test - public void testExonerationMessageIsNotSentToVersion_1_3() { + public void testExonerationMessageIsNotSentToVersion_1_3() throws Exception { // versions older than 1.4 don't know about the FinalCheckPassedMessage class useGMSHealthMonitorTestClass = true; @@ -656,7 +639,7 @@ public class GMSHealthMonitorJUnitTest { @Test - public void testInitiatorRewatchesSuspectAfterSuccessfulFinalCheck() { + public void testInitiatorRewatchesSuspectAfterSuccessfulFinalCheck() throws Exception { GMSMembershipView v = installAView(); setFailureDetectionPorts(v); @@ -670,7 +653,7 @@ public class GMSHealthMonitorJUnitTest { @Test - public void testFinalCheckFailureLeavesMemberAsSuspect() { + public void testFinalCheckFailureLeavesMemberAsSuspect() throws Exception { useGMSHealthMonitorTestClass = true; simulateHeartbeatInGMSHealthMonitorTestClass = false; @@ -686,7 +669,7 @@ public class GMSHealthMonitorJUnitTest { } @Test - public void testFailedSelfCheckRemovesMemberAsSuspect() { + public void testFailedSelfCheckRemovesMemberAsSuspect() throws Exception { useGMSHealthMonitorTestClass = true; simulateHeartbeatInGMSHealthMonitorTestClass = false; allowSelfCheckToSucceed = false; @@ -710,7 +693,7 @@ public class GMSHealthMonitorJUnitTest { * a failed availablility check should initiate suspect processing */ @Test - public void testFailedCheckIfAvailableDoesNotRemoveMember() { + public void testFailedCheckIfAvailableDoesNotRemoveMember() throws Exception { useGMSHealthMonitorTestClass = true; simulateHeartbeatInGMSHealthMonitorTestClass = false; @@ -730,7 +713,7 @@ public class GMSHealthMonitorJUnitTest { * Same test as above but with request to initiate removal */ @Test - public void testFailedCheckIfAvailableRemovesMember() { + public void testFailedCheckIfAvailableRemovesMember() throws Exception { useGMSHealthMonitorTestClass = true; simulateHeartbeatInGMSHealthMonitorTestClass = false; @@ -750,7 +733,8 @@ public class GMSHealthMonitorJUnitTest { */ @Test - public void testFailedCheckIfAvailableWithoutFailureDetectionPortDoesNotRemoveMember() { + public void testFailedCheckIfAvailableWithoutFailureDetectionPortDoesNotRemoveMember() + throws Exception { useGMSHealthMonitorTestClass = true; simulateHeartbeatInGMSHealthMonitorTestClass = false; @@ -769,7 +753,8 @@ public class GMSHealthMonitorJUnitTest { } @Test - public void testFailedCheckIfAvailableWithoutFailureDetectionPortRemovesMember() { + public void testFailedCheckIfAvailableWithoutFailureDetectionPortRemovesMember() + throws Exception { useGMSHealthMonitorTestClass = true; simulateHeartbeatInGMSHealthMonitorTestClass = false; @@ -788,7 +773,7 @@ public class GMSHealthMonitorJUnitTest { } @Test - public void testShutdown() { + public void testShutdown() throws Exception { installAView(); diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorRecoveryIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorRecoveryIntegrationTest.java index f0226fe..a8cf79d 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorRecoveryIntegrationTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorRecoveryIntegrationTest.java @@ -57,6 +57,7 @@ import org.apache.geode.distributed.internal.membership.gms.api.MemberIdentifier import org.apache.geode.distributed.internal.membership.gms.api.MembershipListener; import org.apache.geode.distributed.internal.membership.gms.api.MessageListener; import org.apache.geode.distributed.internal.tcpserver.TcpClient; +import org.apache.geode.distributed.internal.tcpserver.TcpServer; import org.apache.geode.internal.AvailablePortHelper; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.admin.remote.RemoteTransportConfig; @@ -205,7 +206,7 @@ public class GMSLocatorRecoveryIntegrationTest { InternalDataSerializer.getDSFIDSerializer().getObjectSerializer(), InternalDataSerializer.getDSFIDSerializer().getObjectDeserializer()); gmsLocator.setViewFile(new File(temporaryFolder.getRoot(), "locator2.dat")); - gmsLocator.init(null); + gmsLocator.init((TcpServer) null); assertThat(gmsLocator.getMembers()) .contains(distribution.getLocalMember()); diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java index 587ebf2..90fe3f0 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java @@ -32,7 +32,6 @@ import org.apache.geode.CancelException; import org.apache.geode.InternalGemFireError; import org.apache.geode.SystemFailure; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.distributed.internal.membership.gms.messages.ViewAckMessage; import org.apache.geode.internal.logging.CoreLoggingExecutors; import org.apache.geode.internal.logging.log4j.LogMarker; import org.apache.geode.internal.monitoring.ThreadsMonitoring; @@ -108,14 +107,6 @@ public class ClusterOperationExecutors implements OperationExecutors { // 76 not in use - /** - * Executor for view related messages - * - * @see ViewAckMessage - */ - public static final int VIEW_EXECUTOR = 79; - - private InternalDistributedSystem system; private DistributionStats stats; @@ -157,13 +148,6 @@ public class ClusterOperationExecutors implements OperationExecutors { private ExecutorService serialThread; /** - * Message processing executor for view messages - * - * @see ViewAckMessage - */ - private ExecutorService viewThread; - - /** * If using a throttling queue for the serialThread, we cache the queue here so we can see if * delivery would block */ @@ -227,11 +211,6 @@ public class ClusterOperationExecutors implements OperationExecutors { } - viewThread = - CoreLoggingExecutors.newSerialThreadPoolWithUnlimitedFeed("View Message Processor", - thread -> stats.incViewThreadStarts(), this::doViewThread, - stats.getViewProcessorHelper(), threadMonitor); - threadPool = CoreLoggingExecutors.newThreadPoolWithFeedStatistics("Pooled Message Processor ", thread -> stats.incProcessingThreadStarts(), this::doProcessingThread, @@ -306,8 +285,6 @@ public class ClusterOperationExecutors implements OperationExecutors { return getThreadPool(); case SERIAL_EXECUTOR: return getSerialExecutor(sender); - case VIEW_EXECUTOR: - return viewThread; case HIGH_PRIORITY_EXECUTOR: return getHighPriorityThreadPool(); case WAITING_POOL_EXECUTOR: @@ -446,18 +423,6 @@ public class ClusterOperationExecutors implements OperationExecutors { } } - private void doViewThread(Runnable command) { - stats.incNumViewThreads(1); - try { - ConnectionTable.threadWantsSharedResources(); - Connection.makeReaderThread(); - runUntilShutdown(command); - } finally { - ConnectionTable.releaseThreadsSockets(); - stats.incNumViewThreads(-1); - } - } - private void doSerialThread(Runnable command) { stats.incNumSerialThreads(1); try { @@ -500,13 +465,6 @@ public class ClusterOperationExecutors implements OperationExecutors { if (es != null) { es.shutdown(); } - es = viewThread; - if (es != null) { - // Hmmm...OK, I'll let any view events currently in the queue be - // processed. Not sure it's very important whether they get - // handled... - es.shutdown(); - } if (serialQueuedExecutorPool != null) { serialQueuedExecutorPool.shutdown(); } @@ -548,7 +506,7 @@ public class ClusterOperationExecutors implements OperationExecutors { long start = System.currentTimeMillis(); long remaining = timeInMillis; - ExecutorService[] allExecutors = new ExecutorService[] {serialThread, viewThread, + ExecutorService[] allExecutors = new ExecutorService[] {serialThread, functionExecutionThread, functionExecutionPool, partitionedRegionThread, partitionedRegionPool, highPriorityPool, waitingPool, prMetaDataCleanupThreadPool, threadPool}; @@ -597,10 +555,6 @@ public class ClusterOperationExecutors implements OperationExecutors { stillAlive = true; culprits.append(" serial thread;"); } - if (executorAlive(viewThread, "view thread")) { - stillAlive = true; - culprits.append(" view thread;"); - } if (executorAlive(partitionedRegionThread, "partitioned region thread")) { stillAlive = true; culprits.append(" partitioned region thread;"); @@ -651,9 +605,6 @@ public class ClusterOperationExecutors implements OperationExecutors { if (serialThread != null) { serialThread.shutdownNow(); } - if (viewThread != null) { - viewThread.shutdownNow(); - } if (functionExecutionThread != null) { functionExecutionThread.shutdownNow(); } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/adapter/GMSLocatorAdapter.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/adapter/GMSLocatorAdapter.java index ff4bb24..1fe8061 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/adapter/GMSLocatorAdapter.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/adapter/GMSLocatorAdapter.java @@ -27,7 +27,6 @@ import org.apache.geode.distributed.internal.Distribution; import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.LocatorStats; -import org.apache.geode.distributed.internal.RestartableTcpHandler; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.distributed.internal.membership.NetLocator; import org.apache.geode.distributed.internal.membership.gms.api.Membership; @@ -40,7 +39,7 @@ import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.net.SocketCreatorFactory; import org.apache.geode.internal.security.SecurableCommunicationChannel; -public class GMSLocatorAdapter implements RestartableTcpHandler, NetLocator { +public class GMSLocatorAdapter implements NetLocator { private final GMSLocator<InternalDistributedMember> gmsLocator; diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/adapter/LocalViewMessage.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/adapter/LocalViewMessage.java deleted file mode 100755 index 700c8a6..0000000 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/adapter/LocalViewMessage.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.distributed.internal.membership.adapter; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.geode.distributed.internal.ClusterDistributionManager; -import org.apache.geode.distributed.internal.ClusterOperationExecutors; -import org.apache.geode.distributed.internal.SerialDistributionMessage; -import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.distributed.internal.membership.gms.GMSMembership; -import org.apache.geode.distributed.internal.membership.gms.api.MembershipView; -import org.apache.geode.internal.serialization.DeserializationContext; -import org.apache.geode.internal.serialization.SerializationContext; - - -/** - * LocalViewMessage is used to pass a new membership view to the GemFire cache in an orderly manner. - * It is intended to be queued with serially executed messages so that the view takes effect at the - * proper time. - * - */ - -public class LocalViewMessage extends SerialDistributionMessage { - - private GMSMembership<InternalDistributedMember> manager; - private long viewId; - private MembershipView<InternalDistributedMember> view; - - public LocalViewMessage(InternalDistributedMember addr, long viewId, - MembershipView<InternalDistributedMember> view, - GMSMembership<InternalDistributedMember> manager) { - super(); - this.sender = addr; - this.viewId = viewId; - this.view = view; - this.manager = manager; - } - - @Override - public int getProcessorType() { - return ClusterOperationExecutors.VIEW_EXECUTOR; - } - - - @Override - protected void process(ClusterDistributionManager dm) { - // dm.getLogger().info("view message processed", new Exception()); - manager.processView(viewId, view); - } - - // These "messages" are never DataSerialized - - @Override - public int getDSFID() { - throw new UnsupportedOperationException(); - } - - @Override - public void toData(DataOutput out, - SerializationContext context) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void fromData(DataInput in, - DeserializationContext context) throws IOException, ClassNotFoundException { - throw new UnsupportedOperationException(); - } -} diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java index 8d59bf6..1a9eeb6 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java @@ -30,6 +30,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -46,8 +47,6 @@ import org.apache.geode.SystemFailure; import org.apache.geode.annotations.internal.MakeNotStatic; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.StartupMessage; -import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.distributed.internal.membership.adapter.LocalViewMessage; import org.apache.geode.distributed.internal.membership.gms.api.LifecycleListener; import org.apache.geode.distributed.internal.membership.gms.api.MemberDisconnectedException; import org.apache.geode.distributed.internal.membership.gms.api.MemberIdentifier; @@ -98,6 +97,10 @@ public class GMSMembership<ID extends MemberIdentifier> implements Membership<ID private volatile boolean isCloseInProgress; + private ExecutorService viewExcecutor = LoggingExecutors.newSingleThreadExecutor( + "Geode View Installation thread ", + true); + /** * Trick class to make the startup synch more visible in stack traces * @@ -980,15 +983,9 @@ public class GMSMembership<ID extends MemberIdentifier> implements Membership<ID // view processing can take a while, so we use a separate thread // to avoid blocking a reader thread long newId = viewArg.getViewId(); - LocalViewMessage v = new LocalViewMessage((InternalDistributedMember) address, newId, - (MembershipView<InternalDistributedMember>) viewArg, - (GMSMembership<InternalDistributedMember>) GMSMembership.this); - - try { - messageListener.messageReceived((Message<ID>) v); - } catch (MemberShunnedException e) { - logger.error("View installation was blocked by a MemberShunnedException", e); - } + viewExcecutor.submit(() -> { + processView(newId, viewArg); + }); } finally { latestViewWriteLock.unlock(); } @@ -1920,6 +1917,8 @@ public class GMSMembership<ID extends MemberIdentifier> implements Membership<ID cleanupTimer.shutdown(); } + viewExcecutor.shutdown(); + if (logger.isDebugEnabled()) { logger.debug("Membership: channel closed"); } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java index c04ad18..64ed085 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java @@ -50,13 +50,15 @@ import org.apache.geode.distributed.internal.membership.gms.interfaces.Locator; import org.apache.geode.distributed.internal.membership.gms.membership.HostAddress; import org.apache.geode.distributed.internal.membership.gms.messenger.GMSMemberWrapper; import org.apache.geode.distributed.internal.tcpserver.TcpClient; +import org.apache.geode.distributed.internal.tcpserver.TcpHandler; +import org.apache.geode.distributed.internal.tcpserver.TcpServer; import org.apache.geode.internal.serialization.ObjectDeserializer; import org.apache.geode.internal.serialization.ObjectSerializer; import org.apache.geode.internal.serialization.Version; import org.apache.geode.internal.serialization.VersionedDataInputStream; import org.apache.geode.logging.internal.log4j.api.LogService; -public class GMSLocator<ID extends MemberIdentifier> implements Locator<ID> { +public class GMSLocator<ID extends MemberIdentifier> implements Locator<ID>, TcpHandler { static final int LOCATOR_FILE_STAMP = 0x7b8cf741; @@ -205,7 +207,6 @@ public class GMSLocator<ID extends MemberIdentifier> implements Locator<ID> { if (logger.isDebugEnabled()) { logger.debug("Peer locator processing {}", request); } - if (localAddress == null && services != null) { localAddress = services.getMessenger().getMemberID(); } @@ -382,6 +383,11 @@ public class GMSLocator<ID extends MemberIdentifier> implements Locator<ID> { publicKeys.clear(); } + @Override + public void init(TcpServer tcpServer) { + // no-op. This is normally handled by the GMSLocatorAdapter wrapper in geode-core + } + @VisibleForTesting public List<ID> getMembers() { if (view != null) { diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/MembershipDependenciesJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/MembershipDependenciesJUnitTest.java index 21b78ab..2a268c8 100644 --- a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/MembershipDependenciesJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/MembershipDependenciesJUnitTest.java @@ -31,7 +31,6 @@ import org.junit.runner.RunWith; import org.apache.geode.alerting.internal.spi.AlertingAction; import org.apache.geode.distributed.Locator; import org.apache.geode.distributed.internal.LocatorStats; -import org.apache.geode.distributed.internal.membership.adapter.LocalViewMessage; import org.apache.geode.internal.OSProcess; import org.apache.geode.internal.security.SecurableCommunicationChannel; import org.apache.geode.internal.util.JavaWorkarounds; @@ -116,8 +115,6 @@ public class MembershipDependenciesJUnitTest { .or(type(OSProcess.class)) // TODO: - .or(type(AlertingAction.class)) + .or(type(AlertingAction.class))); - // TODO: - .or(type(LocalViewMessage.class))); } diff --git a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/StaticSerialization.java b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/StaticSerialization.java index 70e6210..23c3de5 100644 --- a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/StaticSerialization.java +++ b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/StaticSerialization.java @@ -399,6 +399,7 @@ public class StaticSerialization { byte typeCode = in.readByte(); if (typeCode == DSCODE.CLASS.toByte()) { String className = readString(in); + className = processIncomingClassName(className); return Class.forName(className); } else { return StaticSerialization.decodePrimitiveClass(typeCode); diff --git a/geode-tcp-server/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-tcp-server/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java index 70548ee..63e4a7b 100755 --- a/geode-tcp-server/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java +++ b/geode-tcp-server/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java @@ -404,10 +404,7 @@ public class TcpServer { logger.debug("Locator received request " + request + " from " + socket.getInetAddress()); } if (request instanceof ShutdownRequest) { - shuttingDown = true; - // Don't call shutdown from within the worker thread, see java bug #6576792. - // Closing the socket will cause our acceptor thread to shutdown the executor - srv_sock.close(); + requestShutdown(); response = new ShutdownResponse(); } else if (request instanceof VersionRequest) { response = handleVersionRequest(request); @@ -435,6 +432,17 @@ public class TcpServer { } } + /** + * Request that this TcpServer shut down. Use join() or join(long) to + * wait for shutdown to complete. + */ + public void requestShutdown() throws IOException { + shuttingDown = true; + // Don't call shutdown from within the worker thread, see java bug #6576792. + // Closing the socket will cause our acceptor thread to shutdown the executor + srv_sock.close(); + } + private void rejectUnknownProtocolConnection(Socket socket, int gossipVersion) { try { socket.getOutputStream().write("unknown protocol version".getBytes());