Refactor reconnecting snitches. Patch by jasobrown; reviewed by jbellis for CASSANDRA-5681
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e75e33fa Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e75e33fa Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e75e33fa Branch: refs/heads/cassandra-1.2 Commit: e75e33fa6dc5e2a3fe061d747cc98679a65ef960 Parents: 18f3a79 Author: Jonathan Ellis <jbel...@apache.org> Authored: Fri Jun 21 10:40:31 2013 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Fri Jun 21 10:40:31 2013 -0500 ---------------------------------------------------------------------- .../cassandra/locator/Ec2MultiRegionSnitch.java | 71 +--------------- .../locator/GossipingPropertyFileSnitch.java | 63 +------------- .../locator/ReconnectableSnitchHelper.java | 88 ++++++++++++++++++++ 3 files changed, 94 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e75e33fa/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java b/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java index 9317941..bd5e091 100644 --- a/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java +++ b/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java @@ -19,16 +19,11 @@ package org.apache.cassandra.locator; import java.io.IOException; import java.net.InetAddress; -import java.net.UnknownHostException; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.gms.ApplicationState; -import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; -import org.apache.cassandra.gms.VersionedValue; -import 
org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; /** @@ -36,16 +31,13 @@ import org.apache.cassandra.service.StorageService; * * 2) Snitch will set the private IP as a Gossip application state. * - * 3) Snitch implements IESCS and will reset the connection if it is within the + * 3) Uses a helper class that implements IESCS and will reset the public IP connection if it is within the * same region to communicate via private IP. * - * Implements Ec2Snitch to inherit its functionality and extend it for - * Multi-Region. - * * Operational: All the nodes in this cluster needs to be able to (modify the * Security group settings in AWS) communicate via Public IP's. */ -public class Ec2MultiRegionSnitch extends Ec2Snitch implements IEndpointStateChangeSubscriber +public class Ec2MultiRegionSnitch extends Ec2Snitch { private static final String PUBLIC_IP_QUERY_URL = "http://169.254.169.254/latest/meta-data/public-ipv4"; private static final String PRIVATE_IP_QUERY_URL = "http://169.254.169.254/latest/meta-data/local-ipv4"; @@ -62,67 +54,10 @@ public class Ec2MultiRegionSnitch extends Ec2Snitch implements IEndpointStateCha DatabaseDescriptor.setBroadcastAddress(localPublicAddress); } - public void onJoin(InetAddress endpoint, EndpointState epState) - { - if (epState.getApplicationState(ApplicationState.INTERNAL_IP) != null) - reconnect(endpoint, epState.getApplicationState(ApplicationState.INTERNAL_IP)); - } - - public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) - { - if (state == ApplicationState.INTERNAL_IP) - reconnect(endpoint, value); - } - - public void onAlive(InetAddress endpoint, EndpointState state) - { - if (state.getApplicationState(ApplicationState.INTERNAL_IP) != null) - reconnect(endpoint, state.getApplicationState(ApplicationState.INTERNAL_IP)); - } - - public void onDead(InetAddress endpoint, EndpointState state) - { - // do nothing - } - - public void onRestart(InetAddress 
endpoint, EndpointState state) - { - // do nothing - } - - public void onRemove(InetAddress endpoint) - { - // do nothing. - } - - private void reconnect(InetAddress publicAddress, VersionedValue localAddressValue) - { - try - { - reconnect(publicAddress, InetAddress.getByName(localAddressValue.value)); - } - catch (UnknownHostException e) - { - logger.error("Error in getting the IP address resolved: ", e); - } - } - - private void reconnect(InetAddress publicAddress, InetAddress localAddress) - { - if (getDatacenter(publicAddress).equals(getDatacenter(localPublicAddress)) - && MessagingService.instance().getVersion(publicAddress) == MessagingService.current_version - && !MessagingService.instance().getConnectionPool(publicAddress).endPoint().equals(localAddress)) - { - MessagingService.instance().getConnectionPool(publicAddress).reset(localAddress); - logger.debug(String.format("Intiated reconnect to an Internal IP %s for the %s", localAddress, publicAddress)); - } - } - - @Override public void gossiperStarting() { super.gossiperStarting(); Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, StorageService.instance.valueFactory.internalIP(localPrivateAddress)); - Gossiper.instance.register(this); + Gossiper.instance.register(new ReconnectableSnitchHelper(this, ec2region, true)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e75e33fa/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java index 071cd09..e00239e 100644 --- a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java +++ b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java @@ -19,7 +19,6 @@ package org.apache.cassandra.locator; import java.net.InetAddress; -import java.net.UnknownHostException; 
import java.util.Map; import org.apache.cassandra.db.SystemTable; @@ -30,14 +29,11 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; -import org.apache.cassandra.gms.VersionedValue; -import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.service.StorageService; -public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch implements IEndpointStateChangeSubscriber +public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch// implements IEndpointStateChangeSubscriber { private static final Logger logger = LoggerFactory.getLogger(GossipingPropertyFileSnitch.class); @@ -47,7 +43,7 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch i private Map<InetAddress, Map<String, String>> savedEndpoints; private String DEFAULT_DC = "UNKNOWN_DC"; private String DEFAULT_RACK = "UNKNOWN_RACK"; - private boolean preferLocal; + private final boolean preferLocal; public GossipingPropertyFileSnitch() throws ConfigurationException { @@ -126,64 +122,11 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch i return epState.getApplicationState(ApplicationState.RACK).value; } - // IEndpointStateChangeSubscriber methods - - public void onJoin(InetAddress endpoint, EndpointState epState) - { - if (preferLocal && epState.getApplicationState(ApplicationState.INTERNAL_IP) != null) - reConnect(endpoint, epState.getApplicationState(ApplicationState.INTERNAL_IP)); - } - - public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) - { - if (preferLocal && state == ApplicationState.INTERNAL_IP) - reConnect(endpoint, value); - } - - public void onAlive(InetAddress endpoint, EndpointState 
state) - { - if (preferLocal && state.getApplicationState(ApplicationState.INTERNAL_IP) != null) - reConnect(endpoint, state.getApplicationState(ApplicationState.INTERNAL_IP)); - } - - public void onDead(InetAddress endpoint, EndpointState state) - { - // do nothing - } - - public void onRestart(InetAddress endpoint, EndpointState state) - { - // do nothing - } - - public void onRemove(InetAddress endpoint) - { - // do nothing. - } - - private void reConnect(InetAddress endpoint, VersionedValue versionedValue) - { - if (!getDatacenter(endpoint).equals(myDC)) - return; // do nothing return back... - - try - { - InetAddress remoteIP = InetAddress.getByName(versionedValue.value); - MessagingService.instance().getConnectionPool(endpoint).reset(remoteIP); - logger.debug(String.format("Intiated reconnect to an Internal IP %s for the endpoint %s", remoteIP, endpoint)); - } - catch (UnknownHostException e) - { - logger.error("Error in getting the IP address resolved", e); - } - } - - @Override public void gossiperStarting() { super.gossiperStarting(); Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, StorageService.instance.valueFactory.internalIP(FBUtilities.getLocalAddress().getHostAddress())); - Gossiper.instance.register(this); + Gossiper.instance.register(new ReconnectableSnitchHelper(this, myDC, preferLocal)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e75e33fa/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java new file mode 100644 index 0000000..adec953 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java @@ -0,0 +1,88 @@ +package org.apache.cassandra.locator; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +import 
org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; +import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.net.MessagingService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Sidekick helper for snitches that want to reconnect from one IP addr for a node to another. + * Typically, this is for situations like EC2 where a node will have a public address and a private address, + * where we connect on the public, discover the private, and reconnect on the private. + */ +public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber +{ + private static final Logger logger = LoggerFactory.getLogger(ReconnectableSnitchHelper.class); + private final IEndpointSnitch snitch; + private final String localDc; + private final boolean preferLocal; + + public ReconnectableSnitchHelper(IEndpointSnitch snitch, String localDc, boolean preferLocal) + { + this.snitch = snitch; + this.localDc = localDc; + this.preferLocal = preferLocal; + } + + private void reconnect(InetAddress publicAddress, VersionedValue localAddressValue) + { + try + { + reconnect(publicAddress, InetAddress.getByName(localAddressValue.value)); + } + catch (UnknownHostException e) + { + logger.error("Error in getting the IP address resolved: ", e); + } + } + + private void reconnect(InetAddress publicAddress, InetAddress localAddress) + { + if (snitch.getDatacenter(publicAddress).equals(localDc) + && MessagingService.instance().getVersion(publicAddress) == MessagingService.current_version + && !MessagingService.instance().getConnectionPool(publicAddress).endPoint().equals(localAddress)) + { + MessagingService.instance().getConnectionPool(publicAddress).reset(localAddress); + logger.debug(String.format("Intiated reconnect to an Internal IP %s for the %s", localAddress, publicAddress)); + } + } + + public void onJoin(InetAddress endpoint, EndpointState 
epState) + { + if (preferLocal && epState.getApplicationState(ApplicationState.INTERNAL_IP) != null) + reconnect(endpoint, epState.getApplicationState(ApplicationState.INTERNAL_IP)); + } + + public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) + { + if (preferLocal && state == ApplicationState.INTERNAL_IP) + reconnect(endpoint, value); + } + + public void onAlive(InetAddress endpoint, EndpointState state) + { + if (preferLocal && state.getApplicationState(ApplicationState.INTERNAL_IP) != null) + reconnect(endpoint, state.getApplicationState(ApplicationState.INTERNAL_IP)); + } + + public void onDead(InetAddress endpoint, EndpointState state) + { + // do nothing. + } + + public void onRemove(InetAddress endpoint) + { + // do nothing. + } + + public void onRestart(InetAddress endpoint, EndpointState state) + { + // do nothing. + } +}