This is an automated email from the ASF dual-hosted git repository. burcham pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new d294764 GEODE-7652: MembershipBuilder lets you set the local locator (#4614) d294764 is described below commit d2947640af8c6bf16a650348bf09a28d11d27997 Author: Bill Burcham <bburc...@pivotal.io> AuthorDate: Wed Jan 29 22:08:07 2020 -0800 GEODE-7652: MembershipBuilder lets you set the local locator (#4614) --- ...ReconnectWithClusterConfigurationDUnitTest.java | 1 - .../apache/geode/distributed/LocatorDUnitTest.java | 13 +--- .../internal/membership/MembershipJUnitTest.java | 48 +++++++------ .../membership/gms/MembershipOnlyTest.java | 17 ++--- .../locator/GMSLocatorRecoveryIntegrationTest.java | 5 +- .../internal/ClusterDistributionManager.java | 18 +++-- .../distributed/internal/DistributionImpl.java | 26 +++----- .../internal/InternalDistributedSystem.java | 78 ++++++++++++++++++---- .../distributed/internal/InternalLocator.java | 17 +++-- .../gms/locator/GMSLocatorIntegrationTest.java | 8 +-- .../internal/membership/api/LifecycleListener.java | 7 -- .../internal/membership/api/MembershipBuilder.java | 5 +- .../internal/membership/gms/GMSMembership.java | 1 - .../membership/gms/LifecycleListenerNoOp.java | 5 -- .../membership/gms/MembershipBuilderImpl.java | 16 ++++- .../internal/membership/gms/Services.java | 25 +++++-- .../membership/gms/locator/GMSLocator.java | 19 +++--- .../gms/locator/MembershipLocatorImpl.java | 17 ++++- 18 files changed, 201 insertions(+), 125 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java index b51d7f2..6a7c262 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ReconnectWithClusterConfigurationDUnitTest.java @@ -91,7 +91,6 @@ public 
class ReconnectWithClusterConfigurationDUnitTest implements Serializable locator = Locator.startLocatorAndDS(locatorPorts[locatorNumber], new File(""), props); system = locator.getDistributedSystem(); cache = ((InternalLocator) locator).getCache(); - ReconnectDUnitTest.savedSystem = locator.getDistributedSystem(); IgnoredException.addIgnoredException( "org.apache.geode.ForcedDisconnectException||Possible loss of quorum"); } catch (IOException e) { diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/LocatorDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/LocatorDUnitTest.java index 2e6bf6b..348fdad 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/LocatorDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/LocatorDUnitTest.java @@ -1357,17 +1357,8 @@ public class LocatorDUnitTest implements Serializable { public void testHostingMultipleLocators() throws Exception { Locator.startLocator(port1, null); - try { - Locator.startLocator(port2, null); - fail("expected second locator start to fail."); - } catch (IllegalStateException expected) { - } - - String locators = hostName + "[" + port1 + "]," + hostName + "[" + port2 + "]"; - - Properties props = getBasicProperties(locators); - - getConnectedDistributedSystem(props); + assertThatThrownBy(() -> Locator.startLocator(port2, null)) + .isInstanceOf(IllegalStateException.class); } /** diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java index d70e603..3d21f69 100755 --- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/MembershipJUnitTest.java @@ -26,7 +26,6 @@ import static 
org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -46,12 +45,10 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.apache.geode.distributed.ConfigurationProperties; -import org.apache.geode.distributed.Locator; import org.apache.geode.distributed.internal.ClusterDistributionManager; import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.DistributionConfigImpl; -import org.apache.geode.distributed.internal.DistributionImpl; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.InternalLocator; import org.apache.geode.distributed.internal.SerialAckedMessage; @@ -66,6 +63,7 @@ import org.apache.geode.distributed.internal.membership.api.Membership; import org.apache.geode.distributed.internal.membership.api.MembershipBuilder; import org.apache.geode.distributed.internal.membership.api.MembershipConfig; import org.apache.geode.distributed.internal.membership.api.MembershipListener; +import org.apache.geode.distributed.internal.membership.api.MembershipLocator; import org.apache.geode.distributed.internal.membership.api.MembershipView; import org.apache.geode.distributed.internal.membership.api.MessageListener; import org.apache.geode.distributed.internal.tcpserver.TcpClient; @@ -131,7 +129,7 @@ public class MembershipJUnitTest { throws Exception { Membership<InternalDistributedMember> m1 = null, m2 = null; - Locator l = null; + InternalLocator internalLocator = null; // int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort(); try { @@ -142,8 +140,9 @@ public class 
MembershipJUnitTest { // this locator will hook itself up with the first Membership // to be created - l = InternalLocator.startLocator(port, new File(""), null, null, localHost, false, - new Properties(), null, temporaryFolder.getRoot().toPath()); + internalLocator = + InternalLocator.startLocator(port, new File(""), null, null, localHost, false, + new Properties(), null, temporaryFolder.getRoot().toPath()); // create configuration objects Properties nonDefault = new Properties(); @@ -159,11 +158,14 @@ public class MembershipJUnitTest { new RemoteTransportConfig(config, ClusterDistributionManager.LOCATOR_DM_TYPE); // start the first membership manager - m1 = createMembershipManager(config, transport).getLeft(); + final MembershipLocator<InternalDistributedMember> membershipLocator = + internalLocator.getMembershipLocator(); + + m1 = createMembershipManager(config, transport, membershipLocator).getLeft(); // start the second membership manager final Pair<Membership, MessageListener> pair = - createMembershipManager(config, transport); + createMembershipManager(config, transport, membershipLocator); m2 = pair.getLeft(); final MessageListener listener2 = pair.getRight(); @@ -241,15 +243,16 @@ public class MembershipJUnitTest { if (m1 != null) { m1.shutdown(); } - if (l != null) { - l.stop(); + if (internalLocator != null) { + internalLocator.stop(); } } } private Pair<Membership, MessageListener> createMembershipManager( final DistributionConfigImpl config, - final RemoteTransportConfig transport) throws MemberStartupException { + final RemoteTransportConfig transport, + final MembershipLocator<InternalDistributedMember> locator) throws MemberStartupException { final MembershipListener<InternalDistributedMember> listener = mock(MembershipListener.class); final MessageListener<InternalDistributedMember> messageListener = mock(MessageListener.class); final DMStats stats1 = mock(DMStats.class); @@ -290,6 +293,7 @@ public class MembershipJUnitTest { final 
Membership<InternalDistributedMember> m1 = MembershipBuilder.<InternalDistributedMember>newMembershipBuilder( socketCreator, locatorClient, serializer, memberIdentifierFactory) + .setMembershipLocator(locator) .setAuthenticator(authenticator) .setStatistics(stats1) .setMessageListener(messageListener) @@ -297,10 +301,6 @@ public class MembershipJUnitTest { .setConfig(new ServiceConfig(transport, config)) .setLifecycleListener(lifeCycleListener) .create(); - doAnswer(invocation -> { - DistributionImpl.connectLocatorToServices(m1); - return null; - }).when(lifeCycleListener).started(); m1.start(); m1.startEventProcessing(); return Pair.of(m1, messageListener); @@ -319,7 +319,7 @@ public class MembershipJUnitTest { public void testLocatorAndTwoServersJoinUsingDiffeHellman() throws Exception { Membership<InternalDistributedMember> m1 = null, m2 = null; - Locator l = null; + InternalLocator internalLocator = null; int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort(); try { @@ -331,8 +331,9 @@ public class MembershipJUnitTest { p.setProperty(ConfigurationProperties.SECURITY_UDP_DHALGO, "AES:128"); // this locator will hook itself up with the first Membership // to be created - l = InternalLocator.startLocator(port, new File(""), null, null, localHost, false, p, null, - temporaryFolder.getRoot().toPath()); + internalLocator = + InternalLocator.startLocator(port, new File(""), null, null, localHost, false, p, null, + temporaryFolder.getRoot().toPath()); // create configuration objects Properties nonDefault = new Properties(); @@ -349,11 +350,14 @@ public class MembershipJUnitTest { new RemoteTransportConfig(config, ClusterDistributionManager.LOCATOR_DM_TYPE); // start the first membership manager - m1 = createMembershipManager(config, transport).getLeft(); + final MembershipLocator<InternalDistributedMember> membershipLocator = + internalLocator.getMembershipLocator(); + + m1 = createMembershipManager(config, transport, membershipLocator).getLeft(); // start the 
second membership manager final Pair<Membership, MessageListener> pair = - createMembershipManager(config, transport); + createMembershipManager(config, transport, membershipLocator); m2 = pair.getLeft(); final MessageListener listener2 = pair.getRight(); @@ -423,8 +427,8 @@ public class MembershipJUnitTest { if (m1 != null) { m1.disconnect(false); } - if (l != null) { - l.stop(); + if (internalLocator != null) { + internalLocator.stop(); } } } diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipOnlyTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipOnlyTest.java index c1ba5bd..9718e0b 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipOnlyTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/MembershipOnlyTest.java @@ -16,7 +16,6 @@ package org.apache.geode.distributed.internal.membership.gms; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import java.io.IOException; @@ -96,7 +95,6 @@ public class MembershipOnlyTest { assertThat(membership.getView().getMembers()).hasSize(1); } - @Test public void twoMembersCanConnect() throws MemberStartupException { Membership<MemberIdentifierImpl> member1 = startMember("member1", membershipLocator); @@ -137,20 +135,15 @@ public class MembershipOnlyTest { final Membership<MemberIdentifierImpl> membership = MembershipBuilder.<MemberIdentifierImpl>newMembershipBuilder( - socketCreator, locatorClient, dsfidSerializer, memberIdFactory) + socketCreator, + locatorClient, + dsfidSerializer, + memberIdFactory) + .setMembershipLocator(membershipLocator) .setConfig(config) .setLifecycleListener(lifeCycleListener) .create(); - // TODO - the 
membership *must* be installed in the locator at this special - // point during membership startup for the start to succeed - if (embeddedLocator != null) { - doAnswer(invocation -> { - embeddedLocator.setMembership(membership); - return null; - }).when(lifeCycleListener).started(); - } - membership.start(); membership.startEventProcessing(); return membership; diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorRecoveryIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorRecoveryIntegrationTest.java index dcc49d4..94a7e7e 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorRecoveryIntegrationTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorRecoveryIntegrationTest.java @@ -41,7 +41,6 @@ import org.junit.rules.TemporaryFolder; import org.junit.rules.TestName; import org.apache.geode.DataSerializer; -import org.apache.geode.distributed.Locator; import org.apache.geode.distributed.internal.ClusterDistributionManager; import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.Distribution; @@ -78,7 +77,7 @@ public class GMSLocatorRecoveryIntegrationTest { private File stateFile; private GMSLocator gmsLocator; - private Locator locator; + private InternalLocator locator; private DSFIDSerializer serializer; private Distribution distribution; @@ -193,7 +192,7 @@ public class GMSLocatorRecoveryIntegrationTest { distribution = new DistributionImpl(mockClusterDistributionManager, transport, mockSystem, mockListener, - mockMessageListener); + mockMessageListener, locator.getMembershipLocator()); distribution.start(); GMSLocator gmsLocator = new GMSLocator(localHost, diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java index 4d765ac..0ece626 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java @@ -67,6 +67,7 @@ import org.apache.geode.distributed.internal.membership.api.MemberDisconnectedEx import org.apache.geode.distributed.internal.membership.api.MemberIdentifier; import org.apache.geode.distributed.internal.membership.api.MemberIdentifierFactory; import org.apache.geode.distributed.internal.membership.api.Membership; +import org.apache.geode.distributed.internal.membership.api.MembershipLocator; import org.apache.geode.distributed.internal.membership.api.MembershipView; import org.apache.geode.distributed.internal.membership.api.Message; import org.apache.geode.internal.Assert; @@ -295,7 +296,8 @@ public class ClusterDistributionManager implements DistributionManager { * * @param system The distributed system to which this distribution manager will send messages. 
*/ - static ClusterDistributionManager create(InternalDistributedSystem system) { + static ClusterDistributionManager create(InternalDistributedSystem system, + final MembershipLocator<InternalDistributedMember> membershipLocator) { ClusterDistributionManager distributionManager = null; boolean beforeJoined = true; @@ -322,7 +324,8 @@ public class ClusterDistributionManager implements DistributionManager { long start = System.currentTimeMillis(); distributionManager = - new ClusterDistributionManager(system, transport, system.getAlertingService()); + new ClusterDistributionManager(system, transport, system.getAlertingService(), + membershipLocator); distributionManager.assertDistributionManagerType(); beforeJoined = false; // we have now joined the system @@ -426,7 +429,8 @@ public class ClusterDistributionManager implements DistributionManager { * */ private ClusterDistributionManager(RemoteTransportConfig transport, - InternalDistributedSystem system, AlertingService alertingService) { + InternalDistributedSystem system, AlertingService alertingService, + MembershipLocator<InternalDistributedMember> locator) { this.system = system; this.transport = transport; @@ -459,7 +463,7 @@ public class ClusterDistributionManager implements DistributionManager { DMListener listener = new DMListener(this); distribution = DistributionImpl .createDistribution(this, transport, system, listener, - this::handleIncomingDMsg); + this::handleIncomingDMsg, locator); sb.append(System.currentTimeMillis() - start); @@ -488,8 +492,10 @@ public class ClusterDistributionManager implements DistributionManager { * @param system The distributed system to which this distribution manager will send messages. 
*/ private ClusterDistributionManager(InternalDistributedSystem system, - RemoteTransportConfig transport, AlertingService alertingService) { - this(transport, system, alertingService); + RemoteTransportConfig transport, + AlertingService alertingService, + final MembershipLocator<InternalDistributedMember> membershipLocator) { + this(transport, system, alertingService, membershipLocator); boolean finishedConstructor = false; try { diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java index 482fe9d..b81d2ac 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java @@ -57,6 +57,7 @@ import org.apache.geode.distributed.internal.membership.api.MembershipBuilder; import org.apache.geode.distributed.internal.membership.api.MembershipClosedException; import org.apache.geode.distributed.internal.membership.api.MembershipConfigurationException; import org.apache.geode.distributed.internal.membership.api.MembershipListener; +import org.apache.geode.distributed.internal.membership.api.MembershipLocator; import org.apache.geode.distributed.internal.membership.api.MembershipStatistics; import org.apache.geode.distributed.internal.membership.api.MembershipView; import org.apache.geode.distributed.internal.membership.api.Message; @@ -109,9 +110,11 @@ public class DistributionImpl implements Distribution { public DistributionImpl(final ClusterDistributionManager clusterDistributionManager, - final RemoteTransportConfig transport, final InternalDistributedSystem system, + final RemoteTransportConfig transport, + final InternalDistributedSystem system, final MembershipListener<InternalDistributedMember> listener, - final MessageListener<InternalDistributedMember> messageListener) { + final 
MessageListener<InternalDistributedMember> messageListener, + final MembershipLocator<InternalDistributedMember> locator) { this.clusterDistributionManager = clusterDistributionManager; this.transportConfig = transport; this.tcpDisabled = transportConfig.isTcpDisabled(); @@ -137,6 +140,7 @@ public class DistributionImpl implements Distribution { locatorClient, InternalDataSerializer.getDSFIDSerializer(), new ClusterDistributionManager.ClusterDistributionManagerIDFactory()) + .setMembershipLocator(locator) .setAuthenticator( new GMSAuthenticator(system.getSecurityProperties(), system.getSecurityService(), system.getSecurityLogWriter(), system.getInternalLogWriter())) @@ -156,14 +160,6 @@ public class DistributionImpl implements Distribution { } } - public static void connectLocatorToServices(Membership<InternalDistributedMember> membership) { - // see if a locator was started and put it in GMS Services - InternalLocator l = (InternalLocator) Locator.getLocator(); - if (l != null && l.getMembershipLocator() != null) { - l.getMembershipLocator().setMembership(membership); - } - } - @Override public Membership<InternalDistributedMember> getMembership() { return membership; @@ -217,11 +213,12 @@ public class DistributionImpl implements Distribution { ClusterDistributionManager clusterDistributionManager, RemoteTransportConfig transport, InternalDistributedSystem system, MembershipListener<InternalDistributedMember> listener, - MessageListener<InternalDistributedMember> messageListener) { + MessageListener<InternalDistributedMember> messageListener, + final MembershipLocator<InternalDistributedMember> locator) { DistributionImpl distribution = new DistributionImpl(clusterDistributionManager, transport, system, listener, - messageListener); + messageListener, locator); distribution.start(); return distribution; } @@ -916,11 +913,6 @@ public class DistributionImpl implements Distribution { } @Override - public void started() { - 
connectLocatorToServices(distribution.getMembership()); - } - - @Override public void forcedDisconnect() { // stop server locators immediately since they may not have correct // information. This has caused client failures in bridge/wan diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java index fa53678..2ed25ae 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java @@ -77,6 +77,7 @@ import org.apache.geode.distributed.DurableClientAttributes; import org.apache.geode.distributed.internal.locks.GrantorRequestProcessor; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.distributed.internal.membership.api.MembershipInformation; +import org.apache.geode.distributed.internal.membership.api.MembershipLocator; import org.apache.geode.distributed.internal.membership.api.QuorumChecker; import org.apache.geode.internal.Assert; import org.apache.geode.internal.InternalDataSerializer; @@ -198,6 +199,23 @@ public class InternalDistributedSystem extends DistributedSystem private final AtomicReference<ClusterAlertMessaging> clusterAlertMessaging = new AtomicReference<>(); + // captured in initialize() when starting so that we can hand it to new instance when restarting + private MembershipLocator<InternalDistributedMember> membershipLocator; + + /** + * If the experimental multiple-system feature is enabled, always create a new system. + * + * <p> + * Otherwise, create a new InternalDistributedSystem with the given properties, or connect to an + * existing one with the same properties. 
+ */ + public static InternalDistributedSystem connectInternal( + Properties config, + SecurityConfig securityConfig, + MetricsService.Builder metricsSessionBuilder) { + return connectInternal(config, securityConfig, metricsSessionBuilder, null); + } + /** * If the experimental multiple-system feature is enabled, always create a new system. * @@ -205,8 +223,11 @@ public class InternalDistributedSystem extends DistributedSystem * Otherwise, create a new InternalDistributedSystem with the given properties, or connect to an * existing one with the same properties. */ - public static InternalDistributedSystem connectInternal(Properties config, - SecurityConfig securityConfig, MetricsService.Builder metricsSessionBuilder) { + public static InternalDistributedSystem connectInternal( + Properties config, + SecurityConfig securityConfig, + MetricsService.Builder metricsSessionBuilder, + final MembershipLocator<InternalDistributedMember> locator) { if (config == null) { config = new Properties(); } @@ -214,6 +235,7 @@ public class InternalDistributedSystem extends DistributedSystem if (Boolean.getBoolean(ALLOW_MULTIPLE_SYSTEMS_PROPERTY)) { return new Builder(config, metricsSessionBuilder) .setSecurityConfig(securityConfig) + .setLocator(locator) .build(); } @@ -264,6 +286,7 @@ public class InternalDistributedSystem extends DistributedSystem // Make a new connection to the distributed system InternalDistributedSystem newSystem = new Builder(config, metricsSessionBuilder) .setSecurityConfig(securityConfig) + .setLocator(locator) .build(); addSystem(newSystem); return newSystem; @@ -651,7 +674,9 @@ public class InternalDistributedSystem extends DistributedSystem * Initializes this connection to a distributed system with the current configuration state. 
*/ private void initialize(SecurityManager securityManager, PostProcessor postProcessor, - MetricsService.Builder metricsServiceBuilder) { + MetricsService.Builder metricsServiceBuilder, + final MembershipLocator<InternalDistributedMember> membershipLocatorArg) { + if (originalConfig.getLocators().equals("")) { if (originalConfig.getMcastPort() != 0) { throw new GemFireConfigException("The " + LOCATORS + " attribute can not be empty when the " @@ -740,7 +765,7 @@ public class InternalDistributedSystem extends DistributedSystem } try { - startInitLocator(); + startInitLocator(membershipLocatorArg); } catch (InterruptedException e) { throw new SystemConnectException("Startup has been interrupted", e); } @@ -751,12 +776,13 @@ public class InternalDistributedSystem extends DistributedSystem if (!isLoner) { try { - dm = ClusterDistributionManager.create(this); + dm = ClusterDistributionManager.create(this, membershipLocator); // fix bug #46324 if (InternalLocator.hasLocator()) { - InternalLocator locator = InternalLocator.getLocator(); + InternalLocator internalLocator = InternalLocator.getLocator(); getDistributionManager().addHostedLocators(getDistributedMember(), - InternalLocator.getLocatorStrings(), locator.isSharedConfigurationEnabled()); + InternalLocator.getLocatorStrings(), + internalLocator.isSharedConfigurationEnabled()); } } finally { if (dm == null && quorumChecker != null) { @@ -780,7 +806,7 @@ public class InternalDistributedSystem extends DistributedSystem } if (attemptingToReconnect && (startedLocator == null)) { try { - startInitLocator(); + startInitLocator(membershipLocatorArg); } catch (InterruptedException e) { throw new SystemConnectException("Startup has been interrupted", e); } @@ -853,11 +879,25 @@ public class InternalDistributedSystem extends DistributedSystem } /** + * Starts a locator in this JVM iff the distribution config wants one started. 
+ * + * @return the membershipLocatorArg if the distribution config has no locator specified; + * otherwise starts a new InternalLocator and returns its associated MembershipLocator + * * @since GemFire 5.7 + * @param membershipLocatorArg on initial startup, a MembershipLocator provided explicitly by + * a caller, or null; on restart, the old MembershipLocator (from the previous instance of + * InternalDistributedSystem.) */ - private void startInitLocator() throws InterruptedException { - String locatorString = originalConfig.getStartLocator(); - if (locatorString.length() == 0) { + private void startInitLocator( + final MembershipLocator<InternalDistributedMember> membershipLocatorArg) + throws InterruptedException { + + final String locatorString = originalConfig.getStartLocator(); + final boolean shouldStartLocator = locatorString.length() > 0; + + if (!shouldStartLocator) { + membershipLocator = membershipLocatorArg; return; } @@ -886,10 +926,13 @@ public class InternalDistributedSystem extends DistributedSystem boolean startedPeerLocation = false; try { startedLocator.startPeerLocation(); + membershipLocator = startedLocator.getMembershipLocator(); startedPeerLocation = true; } finally { if (!startedPeerLocation) { startedLocator.stop(); + startedLocator = null; + membershipLocator = null; } } } catch (IOException e) { @@ -2568,7 +2611,8 @@ public class InternalDistributedSystem extends DistributedSystem try { - newDS = connectInternal(configProps, null, metricsService.getRebuilder()); + newDS = connectInternal(configProps, null, metricsService.getRebuilder(), + membershipLocator); } catch (CancelException e) { if (isReconnectCancelled()) { @@ -2961,6 +3005,8 @@ public class InternalDistributedSystem extends DistributedSystem private SecurityConfig securityConfig; private MetricsService.Builder metricsServiceBuilder; + private MembershipLocator<InternalDistributedMember> locator; + public Builder(Properties configProperties, MetricsService.Builder 
metricsServiceBuilder) { this.configProperties = configProperties; this.metricsServiceBuilder = metricsServiceBuilder; @@ -2971,6 +3017,12 @@ public class InternalDistributedSystem extends DistributedSystem return this; } + public Builder setLocator( + final MembershipLocator<InternalDistributedMember> locator) { + this.locator = locator; + return this; + } + /** * Builds and initializes new instance of InternalDistributedSystem. */ @@ -2989,7 +3041,7 @@ public class InternalDistributedSystem extends DistributedSystem FunctionStatsManager::new); newSystem .initialize(securityConfig.getSecurityManager(), securityConfig.getPostProcessor(), - metricsServiceBuilder); + metricsServiceBuilder, locator); notifyConnectListeners(newSystem); stopThreads = false; return newSystem; diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java index 8466dfb..b7475ad 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java @@ -110,6 +110,7 @@ import org.apache.geode.management.internal.configuration.handlers.SharedConfigu import org.apache.geode.management.internal.configuration.messages.ClusterManagementServiceInfoRequest; import org.apache.geode.management.internal.configuration.messages.SharedConfigurationStatusRequest; import org.apache.geode.management.internal.configuration.messages.SharedConfigurationStatusResponse; +import org.apache.geode.metrics.internal.InternalDistributedSystemMetricsService; import org.apache.geode.security.AuthTokenEnabledComponents; /** @@ -561,9 +562,9 @@ public class InternalLocator extends Locator implements ConnectListener, LogConf membershipLocator.addHandler(InfoRequest.class, new InfoRequestHandler()); restartHandlers.add((ds, cache, sharedConfig) -> { - InternalDistributedSystem ids = 
(InternalDistributedSystem) ds; - Distribution distribution = ids.getDM().getDistribution(); - membershipLocator.setMembership(distribution.getMembership()); + final InternalDistributedSystem ids = (InternalDistributedSystem) ds; + // let old locator know about new membership object + membershipLocator.setMembership(ids.getDM().getDistribution().getMembership()); }); } @@ -658,7 +659,7 @@ public class InternalLocator extends Locator implements ConnectListener, LogConf /** * @return the TcpHandler for peer to peer discovery */ - public MembershipLocator getMembershipLocator() { + public MembershipLocator<InternalDistributedMember> getMembershipLocator() { return membershipLocator; } @@ -733,13 +734,17 @@ public class InternalLocator extends Locator implements ConnectListener, LogConf logger.info("Starting distributed system"); internalDistributedSystem = - (InternalDistributedSystem) DistributedSystem.connect(distributedSystemProperties); + InternalDistributedSystem + .connectInternal(distributedSystemProperties, null, + new InternalDistributedSystemMetricsService.Builder(), + membershipLocator); if (peerLocator) { // We've created a peer location message handler - it needs to be connected to // the membership service in order to get membership view notifications membershipLocator - .setMembership(internalDistributedSystem.getDM().getDistribution().getMembership()); + .setMembership(internalDistributedSystem.getDM() + .getDistribution().getMembership()); } internalDistributedSystem.addDisconnectListener(sys -> stop(false, false, false)); diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorIntegrationTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorIntegrationTest.java index cab82dc..7086f34 100644 --- 
a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorIntegrationTest.java +++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocatorIntegrationTest.java @@ -30,7 +30,7 @@ import org.apache.geode.distributed.internal.membership.api.MemberDataBuilder; import org.apache.geode.distributed.internal.membership.api.MemberIdentifier; import org.apache.geode.distributed.internal.membership.api.MemberIdentifierFactoryImpl; import org.apache.geode.distributed.internal.membership.api.MembershipConfigurationException; -import org.apache.geode.distributed.internal.membership.gms.GMSMembership; +import org.apache.geode.distributed.internal.membership.api.MembershipLocator; import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView; import org.apache.geode.distributed.internal.membership.gms.MembershipLocatorStatisticsNoOp; import org.apache.geode.distributed.internal.membership.gms.Services; @@ -85,9 +85,9 @@ public class GMSLocatorIntegrationTest { services.getSerializer().getObjectDeserializer()), services.getSerializer().getObjectSerializer(), services.getSerializer().getObjectDeserializer()); - GMSMembership membership = mock(GMSMembership.class); - when(membership.getServices()).thenReturn(services); - gmsLocator.setMembership(membership); + + final MembershipLocator membershipLocator = mock(MembershipLocator.class); + gmsLocator.setServices(services); } @Test diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/LifecycleListener.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/LifecycleListener.java index 32f5c49..68118ff 100644 --- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/LifecycleListener.java +++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/LifecycleListener.java 
@@ -33,13 +33,6 @@ public interface LifecycleListener<ID extends MemberIdentifier> { final ID memberID); /** - * Invoked when the Membership is starting. All membership services will have been - * initialized and had their "started" methods invoked but we will not yet have joined - * the cluster. - */ - void started(); - - /** * Invoked when the Membership has successfully joined the cluster. At this point the * membership address is stable. */ diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MembershipBuilder.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MembershipBuilder.java index d3f491a..220a0d0 100644 --- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MembershipBuilder.java +++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/api/MembershipBuilder.java @@ -32,6 +32,8 @@ public interface MembershipBuilder<ID extends MemberIdentifier> { MembershipBuilder<ID> setMembershipListener(MembershipListener<ID> membershipListener); + MembershipBuilder<ID> setMembershipLocator(MembershipLocator<ID> membershipLocator); + MembershipBuilder<ID> setMessageListener(MessageListener<ID> messageListener); MembershipBuilder<ID> setConfig(MembershipConfig membershipConfig); @@ -45,6 +47,7 @@ public interface MembershipBuilder<ID extends MemberIdentifier> { final TcpClient locatorClient, final DSFIDSerializer serializer, final MemberIdentifierFactory<ID> memberFactory) { - return new MembershipBuilderImpl<>(socketCreator, locatorClient, serializer, memberFactory); + return new MembershipBuilderImpl<>( + socketCreator, locatorClient, serializer, memberFactory); } } diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java index ccef4ef..4406939 100644 --- 
a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java +++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMembership.java @@ -1890,7 +1890,6 @@ public class GMSMembership<ID extends MemberIdentifier> implements Membership<ID @Override public void started() throws MemberStartupException { startCleanupTimer(); - lifecycleListener.started(); } /* Service interface */ diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/LifecycleListenerNoOp.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/LifecycleListenerNoOp.java index 3924467..5f3f8f0 100644 --- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/LifecycleListenerNoOp.java +++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/LifecycleListenerNoOp.java @@ -43,11 +43,6 @@ public class LifecycleListenerNoOp<ID extends MemberIdentifier> implements Lifec } @Override - public void started() { - - } - - @Override public void forcedDisconnect() { } diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/MembershipBuilderImpl.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/MembershipBuilderImpl.java index f6a46db..742c046 100644 --- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/MembershipBuilderImpl.java +++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/MembershipBuilderImpl.java @@ -24,8 +24,10 @@ import org.apache.geode.distributed.internal.membership.api.MembershipBuilder; import org.apache.geode.distributed.internal.membership.api.MembershipConfig; import org.apache.geode.distributed.internal.membership.api.MembershipConfigurationException; import 
org.apache.geode.distributed.internal.membership.api.MembershipListener; +import org.apache.geode.distributed.internal.membership.api.MembershipLocator; import org.apache.geode.distributed.internal.membership.api.MembershipStatistics; import org.apache.geode.distributed.internal.membership.api.MessageListener; +import org.apache.geode.distributed.internal.membership.gms.locator.MembershipLocatorImpl; import org.apache.geode.distributed.internal.tcpserver.TcpClient; import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreator; import org.apache.geode.internal.serialization.DSFIDSerializer; @@ -47,6 +49,8 @@ public class MembershipBuilderImpl<ID extends MemberIdentifier> implements Membe private final MemberIdentifierFactory<ID> memberFactory; private LifecycleListener<ID> lifecycleListener = new LifecycleListenerNoOp(); + private MembershipLocatorImpl<ID> membershipLocator; + public MembershipBuilderImpl( final TcpSocketCreator socketCreator, final TcpClient locatorClient, @@ -77,6 +81,13 @@ public class MembershipBuilderImpl<ID extends MemberIdentifier> implements Membe } @Override + public MembershipBuilder<ID> setMembershipLocator( + final MembershipLocator<ID> membershipLocator) { + this.membershipLocator = (MembershipLocatorImpl<ID>) membershipLocator; + return this; + } + + @Override public MembershipBuilder<ID> setMessageListener(MessageListener<ID> messageListener) { this.messageListener = messageListener; return this; @@ -99,9 +110,12 @@ public class MembershipBuilderImpl<ID extends MemberIdentifier> implements Membe public Membership<ID> create() throws MembershipConfigurationException { GMSMembership<ID> gmsMembership = new GMSMembership<>(membershipListener, messageListener, lifecycleListener); - Services<ID> services = + final Services<ID> services = new Services<>(gmsMembership.getGMSManager(), statistics, authenticator, membershipConfig, serializer, memberFactory, locatorClient, socketCreator); + if (membershipLocator != null) { + 
services.setLocators(membershipLocator.getGMSLocator(), membershipLocator); + } services.init(); return gmsMembership; } diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java index a994e42..334dc9c 100644 --- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java +++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java @@ -46,6 +46,7 @@ import org.apache.geode.distributed.internal.membership.api.MemberStartupExcepti import org.apache.geode.distributed.internal.membership.api.MembershipClosedException; import org.apache.geode.distributed.internal.membership.api.MembershipConfig; import org.apache.geode.distributed.internal.membership.api.MembershipConfigurationException; +import org.apache.geode.distributed.internal.membership.api.MembershipLocator; import org.apache.geode.distributed.internal.membership.api.MembershipStatistics; import org.apache.geode.distributed.internal.membership.gms.fd.GMSHealthMonitor; import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor; @@ -57,6 +58,7 @@ import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordina import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorResponse; import org.apache.geode.distributed.internal.membership.gms.locator.GetViewRequest; import org.apache.geode.distributed.internal.membership.gms.locator.GetViewResponse; +import org.apache.geode.distributed.internal.membership.gms.locator.MembershipLocatorImpl; import org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave; import org.apache.geode.distributed.internal.membership.gms.messages.FinalCheckPassedMessage; import org.apache.geode.distributed.internal.membership.gms.messages.HeartbeatMessage; @@ -103,6 +105,7 @@ 
public class Services<ID extends MemberIdentifier> { private volatile Exception shutdownCause; private Locator<ID> locator; + private MembershipLocator<ID> membershipLocator; private final Timer timer = new Timer("Geode Membership Timer", true); @@ -209,6 +212,18 @@ public class Services<ID extends MemberIdentifier> { this.joinLeave.started(); this.healthMon.started(); this.manager.started(); + + if (membershipLocator != null) { + /* + * Now that all the services have started we can let the membership locator know + * about them. We must do this before telling the manager to joinDistributedSystem() + * later in this method + */ + final MembershipLocatorImpl locatorImpl = + (MembershipLocatorImpl) this.membershipLocator; + locatorImpl.setServices(this); + } + logger.debug("All membership services have been started"); started = true; } catch (RuntimeException e) { @@ -330,12 +345,14 @@ public class Services<ID extends MemberIdentifier> { return this.manager; } - public Locator<ID> getLocator() { - return this.locator; + public void setLocators(final Locator<ID> locator, + final MembershipLocator<ID> membershipLocator) { + this.locator = locator; + this.membershipLocator = membershipLocator; } - public void setLocator(Locator<ID> locator) { - this.locator = locator; + public Locator<ID> getLocator() { + return locator; } public JoinLeave<ID> getJoinLeave() { diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java index f295362..815b28e 100644 --- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java +++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/GMSLocator.java @@ -39,10 +39,8 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.annotations.VisibleForTesting; import 
org.apache.geode.distributed.internal.membership.api.MemberIdentifier; -import org.apache.geode.distributed.internal.membership.api.Membership; import org.apache.geode.distributed.internal.membership.api.MembershipConfigurationException; import org.apache.geode.distributed.internal.membership.api.MembershipLocatorStatistics; -import org.apache.geode.distributed.internal.membership.gms.GMSMembership; import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView; import org.apache.geode.distributed.internal.membership.gms.GMSUtil; import org.apache.geode.distributed.internal.membership.gms.Services; @@ -132,15 +130,20 @@ public class GMSLocator<ID extends MemberIdentifier> implements Locator<ID>, Tcp this.objectDeserializer = objectDeserializer; } - public synchronized boolean setMembership(Membership<ID> membership) { - if (services == null || services.isStopped()) { - services = ((GMSMembership<ID>) membership).getServices(); - localAddress = services.getMessenger().getMemberID(); + /** + * Called initially and after each auto-reconnect. See restart handlers in InternalLocator + * up in geode-core. Services must be started before this call. 
+ * + */ + public synchronized boolean setServices( + final Services<ID> services) { + if (this.services == null || this.services.isStopped()) { + this.services = services; + localAddress = this.services.getMessenger().getMemberID(); Objects.requireNonNull(localAddress, "member address should have been established"); logger.info("Peer locator is connecting to local membership services with ID {}", localAddress); - services.setLocator(this); - GMSMembershipView<ID> newView = services.getJoinLeave().getView(); + GMSMembershipView<ID> newView = this.services.getJoinLeave().getView(); if (newView != null) { view = newView; recoveredView = null; diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/MembershipLocatorImpl.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/MembershipLocatorImpl.java index 7a0a2d3..9009d66 100644 --- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/MembershipLocatorImpl.java +++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/locator/MembershipLocatorImpl.java @@ -34,6 +34,8 @@ import org.apache.geode.distributed.internal.membership.api.MembershipConfig; import org.apache.geode.distributed.internal.membership.api.MembershipConfigurationException; import org.apache.geode.distributed.internal.membership.api.MembershipLocator; import org.apache.geode.distributed.internal.membership.api.MembershipLocatorStatistics; +import org.apache.geode.distributed.internal.membership.gms.GMSMembership; +import org.apache.geode.distributed.internal.membership.gms.Services; import org.apache.geode.distributed.internal.tcpserver.ProtocolChecker; import org.apache.geode.distributed.internal.tcpserver.TcpClient; import org.apache.geode.distributed.internal.tcpserver.TcpHandler; @@ -141,8 +143,9 @@ public class MembershipLocatorImpl<ID extends MemberIdentifier> implements Membe } 
@Override - public void setMembership(Membership<ID> membership) { - gmsLocator.setMembership(membership); + public void setMembership(final Membership<ID> membership) { + final GMSMembership<ID> gmsMembership = (GMSMembership<ID>) membership; + setServices(gmsMembership.getServices()); } @Override @@ -156,10 +159,18 @@ public class MembershipLocatorImpl<ID extends MemberIdentifier> implements Membe } @VisibleForTesting - public GMSLocator getGMSLocator() { + public GMSLocator<ID> getGMSLocator() { return this.gmsLocator; } + /** + * Services is a class internal to the membership module. As such, the ability to setServices + * is available only within the module. It's not part of the external API. + */ + public void setServices(final Services<ID> services) { + gmsLocator.setServices(services); + } + public void stop() { if (isAlive()) { logger.info("Stopping {}", this);