http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/STATS.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/STATS.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/STATS.java deleted file mode 100644 index 35dccca..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/STATS.java +++ /dev/null @@ -1,211 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -package com.gemstone.org.jgroups.protocols; - - -import com.gemstone.org.jgroups.Address; -import com.gemstone.org.jgroups.Event; -import com.gemstone.org.jgroups.Message; -import com.gemstone.org.jgroups.View; -import com.gemstone.org.jgroups.stack.Protocol; -import com.gemstone.org.jgroups.util.ExternalStrings; - -import java.util.*; - -/** - * Provides various stats - * @author Bela Ban - * @version $Id: STATS.java,v 1.2 2005/06/07 10:17:27 belaban Exp $ - */ -public class STATS extends Protocol { - long sent_msgs, sent_bytes, sent_ucasts, sent_mcasts, received_ucasts, received_mcasts; - long received_msgs, received_bytes, sent_ucast_bytes, sent_mcast_bytes, received_ucast_bytes, received_mcast_bytes; - - /** HashMap key=Address, value=Entry, maintains stats per target destination */ - HashMap sent=new HashMap(); - - /** HashMap key=Address, value=Entry, maintains stats per receiver */ - HashMap received=new HashMap(); - - static/*GemStoneAddition*/ final short UP=1; - static/*GemStoneAddition*/ final short DOWN=2; - - - @Override // GemStoneAddition - public String getName() { - return "STATS"; - } - - @Override // GemStoneAddition - public boolean setProperties(Properties props) { - super.setProperties(props); - down_thread=false; // never use a down thread - up_thread=false; // never use an up thread - - if(props.size() > 0) { - log.error(ExternalStrings.STATS_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props); - return false; - } - return true; - } - - @Override // GemStoneAddition - public void resetStats() { - sent_msgs=sent_bytes=sent_ucasts=sent_mcasts=received_ucasts=received_mcasts=0; - received_msgs=received_bytes=sent_ucast_bytes=sent_mcast_bytes=received_ucast_bytes=received_mcast_bytes=0; - sent.clear(); - received.clear(); - } - - - public long getSentMessages() {return sent_msgs;} - public long getSentBytes() {return sent_bytes;} - public long getSentUnicastMessages() {return sent_ucasts;} - public long getSentUnicastBytes() {return sent_ucast_bytes;} - public long getSentMcastMessages() {return sent_mcasts;} - public long getSentMcastBytes() {return sent_mcast_bytes;} - - public long getReceivedMessages() {return received_msgs;} - public long getReceivedBytes() {return received_bytes;} - public long getReceivedUnicastMessages() {return received_ucasts;} - public long getReceivedUnicastBytes() {return received_ucast_bytes;} - public long getReceivedMcastMessages() {return received_mcasts;} - public long getReceivedMcastBytes() {return received_mcast_bytes;} - - - @Override // GemStoneAddition - public void up(Event evt) { - if(evt.getType() == Event.MSG) { - Message msg=(Message)evt.getArg(); - updateStats(msg, UP); - } - else if(evt.getType() == Event.VIEW_CHANGE) { - handleViewChange((View)evt.getArg()); - } - passUp(evt); - } - - - - @Override // GemStoneAddition - public void down(Event evt) { - if(evt.getType() == Event.MSG) { - Message msg=(Message)evt.getArg(); - updateStats(msg, DOWN); - } - else if(evt.getType() == Event.VIEW_CHANGE) { - handleViewChange((View)evt.getArg()); - } - passDown(evt); - } - - - @Override // GemStoneAddition - public String printStats() { - Map.Entry entry; - Object key, val; - StringBuffer sb=new StringBuffer(); - sb.append("sent:\n"); - for(Iterator it=sent.entrySet().iterator(); it.hasNext();) { - entry=(Map.Entry)it.next(); - key=entry.getKey(); - if(key == null) key="<mcast dest>"; - val=entry.getValue(); - sb.append(key).append(": ").append(val).append("\n"); - } - sb.append("\nreceived:\n"); - for(Iterator it=received.entrySet().iterator(); it.hasNext();) { - entry=(Map.Entry)it.next(); - key=entry.getKey(); - val=entry.getValue(); - sb.append(key).append(": ").append(val).append("\n"); - } - - return sb.toString(); - } - - private void handleViewChange(View view) { - Vector members=view.getMembers(); - Set tmp=new LinkedHashSet(members); - tmp.add(null); // for null destination (= mcast) - sent.keySet().retainAll(tmp); - received.keySet().retainAll(tmp); - } - - private void updateStats(Message msg, short direction) { - int length; - HashMap map; - boolean mcast; - Address dest, src; - - if(msg == null) return; - length=msg.getLength(); - dest=msg.getDest(); - src=msg.getSrc(); - mcast=dest == null || dest.isMulticastAddress(); - - if(direction == UP) { // received - received_msgs++; - received_bytes+=length; - if(mcast) { - received_mcasts++; - received_mcast_bytes+=length; - } - else { - received_ucasts++; - received_ucast_bytes+=length; - } - } - else { // sent - sent_msgs++; - sent_bytes+=length; - if(mcast) { - sent_mcasts++; - sent_mcast_bytes+=length; - } - else { - sent_ucasts++; - sent_ucast_bytes+=length; - } - } - - Address key=direction == UP? src : dest; - map=direction == UP? received : sent; - Entry entry=(Entry)map.get(key); - if(entry == null) { - entry=new Entry(); - map.put(key, entry); - } - entry.msgs++; - entry.bytes+=length; - if(mcast) { - entry.mcasts++; - entry.mcast_bytes+=length; - } - else { - entry.ucasts++; - entry.ucast_bytes+=length; - } - } - - - - - static class Entry { - long msgs, bytes, ucasts, mcasts, ucast_bytes, mcast_bytes; - - @Override // GemStoneAddition - public String toString() { - StringBuffer sb=new StringBuffer(); - sb.append(msgs).append(" (").append(bytes).append(" bytes)"); - sb.append(": ").append(ucasts).append(" ucasts (").append(ucast_bytes).append(" bytes), "); - sb.append(mcasts).append(" mcasts (").append(mcast_bytes).append(" bytes)"); - return sb.toString(); - } - } - - - -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP.java deleted file mode 100644 index cf50d36..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP.java +++ /dev/null @@ -1,308 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: TCP.java,v 1.31 2005/09/29 12:24:37 belaban Exp $ - -package com.gemstone.org.jgroups.protocols; - - - -import com.gemstone.org.jgroups.Address; -import com.gemstone.org.jgroups.Event; -import com.gemstone.org.jgroups.Message; -import com.gemstone.org.jgroups.SuspectMember; -import com.gemstone.org.jgroups.blocks.ConnectionTable; -import com.gemstone.org.jgroups.stack.IpAddress; -import com.gemstone.org.jgroups.util.BoundedList; -import com.gemstone.org.jgroups.util.ExternalStrings; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Properties; -import java.util.Vector; - - - - -/** - * TCP based protocol. Creates a server socket, which gives us the local address of this group member. For - * each accept() on the server socket, a new thread is created that listens on the socket. - * For each outgoing message m, if m.dest is in the ougoing hashtable, the associated socket will be reused - * to send message, otherwise a new socket is created and put in the hashtable. - * When a socket connection breaks or a member is removed from the group, the corresponding items in the - * incoming and outgoing hashtables will be removed as well.<br> - * This functionality is in ConnectionTable, which isT used by TCP. TCP sends messages using ct.send() and - * registers with the connection table to receive all incoming messages. - * @author Bela Ban - */ -public class TCP extends TP implements ConnectionTable.Receiver { - private ConnectionTable ct=null; - private InetAddress external_addr=null; // the IP address which is broadcast to other group members - private int start_port=7800; // find first available port starting at this port - private int end_port=0; // maximum port to bind to - private long reaper_interval=0; // time in msecs between connection reaps - private long conn_expire_time=0; // max time a conn can be idle before being reaped - - /** List the maintains the currently suspected members. This is used so we don't send too many SUSPECT - * events up the stack (one per message !) - */ - final BoundedList suspected_mbrs=new BoundedList(20); - - /** Should we drop unicast messages to suspected members or not */ - boolean skip_suspected_members=true; - - /** Use separate send queues for each connection */ - boolean use_send_queues=true; - - int recv_buf_size=150000; - int send_buf_size=150000; - int sock_conn_timeout=2000; // max time in millis for a socket creation in ConnectionTable - - - - public TCP() { - } - - @Override // GemStoneAddition - public String getName() { - return "TCP"; - } - - - public int getOpenConnections() {return ct.getNumConnections();} - public InetAddress getBindAddr() {return bind_addr;} - public void setBindAddr(InetAddress bind_addr) {this.bind_addr=bind_addr;} - public int getStartPort() {return start_port;} - public void setStartPort(int start_port) {this.start_port=start_port;} - public int getEndPort() {return end_port;} - public void setEndPort(int end_port) {this.end_port=end_port;} - public long getReaperInterval() {return reaper_interval;} - public void setReaperInterval(long reaper_interval) {this.reaper_interval=reaper_interval;} - public long getConnExpireTime() {return conn_expire_time;} - public void setConnExpireTime(long conn_expire_time) {this.conn_expire_time=conn_expire_time;} - @Override // GemStoneAddition - public boolean isLoopback() {return loopback;} - @Override // GemStoneAddition - public void setLoopback(boolean loopback) {this.loopback=loopback;} - - - public String printConnections() {return ct.toString();} - - - /** Setup the Protocol instance acording to the configuration string */ - @Override // GemStoneAddition - public boolean setProperties(Properties props) { - String str; - - super.setProperties(props); - str=props.getProperty("start_port"); - if(str != null) { - start_port=Integer.parseInt(str); - props.remove("start_port"); - } - - str=props.getProperty("end_port"); - if(str != null) { - end_port=Integer.parseInt(str); - props.remove("end_port"); - } - - str=props.getProperty("external_addr"); - if(str != null) { - try { - external_addr=InetAddress.getByName(str); - } - catch(UnknownHostException unknown) { - if(log.isFatalEnabled()) log.fatal("(external_addr): host " + str + " not known"); - return false; - } - props.remove("external_addr"); - } - - str=props.getProperty("reaper_interval"); - if(str != null) { - reaper_interval=Long.parseLong(str); - props.remove("reaper_interval"); - } - - str=props.getProperty("conn_expire_time"); - if(str != null) { - conn_expire_time=Long.parseLong(str); - props.remove("conn_expire_time"); - } - - str=props.getProperty("sock_conn_timeout"); - if(str != null) { - sock_conn_timeout=Integer.parseInt(str); - props.remove("sock_conn_timeout"); - } - - str=props.getProperty("recv_buf_size"); - if(str != null) { - recv_buf_size=Integer.parseInt(str); - props.remove("recv_buf_size"); - } - - str=props.getProperty("send_buf_size"); - if(str != null) { - send_buf_size=Integer.parseInt(str); - props.remove("send_buf_size"); - } - - str=props.getProperty("skip_suspected_members"); - if(str != null) { - skip_suspected_members=Boolean.valueOf(str).booleanValue(); - props.remove("skip_suspected_members"); - } - - str=props.getProperty("use_send_queues"); - if(str != null) { - use_send_queues=Boolean.valueOf(str).booleanValue(); - props.remove("use_send_queues"); - } - - if(props.size() > 0) { - log.error(ExternalStrings.TCP_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props); - return false; - } - return true; - } - - - @Override // GemStoneAddition - public void start() throws Exception { - ct=getConnectionTable(reaper_interval,conn_expire_time,bind_addr,external_addr,start_port,end_port); - ct.setUseSendQueues(use_send_queues); - // ct.addConnectionListener(this); - ct.setReceiveBufferSize(recv_buf_size); - ct.setSendBufferSize(send_buf_size); - ct.setSocketConnectionTimeout(sock_conn_timeout); - local_addr=ct.getLocalAddress(); - if(additional_data != null && local_addr instanceof IpAddress) - ((IpAddress)local_addr).setAdditionalData(additional_data); - super.start(); - } - - @Override // GemStoneAddition - public void stop() { - ct.stop(); - super.stop(); - } - - - @Override // GemStoneAddition - protected void handleDownEvent(Event evt) { - super.handleDownEvent(evt); - if(evt.getType() == Event.VIEW_CHANGE) { - suspected_mbrs.removeAll(); - } - else if(evt.getType() == Event.UNSUSPECT) { - suspected_mbrs.removeElement(evt.getArg()); - } - } - - - /** - * @param reaperInterval - * @param connExpireTime - * @param bindAddress - * @param startPort - * @throws Exception - * @return ConnectionTable - * Sub classes overrides this method to initialize a different version of - * ConnectionTable. - */ - protected ConnectionTable getConnectionTable(long reaperInterval, long connExpireTime, InetAddress bindAddress, - InetAddress externalAddress, int startPort, int endPort) throws Exception { - ConnectionTable cTable; - if(reaperInterval == 0 && connExpireTime == 0) { - cTable=new ConnectionTable(this, bindAddress, externalAddress, startPort, endPort); - } - else { - if(reaperInterval == 0) { - reaperInterval=5000; - if(warn) log.warn("reaper_interval was 0, set it to " + reaperInterval); - } - if(connExpireTime == 0) { - connExpireTime=1000 * 60 * 5; - if(warn) log.warn("conn_expire_time was 0, set it to " + connExpireTime); - } - cTable=new ConnectionTable(this, bindAddress, externalAddress, startPort, endPort, - reaperInterval, connExpireTime); - } - return cTable; - } - - - /** ConnectionTable.Receiver interface */ - public void receive(Address sender, byte[] data, int offset, int length) { - super.receive(local_addr, sender, data, offset, length); - } - - - - - @Override // GemStoneAddition - public void sendToAllMembers(byte[] data, int offset, int length) throws Exception { - Address dest; - Vector mbrs=(Vector)members.clone(); - for(int i=0; i < mbrs.size(); i++) { - dest=(Address)mbrs.elementAt(i); - sendToSingleMember(dest, false, data, offset, length); - } - } - - @Override // GemStoneAddition - public void sendToSingleMember(Address dest, boolean isJoinResponse/*temporary change - do not commit*/, byte[] data, int offset, int length) throws Exception { - if(trace) log.trace("dest=" + dest + " (" + data.length + " bytes)"); - if(skip_suspected_members) { - if(suspected_mbrs.contains(dest)) { - if(trace) - log.trace("will not send unicast message to " + dest + " as it is currently suspected"); - return; - } - } - -// if(dest.equals(local_addr)) { -// if(!loopback) // if loopback, we discard the message (was already looped back) -// receive(dest, data, offset, length); // else we loop it back here -// return; -// } - try { - ct.send(dest, data, offset, length); - } - catch(Exception e) { - if(members.contains(dest)) { - if(!suspected_mbrs.contains(dest)) { - suspected_mbrs.add(dest); - passUp(new Event(Event.SUSPECT, new SuspectMember(local_addr, dest))); // GemStoneAddition SuspectMember - } - } - } - } - - - @Override // GemStoneAddition - public String getInfo() { - StringBuffer sb=new StringBuffer(); - sb.append("connections: ").append(printConnections()).append("\n"); - return sb.toString(); - } - - - @Override // GemStoneAddition - public void postUnmarshalling(Message msg, Address dest, Address src, boolean multicast) { - if(multicast) - msg.setDest(null); - else - msg.setDest(dest); - } - - @Override // GemStoneAddition - public void postUnmarshallingList(Message msg, Address dest, boolean multicast) { - postUnmarshalling(msg, dest, null, multicast); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPGOSSIP.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPGOSSIP.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPGOSSIP.java deleted file mode 100644 index 91650af..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPGOSSIP.java +++ /dev/null @@ -1,429 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: TCPGOSSIP.java,v 1.16 2005/08/11 12:43:47 belaban Exp $ - -package com.gemstone.org.jgroups.protocols; - - -import java.net.Inet4Address; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Properties; -import java.util.Set; -import java.util.StringTokenizer; -import java.util.Vector; -import java.util.concurrent.atomic.AtomicBoolean; - -import com.gemstone.org.jgroups.Address; -import com.gemstone.org.jgroups.Event; -import com.gemstone.org.jgroups.JChannel; -import com.gemstone.org.jgroups.Message; -import com.gemstone.org.jgroups.stack.GossipClient; -import com.gemstone.org.jgroups.stack.IpAddress; -import com.gemstone.org.jgroups.util.ExternalStrings; - - -/** - * The TCPGOSSIP protocol layer retrieves the initial membership (used by the GMS when started - * by sending event FIND_INITIAL_MBRS down the stack). - * We do this by contacting one or more GossipServers, which must be running at well-known - * addresses:ports. The responses should allow us to determine the coordinator whom we have to - * contact, e.g. in case we want to join the group. When we are a server (after having - * received the BECOME_SERVER event), we'll respond to TCPGOSSIP requests with a TCPGOSSIP - * response.<p> The FIND_INITIAL_MBRS event will eventually be answered with a - * FIND_INITIAL_MBRS_OK event up the stack. - * - * @author Bela Ban - */ -public class TCPGOSSIP extends Discovery { - Vector initial_hosts=null; // (list of IpAddresses) hosts to be contacted for the initial membership - GossipClient gossip_client=null; // accesses the GossipServer(s) to find initial mbrship - - // we need to refresh the registration with the GossipServer(s) periodically, - // so that our entries are not purged from the cache - long gossip_refresh_rate=20000; - - private boolean splitBrainDetectionEnabled; // GemStoneAddition - private int gossipServerWaitTime; // GemStoneAddition - - final static Vector EMPTY_VECTOR=new Vector(); - final static String name="TCPGOSSIP"; - - - @Override // GemStoneAddition - public String getName() { - return name; - } - - // start GemStoneAddition - @Override // GemStoneAddition - public int getProtocolEnum() { - return com.gemstone.org.jgroups.stack.Protocol.enumTCPGOSSIP; - } - // end GemStone addition - - @Override // GemStoneAddition - public boolean setProperties(Properties props) { - String str; - str=props.getProperty("gossip_refresh_rate"); // wait for at most n members - if(str != null) { - gossip_refresh_rate=Integer.parseInt(str); - props.remove("gossip_refresh_rate"); - } - - //GemStoneAddition - split-brain detection support - str=props.getProperty("split-brain-detection"); - if (str != null) { - splitBrainDetectionEnabled = Boolean.valueOf(str).booleanValue(); - props.remove("split-brain-detection"); - } - - str=props.getProperty("initial_hosts"); - if(str != null) { - props.remove("initial_hosts"); - initial_hosts=createInitialHosts(str); - } - - str = props.getProperty("gossip_server_wait_time"); - if (str != null) { - props.remove("gossip_server_wait_time"); - gossipServerWaitTime = Integer.parseInt(str); - } - - if(initial_hosts == null || initial_hosts.size() == 0) { - if(log.isErrorEnabled()) log.error(ExternalStrings.TCPGOSSIP_INITIAL_HOSTS_MUST_CONTAIN_THE_ADDRESS_OF_AT_LEAST_ONE_GOSSIPSERVER); - return false; - } - return super.setProperties(props); - } - - - - @Override // GemStoneAddition - public void start() throws Exception { - super.start(); - if(gossip_client == null) { - gossip_client=new GossipClient(initial_hosts, gossip_refresh_rate, this.stack); - gossip_client.setTimeout((int)this.timeout); - } - } - - @Override // GemStoneAddition - public void stop() { - super.stop(); - if(gossip_client != null) { - gossip_client.stop(); - //gossip_client=null; - } - } - - - @Override // GemStoneAddition - public void handleConnectOK() { - if(group_addr == null || local_addr == null) { - if(log.isErrorEnabled()) - log.error("[CONNECT_OK]: group_addr or local_addr is null. " + - "cannot register with GossipServer(s)"); - } - else { - gossip_client.register(group_addr, local_addr, timeout, true); // GemStone - timeout, stack & inhibit registration - } - } - - - private boolean ipWarningIssued; // GemStoneAddition - IP version checking - - @Override // GemStoneAddition - public void sendGetMembersRequest(AtomicBoolean waiter_sync) { // GemStoneAddition - both parameters - Message msg, copy; - PingHeader hdr; - Vector tmp_mbrs; - Address mbr_addr; - GossipClient client = gossip_client; // GemStoneAddition - gossip_client gets nulled when this proto is stopped - - // bug #41484 - only use coordinator advice from the gossip server once - boolean shortcutOK = !this.stack.hasTriedJoinShortcut(); - - if(group_addr == null) { - if(log.isErrorEnabled()) log.error(ExternalStrings.TCPGOSSIP_FIND_INITIAL_MBRS_GROUP_ADDR_IS_NULL_CANNOT_GET_MBRSHIP); - passUp(new Event(Event.FIND_INITIAL_MBRS_OK, EMPTY_VECTOR)); - return; - } - if(trace) log.trace("fetching members from GossipServer(s)"); - - // GemStoneAddition - bug 28965: don't allow startup if no gossip server - boolean isAdminOnly = stack.gfPeerFunctions.isAdminOnlyMember(); - //do { GemStone - see comment below - - if (gossip_client == null) - return; - - long giveUpTime = System.currentTimeMillis() + (this.gossipServerWaitTime * 1000L); - - tmp_mbrs=client.getMembers(group_addr, local_addr, true, this.timeout); // GemStoneAddition - send local addr on get - - boolean firstWait = true; - boolean startupStatusWaitingSet = false; - -// if (isAdminOnly) { // GemStoneAddition - this if-else block added - while (gossip_client != null && client.getResponsiveServerCount() == 0 || tmp_mbrs == null || tmp_mbrs.size() == 0) { - // Wait, until we can contact at least one of our - // gossip servers and it had someone register with it - if (!isAdminOnly && System.currentTimeMillis() >= giveUpTime) { - break; - } - if (firstWait) { - StringBuilder sb = new StringBuilder(100); - for (Object obj: this.initial_hosts) { - if (!firstWait) { - sb.append(','); - } - firstWait = false; - IpAddress addr = (IpAddress)obj; - sb.append(addr.getIpAddress().getHostName()) - .append('[') - .append(addr.getPort()) - .append(']'); - } - // inform gfsh / ServerLauncher - startupStatusWaitingSet = true; - stack.gfPeerFunctions.logStartup(ExternalStrings.WAITING_FOR_LOCATOR_TO_START,sb.toString()); - } - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { - Thread.currentThread().interrupt(); // GemStoneAddition - return; // GemStoneAddition - } - tmp_mbrs=client.getMembers(group_addr, local_addr, true, timeout); - // GemStone Addition 08-04-04 - // if the VM is exiting, return so that the distributed system - // sync can be released and the shutdown hook can do its job - if (stack.gfPeerFunctions.shutdownHookIsAlive()) { - throw stack.gfBasicFunctions.getGemFireConfigException("Unable to contact a Locator service before detecting that VM is exiting"); - } - } -// } else { -// if (gossip_client == null) GemStoneAddition (this is never null) -// return; - if (client.getResponsiveServerCount() == 0) { - RuntimeException re = stack.gfBasicFunctions.getGemFireConfigException("Unable to contact a Locator service. Operation either timed out or Locator does not exist. Configured list of locators is \"" + initial_hosts + "\"."); - throw re; - } -// } - - if (startupStatusWaitingSet) { - stack.gfPeerFunctions.logStartup(ExternalStrings.WAITING_FOR_LOCATOR_TO_START_COMPLETED); - } - Set<Address> serverAddresses = client.getServerAddresses(); - - if (client.getFloatingCoordinatorDisabled()) { - passUp(new Event(Event.FLOATING_COORDINATOR_DISABLED, null)); - } - - if (client.getNetworkPartitionDetectionEnabled() != splitBrainDetectionEnabled) { - if (!splitBrainDetectionEnabled) { - splitBrainDetectionEnabled = true; - passUp(new Event(Event.ENABLE_NETWORK_PARTITION_DETECTION)); - } else { - throw stack.gfBasicFunctions.getGemFireConfigException("Locator has enable-network-partition-detection=" - + client.getNetworkPartitionDetectionEnabled() - +" but this member has enable-network-partition-detection=" - + splitBrainDetectionEnabled); - } - } - - if (client.getNetworkPartitionDetectionEnabled()) { - stack.gfBasicFunctions.checkDisableDNS(); - } - - - // GemStoneAddition for bug 39220 see if we're using an incompatible - // version of IP - if (tmp_mbrs != null && !ipWarningIssued) { - TP protocol = (TP)stack.findProtocol("UDP"); - if (protocol == null) protocol = (TP)stack.findProtocol("TCP"); - InetAddress bindAddress = protocol.getInetBindAddress(); - if (bindAddress != null) { - boolean iAmIPv4 = (bindAddress instanceof Inet4Address); - for (int i=0; i<tmp_mbrs.size(); i++) { - IpAddress addr = (IpAddress)tmp_mbrs.get(i); - InetAddress iaddr = addr.getIpAddress(); - if (iAmIPv4 != (iaddr instanceof Inet4Address)) { - // incompatible addresses are being used - log.getLogWriter().warning( - ExternalStrings.TCPGOSSIP_IP_VERSION_MISMATCH); - ipWarningIssued = true; - break; - } - } - } - } - - serverAddresses.remove(this.local_addr); - this.ping_waiter.setRequiredResponses(serverAddresses); - - // GemStoneAddition - if no locators have distributed systems, - // tell the GMS that it's okay for it to become a coordinator - if (client.getServerDistributedSystemCount() == 0) { - passUp(new Event(Event.ENABLE_INITIAL_COORDINATOR, null)); - } - - // GemStoneAddition - shortcut the get_mbrs phase - if (shortcutOK) { - Address coordinator = client.getCoordinator(); - // if this is a Locator starting up and there are no other processes - // in the system we can bypass discovery - - // disabled: this allows a locator that's starting up to ignore concurrently - // starting locators. bug #30341 is fixed by requiring responses from - // all known locators during discovery, and this code messes that up -// if (coordinator == null && Locator.hasLocators() -// && tmp_mbrs.size() == 0 -// || (tmp_mbrs.size() == 1 && tmp_mbrs.get(0).equals(this.local_addr))) { -// coordinator = this.local_addr; -// } - if (coordinator != null) { - if (log.getLogWriter().fineEnabled()) { - log.getLogWriter().fine("Locator returned coordinator " + coordinator + - ", so bypassing unicast discovery processing"); - } - ping_waiter.setCoordinator(coordinator); - wakeWaiter(waiter_sync); - return; - } - } - - if(tmp_mbrs == null || tmp_mbrs.size() == 0) { - if(trace) log.trace("[FIND_INITIAL_MBRS]: gossip client found no members"); - passUp(new Event(Event.FIND_INITIAL_MBRS_OK, EMPTY_VECTOR)); - wakeWaiter(waiter_sync); // GemStoneAddition - return; - } - if(trace) { - log.trace("consolidated mbrs from GossipServer(s) are " + tmp_mbrs - + ". Locator distributed system count=" + client.getServerDistributedSystemCount() - + ", and floatingCoordinationDisabled="+client.getFloatingCoordinatorDisabled()); - } - - // GemStoneAddition - forces us to not get any initial member responses & tests the - // disable_initial_coordinator setting - //if (true) { - // log.info("DEBUG: not sending GET_MBRS_REQ message to list returned by gossip server"); - // return; - //} - - // 1. 'Mcast' GET_MBRS_REQ message - hdr=new PingHeader(PingHeader.GET_MBRS_REQ, null); - msg=new Message(null, null, null); - msg.putHeader(name, hdr); - //GemStoneAddition - don't bundle this message or we might time out - // before it's even sent - msg.bundleable = false; - - wakeWaiter(waiter_sync); // GemStoneAddition - - // GemStoneAddition - here we send the request to newer members first - // since they're likely to be around. - int max_msgs = Integer.getInteger("gemfire.max_ping_requests", 40).intValue(); - int msgs_sent = 0; - for(int i=tmp_mbrs.size()-1; i >= 0; i--) { - mbr_addr=(Address)tmp_mbrs.elementAt(i); - // make sure all required responders get the message - if (!serverAddresses.contains(mbr_addr) && (msgs_sent >= max_msgs)) { - continue; - } - copy=msg.copy(); - copy.setDest(mbr_addr); - if(trace) log.trace("[FIND_INITIAL_MBRS] sending PING request to " + copy.getDest()); - passDown(new Event(Event.MSG, copy)); - if (Thread.currentThread().isInterrupted()) { - break; - } - msgs_sent++; - } - - // GemStoneAddition - not really an addition, just a note from Bruce - // that this used to have a wait-for-initial-members section that is - // now gone, making the loop a bit difficult to implement - //} while (isAdminOnly && initial_members.size() <= 0); - - } - - - - /* -------------------------- Private methods ---------------------------- */ - - - /** - * Input is "daddy[8880],sindhu[8880],camille[5555]. Return list of IpAddresses - */ - public static Vector createInitialHosts(String l) { - Vector tmp=new Vector(); - String host; - int port; - IpAddress addr; - StringTokenizer tok=new StringTokenizer(l, ","); - String t; - boolean isLoopback = false; - InetAddress myAddress = null; - - String bindAddress = System.getProperty("gemfire.jg-bind-address"); - try { - if (bindAddress == null) { - isLoopback = JChannel.getGfFunctions().getLocalHost().isLoopbackAddress(); - } else { - isLoopback = InetAddress.getByName(bindAddress).isLoopbackAddress(); - } - } catch (UnknownHostException e) { - // ignore - } - - - - while(tok.hasMoreTokens()) { - try { - t=tok.nextToken(); - host=t.substring(0, t.indexOf('[')); - // GemStoneAddition - support for name:bind-addr[port] format - int idx = host.lastIndexOf('@'); - if (idx < 0) { - idx = host.lastIndexOf(':'); - } - String h = host.substring(0, idx > -1 ? idx : host.length()); - if (h.indexOf(':') >= 0) { // a single numeric ipv6 address - idx = host.lastIndexOf('@'); - } - if (idx >= 0) { - host = host.substring(idx+1, host.length()); - } - port=Integer.parseInt(t.substring(t.indexOf('[') + 1, t.indexOf(']'))); - addr=new IpAddress(host, port); - if (isLoopback && !addr.getIpAddress().isLoopbackAddress()) { // GemStoneAddition - // TODO this should be a GemFireConfigException but that class isn't available - // in a static method in the jgroups project - throw new RuntimeException("This process is attempting to join with a loopback address ("+myAddress+") using a locator that does not have a local address ("+addr.getIpAddress()+"). On Unix this usually means that /etc/hosts is misconfigured."); - } - tmp.addElement(addr); - } - catch(NumberFormatException e) { - //if(log.isErrorEnabled()) log.error(JGroupsStrings.TCPGOSSIP_EXEPTION_IS__0, e); - } - } - - return tmp; - } - - @Override // GemStoneAddition - public void destroy() { // GemStoneAddition - get rid of gossip timer - if (gossip_client != null) { - gossip_client.destroy(); - gossip_client = null; - } - } - - -} - http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPPING.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPPING.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPPING.java deleted file mode 100644 index 6459d49..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCPPING.java +++ /dev/null @@ -1,144 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: TCPPING.java,v 1.24 2005/08/11 12:43:47 belaban Exp $ - -package com.gemstone.org.jgroups.protocols; - - - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Properties; -import java.util.StringTokenizer; -import java.util.concurrent.atomic.AtomicBoolean; - -import com.gemstone.org.jgroups.Address; -import com.gemstone.org.jgroups.Event; -import com.gemstone.org.jgroups.Message; -import com.gemstone.org.jgroups.stack.IpAddress; -import com.gemstone.org.jgroups.util.ExternalStrings; - - -/** - * The TCPPING protocol layer retrieves the initial membership in answer to the GMS's - * FIND_INITIAL_MBRS event. The initial membership is retrieved by directly contacting other group - * members, sending point-to-point mebership requests. The responses should allow us to determine - * the coordinator whom we have to contact in case we want to join the group. When we are a server - * (after having received the BECOME_SERVER event), we'll respond to TCPPING requests with a TCPPING - * response. - * <p> - * The FIND_INITIAL_MBRS event will eventually be answered with a FIND_INITIAL_MBRS_OK event up - * the stack. - * <p> - * The TCPPING protocol requires a static conifiguration, which assumes that you to know in advance - * where to find other members of your group. For dynamic discovery, use the PING protocol, which - * uses multicast discovery, or the TCPGOSSIP protocol, which contacts a Gossip Router to acquire - * the initial membership. - * - * @author Bela Ban - */ -public class TCPPING extends Discovery { - int port_range=1; // number of ports to be probed for initial membership - - /** List of IpAddress */ - ArrayList initial_hosts=null; // hosts to be contacted for the initial membership - final static String name="TCPPING"; - - - - @Override // GemStoneAddition - public String getName() { - return name; - } - - - @Override // GemStoneAddition - public boolean setProperties(Properties props) { - String str; - - str=props.getProperty("port_range"); // if member cannot be contacted on base port, - if(str != null) { // how many times can we increment the port - port_range=Integer.parseInt(str); - if (port_range < 1) { - port_range = 1; - } - props.remove("port_range"); - } - - str=props.getProperty("initial_hosts"); - if(str != null) { - props.remove("initial_hosts"); - initial_hosts=createInitialHosts(str); - } - - return super.setProperties(props); - } - - - @Override // GemStoneAddition - public void localAddressSet(Address addr) { - // Add own address to initial_hosts if not present: we must always be able to ping ourself ! - if(initial_hosts != null && addr != null) { - if(initial_hosts.contains(addr)) { - initial_hosts.remove(addr); - if(log.isDebugEnabled()) log.debug("[SET_LOCAL_ADDRESS]: removing my own address (" + addr + - ") from initial_hosts; initial_hosts=" + initial_hosts); - } - } - } - - - @Override // GemStoneAddition - public void sendGetMembersRequest(AtomicBoolean waiter_sync) { - Message msg; - - wakeWaiter(waiter_sync); - - for(Iterator it=initial_hosts.iterator(); it.hasNext();) { - Address addr=(Address)it.next(); - // if(tmpMbrs.contains(addr)) { - // ; // continue; // changed as suggested by Mark Kopec - // } - msg=new Message(addr, null, null); - msg.putHeader(name, new PingHeader(PingHeader.GET_MBRS_REQ, null)); - - if(trace) log.trace("[FIND_INITIAL_MBRS] sending PING request to " + msg.getDest()); - passDown(new Event(Event.MSG, msg)); - } - } - - - - /* -------------------------- Private methods ---------------------------- */ - - /** - * Input is "daddy[8880],sindhu[8880],camille[5555]. Return List of IpAddresses - */ - private ArrayList createInitialHosts(String l) { - StringTokenizer tok=new StringTokenizer(l, ","); - String t; - IpAddress addr; - ArrayList retval=new ArrayList(); - - while(tok.hasMoreTokens()) { - try { - t=tok.nextToken(); - String host=t.substring(0, t.indexOf('[')); - int port=Integer.parseInt(t.substring(t.indexOf('[') + 1, t.indexOf(']'))); - for(int i=port; i < port + port_range; i++) { - addr=new IpAddress(host, i); - retval.add(addr); - } - } - catch(NumberFormatException e) { - if(log.isErrorEnabled()) log.error(ExternalStrings.TCPPING_EXEPTION_IS__0, e); - } - } - - return retval; - } - -} - http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP_NIO.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP_NIO.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP_NIO.java deleted file mode 100644 index 3c9ba7b..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TCP_NIO.java +++ /dev/null @@ -1,118 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -package com.gemstone.org.jgroups.protocols; - -import com.gemstone.org.jgroups.blocks.ConnectionTable; -import com.gemstone.org.jgroups.blocks.ConnectionTableNIO; - -import java.net.InetAddress; -import java.util.Properties; - -public class TCP_NIO extends TCP - { - - /* - * (non-Javadoc) - * - * @see org.jgroups.protocols.TCP#getConnectionTable(long, long) - */ - @Override // GemStoneAddition - protected ConnectionTable getConnectionTable(long ri, long cet, - InetAddress b_addr, InetAddress bc_addr, int s_port, int e_port) throws Exception { - ConnectionTableNIO ct = null; - if (ri == 0 && cet == 0) { - ct = new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port ); - } else { - if (ri == 0) { - ri = 5000; - if(warn) log.warn("reaper_interval was 0, set it to " - + ri); - } - if (cet == 0) { - cet = 1000 * 60 * 5; - if(warn) log.warn("conn_expire_time was 0, set it to " - + cet); - } - ct = new ConnectionTableNIO(this, b_addr, bc_addr, s_port, e_port, ri, cet); - } - return ct; - } - - @Override // GemStoneAddition - public String getName() { - return "TCP_NIO"; - } - - public int getReaderThreads() { return m_reader_threads; } - public int getWriterThreads() { return m_writer_threads; } - public int getProcessorThreads() { return m_processor_threads; } - public int getProcessorMinThreads() { return m_processor_minThreads;} - public int getProcessorMaxThreads() { return m_processor_maxThreads;} - public int getProcessorQueueSize() { return m_processor_queueSize; } - public int getProcessorKeepAliveTime() { return m_processor_keepAliveTime; } - - /** Setup the Protocol instance acording to the configuration string */ - @Override // GemStoneAddition - public boolean setProperties(Properties props) { - String str; - - str=props.getProperty("reader_threads"); - if(str != null) { - m_reader_threads=Integer.parseInt(str); - props.remove("reader_threads"); - } - - str=props.getProperty("writer_threads"); - if(str != null) { - m_writer_threads=Integer.parseInt(str); - props.remove("writer_threads"); - } - - str=props.getProperty("processor_threads"); - if(str != null) { - m_processor_threads=Integer.parseInt(str); - props.remove("processor_threads"); - } - - str=props.getProperty("processor_minThreads"); - if(str != null) { - m_processor_minThreads=Integer.parseInt(str); - props.remove("processor_minThreads"); - } - - str=props.getProperty("processor_maxThreads"); - if(str != null) { - m_processor_maxThreads =Integer.parseInt(str); - props.remove("processor_maxThreads"); - } - - str=props.getProperty("processor_queueSize"); - if(str != null) { - m_processor_queueSize=Integer.parseInt(str); - props.remove("processor_queueSize"); - } - - str=props.getProperty("processor_keepAliveTime"); - if(str != null) { - m_processor_keepAliveTime=Integer.parseInt(str); - props.remove("processor_keepAliveTime"); - } - - return super.setProperties(props); - } - - private int m_reader_threads = 8; - - private int m_writer_threads = 8; - - private int m_processor_threads = 10; // PooledExecutor.createThreads() - private int m_processor_minThreads = 10; // PooledExecutor.setMinimumPoolSize() - private int m_processor_maxThreads = 10; // PooledExecutor.setMaxThreads() - private int m_processor_queueSize=100; // Number of queued requests that can be pending waiting - // for a background thread to run the request. - private int m_processor_keepAliveTime = -1; // PooledExecutor.setKeepAliveTime( milliseconds); - // A negative value means to wait forever - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TOTAL.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TOTAL.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TOTAL.java deleted file mode 100644 index 2622a29..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/TOTAL.java +++ /dev/null @@ -1,1055 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -// $Id: TOTAL.java,v 1.11 2005/08/08 12:45:44 belaban Exp $ -package com.gemstone.org.jgroups.protocols; - - -import com.gemstone.org.jgroups.oswego.concurrent.ReadWriteLock; -import com.gemstone.org.jgroups.oswego.concurrent.WriterPreferenceReadWriteLock; -import com.gemstone.org.jgroups.Address; -import com.gemstone.org.jgroups.Event; -import com.gemstone.org.jgroups.Message; -import com.gemstone.org.jgroups.View; -import com.gemstone.org.jgroups.stack.AckSenderWindow; -import com.gemstone.org.jgroups.stack.Protocol; -import com.gemstone.org.jgroups.util.ExternalStrings; -import com.gemstone.org.jgroups.util.TimeScheduler; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.*; - - -/** - * Implements the total ordering layer using a message sequencer - * <p/> - * <p/> - * The protocol guarantees that all bcast sent messages will be delivered in - * the same order to all members. For that it uses a sequencer which assignes - * monotonically increasing sequence ID to broadcasts. Then all group members - * deliver the bcasts in ascending sequence ID order. - * <p/> - * <ul> - * <li> - * When a bcast message comes down to this layer, it is placed in the pending - * down queue. A bcast request is sent to the sequencer.</li> - * <li> - * When the sequencer receives a bcast request, it creates a bcast reply - * message and assigns to it a monotonically increasing seqID and sends it back - * to the source of the bcast request.</li> - * <li> - * When a broadcast reply is received, the corresponding bcast message is - * assigned the received seqID. Then it is broadcasted.</li> - * <li> - * Received bcasts are placed in the up queue. The queue is sorted according - * to the seqID of the bcast. Any message at the head of the up queue with a - * seqID equal to the next expected seqID is delivered to the layer above.</li> - * <li> - * Unicast messages coming from the layer below are forwarded above.</li> - * <li> - * Unicast messages coming from the layer above are forwarded below.</li> - * </ul> - * <p/> - * <i>Please note that once a <code>BLOCK_OK</code> is acknowledged messages - * coming from above are discarded!</i> Either the application must stop - * sending messages when a <code>BLOCK</code> event is received from the - * channel or a QUEUE layer should be placed above this one. Received messages - * are still delivered above though. - * <p/> - * bcast requests are retransmitted periodically until a bcast reply is - * received. In case a BCAST_REP is on its way during a BCAST_REQ - * retransmission, then the next BCAST_REP will be to a non-existing - * BCAST_REQ. So, a nulll BCAST message is sent to fill the created gap in - * the seqID of all members. - * - * @author i.georgia...@doc.ic.ac.uk - */ -public class TOTAL extends Protocol { - /** - * The header processed by the TOTAL layer and intended for TOTAL - * inter-stack communication - */ - public static class Header extends com.gemstone.org.jgroups.Header { - // Header types - /** - * Null value for the tag - */ - public static final int NULL_TYPE=-1; - /** - * Request to broadcast by the source - */ - public static final int REQ=0; - /** - * Reply to broadcast request. - */ - public static final int REP=1; - /** - * Unicast message - */ - public static final int UCAST=2; - /** - * Broadcast Message - */ - public static final int BCAST=3; - - /** - * The header's type tag - */ - public int type; - /** - * The ID used by the message source to match replies from the - * sequencer - */ - public long localSequenceID; - /** - * The ID imposing the total order of messages - */ - public long sequenceID; - - /** - * used for externalization - */ - public Header() { - } - - /** - * Create a header for the TOTAL layer - * - * @param type the header's type - * @param localSeqID the ID used by the sender of broadcasts to match - * requests with replies from the sequencer - * @param seqID the ID imposing the total order of messages - * @throws IllegalArgumentException if the provided header type is - * unknown - */ - public Header(int type, long localSeqID, long seqID) { - super(); - switch(type) { - case REQ: - case REP: - case UCAST: - case BCAST: - this.type=type; - break; - default: - this.type=NULL_TYPE; - throw new IllegalArgumentException("type"); - } - this.localSequenceID=localSeqID; - this.sequenceID=seqID; - } - - /** - * For debugging purposes - */ - @Override // GemStoneAddition - public String toString() { - StringBuffer buffer=new StringBuffer(); - String typeName; - buffer.append("[TOTAL.Header"); - switch(type) { - case REQ: - typeName="REQ"; - break; - case REP: - typeName="REP"; - break; - case UCAST: - typeName="UCAST"; - break; - case BCAST: - typeName="BCAST"; - break; - case NULL_TYPE: - typeName="NULL_TYPE"; - break; - default: - typeName=""; - break; - } - buffer.append(", type=" + typeName); - buffer.append(", " + "localID=" + localSequenceID); - buffer.append(", " + "seqID=" + sequenceID); - buffer.append(']'); - - return (buffer.toString()); - } - - /** - * Manual serialization - */ - public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(type); - out.writeLong(localSequenceID); - out.writeLong(sequenceID); - } - - /** - * Manual deserialization - */ - public void readExternal(ObjectInput in) throws IOException, - ClassNotFoundException { - type=in.readInt(); - localSequenceID=in.readLong(); - sequenceID=in.readLong(); - } - } - - - /** - * The retransmission listener - It is called by the - * <code>AckSenderWindow</code> when a retransmission should occur - */ - private class Command implements AckSenderWindow.RetransmitCommand { - Command() { - } - - public void retransmit(long seqNo, Message msg) { - _retransmitBcastRequest(seqNo); - } - // GemstoneAddition - public long getMaxRetransmissionBurst() { - return 0; - } - } - - - /** - * Protocol name - */ - private static final String PROT_NAME="TOTAL"; - /** - * Property names - */ - private static final String TRACE_PROP="trace"; - - /** - * Average time between broadcast request retransmissions - */ - private final long[] AVG_RETRANSMIT_INTERVAL=new long[]{1000, 2000, 3000, 4000}; - - /** - * Null value for the IDs - */ - private static final long NULL_ID=-1; - // Layer sending states - /** - * No group has been joined yet - */ - private static final int NULL_STATE=-1; - /** - * When set, all messages are sent/received - */ - private static final int RUN=0; - /** - * When set, only session-specific messages are sent/received, i.e. only - * messages essential to the session's integrity - */ - private static final int FLUSH=1; - /** - * No message is sent to the layer below - */ - private static final int BLOCK=2; - - - /** - * The state lock allowing multiple reads or a single write - */ - private final ReadWriteLock stateLock=new WriterPreferenceReadWriteLock(); - /** - * Protocol layer message-sending state - */ - private int state=NULL_STATE; - /** - * The address of this stack - */ - private Address addr=null; - /** - * The address of the sequencer - */ - private Address sequencerAddr=null; - /** - * The sequencer's seq ID. The ID of the most recently broadcast reply - * message - */ - private long sequencerSeqID=NULL_ID; - /** - * The local sequence ID, i.e. the ID sent with the last broadcast request - * message. This is increased with every broadcast request sent to the - * sequencer and it's used to match the requests with the sequencer's - * replies - */ - private long localSeqID=NULL_ID; - /** - * The total order sequence ID. This is the ID of the most recently - * delivered broadcast message. As the sequence IDs are increasing without - * gaps, this is used to detect missing broadcast messages - */ - private long seqID=NULL_ID; - /** - * The list of unanswered broadcast requests to the sequencer. The entries - * are stored in increasing local sequence ID, i.e. in the order they were - * <p/> - * sent localSeqID -> Broadcast msg to be sent. - */ - private SortedMap reqTbl; - /** - * The list of received broadcast messages that haven't yet been delivered - * to the layer above. The entries are stored in increasing sequence ID, - * i.e. in the order they must be delivered above - * <p/> - * seqID -> Received broadcast msg - */ - private SortedMap upTbl; - /** - * Retranmitter for pending broadcast requests - */ - private AckSenderWindow retransmitter; - - - /** - * Print addresses in host_ip:port form to bypass DNS - */ - private String _addrToString(Object addr) { - return ( - addr == null ? "<null>" : - ((addr instanceof com.gemstone.org.jgroups.stack.IpAddress) ? - (((com.gemstone.org.jgroups.stack.IpAddress)addr).getIpAddress( - ).getHostAddress() + ':' + - ((com.gemstone.org.jgroups.stack.IpAddress)addr).getPort()) : - addr.toString()) - ); - } - - - /** - * @return this protocol's name - */ - private String _getName() { - return (PROT_NAME); - } - - /** - * Configure the protocol based on the given list of properties - * - * @param properties the list of properties to use to setup this layer - * @return false if there was any unrecognized property or a property with - * an invalid value - */ - private boolean _setProperties(Properties properties) { - String value; - - // trace - // Parse & remove property but ignore it; use Trace.trace instead - value=properties.getProperty(TRACE_PROP); - if(value != null) properties.remove(TRACE_PROP); - if(properties.size() > 0) { - if(log.isErrorEnabled()) - log.error("The following properties are not " + - "recognized: " + properties.toString()); - return (false); - } - return (true); - } - - /** - * Events that some layer below must handle - * - * @return the set of <code>Event</code>s that must be handled by some layer - * below - */ - Vector _requiredDownServices() { - Vector services=new Vector(); - - return (services); - } - - /** - * Events that some layer above must handle - * - * @return the set of <code>Event</code>s that must be handled by some - * layer above - */ - Vector _requiredUpServices() { - Vector services=new Vector(); - - return (services); - } - - - /** - * Extract as many messages as possible from the pending up queue and send - * them to the layer above - */ - private void _deliverBcast() { - Message msg; - Header header; - - synchronized(upTbl) { - while((msg=(Message)upTbl.remove(Long.valueOf(seqID + 1))) != null) { - header=(Header)msg.removeHeader(getName()); - if(header.localSequenceID != NULL_ID) passUp(new Event(Event.MSG, msg)); - ++seqID; - } - } // synchronized(upTbl) - } - - - /** - * Add all undelivered bcasts sent by this member in the req queue and then - * replay this queue - */ - private void _replayBcast() { - Iterator it; - Message msg; - Header header; - - // i. Remove all undelivered bcasts sent by this member and place them - // again in the pending bcast req queue - - synchronized(upTbl) { - if(upTbl.size() > 0) - if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_REPLAYING_UNDELIVERED_BCASTS); - - it=upTbl.entrySet().iterator(); - while(it.hasNext()) { - msg=(Message)((Map.Entry)it.next()).getValue(); - it.remove(); - if(!msg.getSrc().equals(addr)) { - if(log.isInfoEnabled()) - log.info("During replay: " + - "discarding BCAST[" + - ((TOTAL.Header)msg.getHeader(getName())).sequenceID + - "] from " + _addrToString(msg.getSrc())); - continue; - } - header=(Header)msg.removeHeader(getName()); - if(header.localSequenceID == NULL_ID) continue; - _sendBcastRequest(msg, header.localSequenceID); - } - } // synchronized(upTbl) - } - - - /** - * Send a unicast message: Add a <code>UCAST</code> header - * - * @param msg the message to unicast - * @return the message to send - */ - private Message _sendUcast(Message msg) { - msg.putHeader(getName(), new Header(Header.UCAST, NULL_ID, NULL_ID)); - return (msg); - } - - - /** - * Replace the original message with a broadcast request sent to the - * sequencer. The original bcast message is stored locally until a reply to - * bcast is received from the sequencer. This function has the side-effect - * of increasing the <code>localSeqID</code> - * - * @param msg the message to broadcast - */ - private void _sendBcastRequest(Message msg) { - _sendBcastRequest(msg, ++localSeqID); - } - - - /** - * Replace the original message with a broadcast request sent to the - * sequencer. The original bcast message is stored locally until a reply - * to bcast is received from the sequencer - * - * @param msg the message to broadcast - * @param id the local sequence ID to use - */ - private void _sendBcastRequest(Message msg, long id) { - - // i. Store away the message while waiting for the sequencer's reply - // ii. Send a bcast request immediatelly and also schedule a - // retransmission - synchronized(reqTbl) { - reqTbl.put(Long.valueOf(id), msg); - } - _transmitBcastRequest(id); - retransmitter.add(id, msg); - } - - - /** - * Send the bcast request with the given localSeqID - * - * @param seqID the local sequence id of the - */ - private void _transmitBcastRequest(long seqID) { - Message reqMsg; - - // i. If NULL_STATE, then ignore, just transient state before - // shutting down the retransmission thread - // ii. If blocked, be patient - reschedule - // iii. If the request is not pending any more, acknowledge it - // iv. Create a broadcast request and send it to the sequencer - - if(state == NULL_STATE) { - if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_TRANSMIT_BCAST_REQ_0__IN_NULL_STATE, seqID); - return; - } - if(state == BLOCK) return; - - synchronized(reqTbl) { - if(!reqTbl.containsKey(Long.valueOf(seqID))) { - retransmitter.ack(seqID); - return; - } - } - reqMsg=new Message(sequencerAddr, addr, new byte[0]); - reqMsg.putHeader(getName(), new Header(Header.REQ, seqID, NULL_ID)); - - passDown(new Event(Event.MSG, reqMsg)); - } - - - /** - * Receive a unicast message: Remove the <code>UCAST</code> header - * - * @param msg the received unicast message - */ - private void _recvUcast(Message msg) { - msg.removeHeader(getName()); - } - - /** - * Receive a broadcast message: Put it in the pending up queue and then - * try to deliver above as many messages as possible - * - * @param msg the received broadcast message - */ - private void _recvBcast(Message msg) { - Header header=(Header)msg.getHeader(getName()); - - // i. Put the message in the up pending queue only if it's not - // already there, as it seems that the event may be received - // multiple times before a view change when all members are - // negotiating a common set of stable msgs - // - // ii. Deliver as many messages as possible - - synchronized(upTbl) { - if(header.sequenceID <= seqID) - return; - upTbl.put(Long.valueOf(header.sequenceID), msg); - } - - _deliverBcast(); - } - - - /** - * Received a bcast request - Ignore if not the sequencer, else send a - * bcast reply - * - * @param msg the broadcast request message - */ - private void _recvBcastRequest(Message msg) { - Header header; - Message repMsg; - - // i. If blocked, discard the bcast request - // ii. Assign a seqID to the message and send it back to the requestor - - if(!addr.equals(sequencerAddr)) { - if(log.isErrorEnabled()) - log.error("Received bcast request " + - "but not a sequencer"); - return; - } - if(state == BLOCK) { - if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_BLOCKED_DISCARD_BCAST_REQ); - return; - } - header=(Header)msg.getHeader(getName()); - ++sequencerSeqID; - repMsg=new Message(msg.getSrc(), addr, new byte[0]); - repMsg.putHeader(getName(), new Header(Header.REP, header.localSequenceID, - sequencerSeqID)); - - passDown(new Event(Event.MSG, repMsg)); - } - - - /** - * Received a bcast reply - Match with the pending bcast request and move - * the message in the list of messages to be delivered above - * - * @param header the header of the bcast reply - */ - private void _recvBcastReply(Header header) { - Message msg; - long id; - - // i. If blocked, discard the bcast reply - // - // ii. Assign the received seqID to the message and broadcast it - // - // iii. - // - Acknowledge the message to the retransmitter - // - If non-existent BCAST_REQ, send a fake bcast to avoid seqID gaps - // - If localID == NULL_ID, it's a null BCAST, else normal BCAST - // - Set the seq ID of the message to the one sent by the sequencer - - if(state == BLOCK) { - if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_BLOCKED_DISCARD_BCAST_REP); - return; - } - - synchronized(reqTbl) { - msg=(Message)reqTbl.remove(Long.valueOf(header.localSequenceID)); - } - - if(msg != null) { - retransmitter.ack(header.localSequenceID); - id=header.localSequenceID; - } - else { - if(log.isInfoEnabled()) - log.info("Bcast reply to " + - "non-existent BCAST_REQ[" + header.localSequenceID + - "], Sending NULL bcast"); - id=NULL_ID; - msg=new Message(null, addr, new byte[0]); - } - msg.putHeader(getName(), new Header(Header.BCAST, id, header.sequenceID)); - - passDown(new Event(Event.MSG, msg)); - } - - - /** - * Resend the bcast request with the given localSeqID - * - * @param seqID the local sequence id of the - */ - protected/*GemStoneAddition*/ void _retransmitBcastRequest(long seqID) { - // *** Get a shared lock - try { - stateLock.readLock().acquire(); - try { - if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_RETRANSMIT_BCAST_REQ_0, seqID); - _transmitBcastRequest(seqID); - } - finally { - stateLock.readLock().release(); - } - } - catch(InterruptedException e) { - Thread.currentThread().interrupt(); // GemStoneAddition - log.error(ExternalStrings.TOTAL_FAILED_ACQUIRING_A_READ_LOCK, e); - } - } - - - /* Up event handlers - * If the return value is true the event travels further up the stack - * else it won't be forwarded - */ - - /** - * Prepare for a VIEW_CHANGE: switch to flushing state - * - * @return true if the event is to be forwarded further up - */ - private boolean _upBlock() { - // *** Get an exclusive lock - try { - stateLock.writeLock().acquire(); - try { - state=FLUSH; - // *** Revoke the exclusive lock - } - finally { - stateLock.writeLock().release(); - } - } - catch(InterruptedException e) { - Thread.currentThread().interrupt(); // GemStoneAddition - log.error(ExternalStrings.TOTAL_FAILED_ACQUIRING_THE_WRITE_LOCK, e); - } - - return (true); - } - - - /** - * Handle an up MSG event - * - * @param event the MSG event - * @return true if the event is to be forwarded further up - */ - private boolean _upMsg(Event event) { - Message msg; - Object obj; - Header header; - - // *** Get a shared lock - try { - stateLock.readLock().acquire(); - try { - - // If NULL_STATE, shouldn't receive any msg on the up queue! - if(state == NULL_STATE) { - if(log.isErrorEnabled()) log.error(ExternalStrings.TOTAL_UP_MSG_IN_NULL_STATE); - return (false); - } - - // Peek the header: - // - // (UCAST) A unicast message - Send up the stack - // (BCAST) A broadcast message - Handle specially - // (REQ) A broadcast request - Handle specially - // (REP) A broadcast reply from the sequencer - Handle specially - msg=(Message)event.getArg(); - if(!((obj=msg.getHeader(getName())) instanceof TOTAL.Header)) { - if(log.isErrorEnabled()) log.error(ExternalStrings.TOTAL_NO_TOTALHEADER_FOUND); - return (false); - } - header=(Header)obj; - - switch(header.type) { - case Header.UCAST: - _recvUcast(msg); - return (true); - case Header.BCAST: - _recvBcast(msg); - return (false); - case Header.REQ: - _recvBcastRequest(msg); - return (false); - case Header.REP: - _recvBcastReply(header); - return (false); - default: - if(log.isErrorEnabled()) log.error(ExternalStrings.TOTAL_UNKNOWN_HEADER_TYPE); - return (false); - } - - // ** Revoke the shared lock - } - finally { - stateLock.readLock().release(); - } - } - catch(InterruptedException e) { - Thread.currentThread().interrupt(); // GemStoneAddition - if(log.isErrorEnabled()) log.error(e.getMessage()); - } - - return (true); - } - - - /** - * Set the address of this group member - * - * @param event the SET_LOCAL_ADDRESS event - * @return true if event should be forwarded further up - */ - private boolean _upSetLocalAddress(Event event) { - // *** Get an exclusive lock - try { - stateLock.writeLock().acquire(); - try { - addr=(Address)event.getArg(); - } - finally { - stateLock.writeLock().release(); - } - } - catch(InterruptedException e) { - Thread.currentThread().interrupt(); - log.error(e.getMessage()); - } - return (true); - } - - - /** - * Handle view changes - * <p/> - * param event the VIEW_CHANGE event - * - * @return true if the event should be forwarded to the layer above - */ - private boolean _upViewChange(Event event) { - Object oldSequencerAddr; - - // *** Get an exclusive lock - try { - stateLock.writeLock().acquire(); - try { - - state=RUN; - - // i. See if this member is the sequencer - // ii. If this is the sequencer, reset the sequencer's sequence ID - // iii. Reset the last received sequence ID - // - // iv. Replay undelivered bcasts: Put all the undelivered bcasts - // sent by us back to the req queue and discard the rest - oldSequencerAddr=sequencerAddr; - sequencerAddr= - (Address)((View)event.getArg()).getMembers().elementAt(0); - if(addr.equals(sequencerAddr)) { - sequencerSeqID=NULL_ID; - if((oldSequencerAddr == null) || - (!addr.equals(oldSequencerAddr))) - if(log.isInfoEnabled()) log.info(ExternalStrings.TOTAL_IM_THE_NEW_SEQUENCER); - } - seqID=NULL_ID; - _replayBcast(); - - // *** Revoke the exclusive lock - } - finally { - stateLock.writeLock().release(); - } - } - catch(InterruptedException e) { - Thread.currentThread().interrupt(); // GemStoneAddition - log.error(e.getMessage()); - } - - return (true); - } - - - /* - * Down event handlers - * If the return value is true the event travels further down the stack - * else it won't be forwarded - */ - - - /** - * Blocking confirmed - No messages should come from above until a - * VIEW_CHANGE event is received. Switch to blocking state. - * - * @return true if event should travel further down - */ - private boolean _downBlockOk() { - // *** Get an exclusive lock - try { - stateLock.writeLock().acquire(); - try { - state=BLOCK; - } - finally { - stateLock.writeLock().release(); - } - } - catch(InterruptedException e) { - Thread.currentThread().interrupt(); // GemStoneAddition - log.error(e.getMessage()); - } - - return (true); - } - - - /** - * A MSG event travelling down the stack. Forward unicast messages, treat - * specially the broadcast messages.<br> - * <p/> - * If in <code>BLOCK</code> state, i.e. it has replied to a - * <code>BLOCk_OK</code> and hasn't yet received a - * <code>VIEW_CHANGE</code> event, messages are discarded<br> - * <p/> - * If in <code>FLUSH</code> state, forward unicast but queue broadcasts - * - * @param event the MSG event - * @return true if event should travel further down - */ - private boolean _downMsg(Event event) { - Message msg; - - // *** Get a shared lock - try { - stateLock.readLock().acquire(); - try { - - // i. Discard all msgs, if in NULL_STATE - // ii. Discard all msgs, if blocked - if(state == NULL_STATE) { - if(log.isErrorEnabled()) log.error(ExternalStrings.TOTAL_DISCARD_MSG_IN_NULL_STATE); - return (false); - } - if(state == BLOCK) { - if(log.isErrorEnabled()) log.error(ExternalStrings.TOTAL_BLOCKED_DISCARD_MSG); - return (false); - } - - msg=(Message)event.getArg(); - if(msg.getDest() == null) { - _sendBcastRequest(msg); - return (false); - } - else { - msg=_sendUcast(msg); - event.setArg(msg); - } - - // ** Revoke the shared lock - } - finally { - stateLock.readLock().release(); - } - } - catch(InterruptedException e) { - Thread.currentThread().interrupt(); // GemStoneAddition - log.error(e.getMessage()); - } - - return (true); - } - - - /** - * Prepare this layer to receive messages from above - */ - @Override // GemStoneAddition - public void start() throws Exception { - TimeScheduler timer; - - timer=stack != null ? stack.timer : null; - if(timer == null) - throw new Exception("TOTAL.start(): timer is null"); - - reqTbl=new TreeMap(); - upTbl=new TreeMap(); - retransmitter=new AckSenderWindow(new Command(), AVG_RETRANSMIT_INTERVAL); - } - - - /** - * Handle the stop() method travelling down the stack. - * <p/> - * The local addr is set to null, since after a Start->Stop->Start - * sequence this member's addr is not guaranteed to be the same - */ - @Override // GemStoneAddition - public void stop() { - try { - stateLock.writeLock().acquire(); - try { - state=NULL_STATE; - retransmitter.reset(); - reqTbl.clear(); - upTbl.clear(); - addr=null; - } - finally { - stateLock.writeLock().release(); - } - } - catch(InterruptedException e) { - Thread.currentThread().interrupt(); // GemStoneAddition - log.error(e.getMessage()); - } - } - - - /** - * Process an event coming from the layer below - * - * @param event the event to process - */ - private void _up(Event event) { - switch(event.getType()) { - case Event.BLOCK: - if(!_upBlock()) return; - break; - case Event.MSG: - if(!_upMsg(event)) return; - break; - case Event.SET_LOCAL_ADDRESS: - if(!_upSetLocalAddress(event)) return; - break; - case Event.VIEW_CHANGE: - if(!_upViewChange(event)) return; - break; - default: - break; - } - - passUp(event); - } - - - /** - * Process an event coming from the layer above - * - * @param event the event to process - */ - private void _down(Event event) { - switch(event.getType()) { - case Event.BLOCK_OK: - if(!_downBlockOk()) return; - break; - case Event.MSG: - if(!_downMsg(event)) return; - break; - default: - break; - } - - passDown(event); - } - - - /** - * Create the TOTAL layer - */ - public TOTAL() { - } - - - // Methods deriving from <code>Protocol</code> - // javadoc inherited from superclass - @Override // GemStoneAddition - public String getName() { - return (_getName()); - } - - // javadoc inherited from superclass - @Override // GemStoneAddition - public boolean setProperties(Properties properties) { - return (_setProperties(properties)); - } - - // javadoc inherited from superclass - @Override // GemStoneAddition - public Vector requiredDownServices() { - return (_requiredDownServices()); - } - - // javadoc inherited from superclass - @Override // GemStoneAddition - public Vector requiredUpServices() { - return (_requiredUpServices()); - } - - // javadoc inherited from superclass - @Override // GemStoneAddition - public void up(Event event) { - _up(event); - } - - // javadoc inherited from superclass - @Override // GemStoneAddition - public void down(Event event) { - _down(event); - } -}