No, all three of the ZK servers are being passed in the connection string. I’m having no trouble with my LeaderSelector failing over properly. But when I added a simple SharedValue, the EventThread goes crazy. That is why I created this simple test class. I have no idea what I’m doing wrong with this SharedValue.
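One way to confirm at startup that all three servers really are in the configured string is to log the parsed ensemble members. This is a minimal sketch, not Curator or ZooKeeper API: `ConnectStringCheck` and `ensembleMembers` are hypothetical names, and the code assumes the usual comma-separated host:port format, with an optional chroot path after the last host.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/**
 * Hypothetical helper (not part of Curator or ZooKeeper): returns the distinct
 * host:port entries of a ZooKeeper connection string, in the order given,
 * so an application can log or check them at startup.
 */
final class ConnectStringCheck {

    private ConnectStringCheck() {}

    static Set<String> ensembleMembers(String connectString) {
        if (connectString == null || connectString.trim().isEmpty()) {
            throw new IllegalArgumentException("connection string is empty");
        }
        // An optional chroot suffix (e.g. "/app") may follow the last host; drop it.
        int chroot = connectString.indexOf('/');
        String hosts = chroot >= 0 ? connectString.substring(0, chroot) : connectString;

        // Split on commas, trim whitespace, skip empty entries, collapse duplicates.
        Set<String> members = new LinkedHashSet<>();
        for (String entry : hosts.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                members.add(trimmed);
            }
        }
        return members;
    }
}
```

For example, connectClient() in CuratorClient could log `ConnectStringCheck.ensembleMembers(connectionString)` just before calling CuratorFrameworkFactory.newClient(), so the startup log shows exactly which servers Curator was given.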
Curator 3.2.0, ZK 3.5.1-alpha.

I am moving the CuratorFramework client to a single connection per application, like you told me to earlier, so I have been doing some refactoring. Unfortunately, the SharedValue will not work; at least, I cannot get it to work. Tomorrow I will probably try rolling back a few Curator versions, unless someone has a better idea.

Thank you,
Curtis

From: Jordan Zimmerman [mailto:[email protected]]
Sent: Tuesday, July 19, 2016 3:41 PM
To: [email protected]
Subject: Re: Help with SharedValue FailOver

My guess is that you’re only passing one of the ZK instances in the connection string. You should be passing them all. A typical connection string is: “10.2.3.4:2181,10.2.3.5:2181,10.2.3.6:2181”

On Jul 19, 2016, at 2:36 PM, Cantrell, Curtis <[email protected]> wrote:

package com.bkfs.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.CloseableUtils;

import com.lps.mg.log.DeferLogger;
import com.lps.mg.utils.MessageResources;
import com.lps.mg.utils.StringUtils;

public class CuratorClient implements ConnectionStateListener {

    /** Log4J Logger for this class */
    private final static DeferLogger logger = DeferLogger.getLogger(CuratorClient.class);
    private static DeferLogger alertLog = DeferLogger.getLogger("SpectrumAlert");
    protected static MessageResources messages = MessageResources.getInstance();

    /** ZooKeeper properties in the config.properties file */
    protected static final String ZOOKEEPER_CONN = "zookeeper.connection.string";
    protected static final String CONFIG = "config";

    /** An internal reference to this singleton */
    private static CuratorClient instance = null;

    /** The connection String that Curator uses to connect to zookeeper. (e.g. 10.48.136.128:2181) */
    private String connectionString = null;

    /** The Zookeeper connection */
    private CuratorFramework client = null;

    protected static int SESSION_TIMEOUT = 30000;
    protected static int CONNECTION_TIMEOUT = 15000;
    protected static int RETRY_INTERVAL = 2000;

    /** Have I been Closed */
    private boolean closed = false;

    /**
     * Private Constructor for Singleton
     * @throws Exception if no Connection could be established with zookeeper after 15 seconds.
     */
    protected CuratorClient() throws RuntimeException {
        connectClient();
    }

    private void connectClient() throws RuntimeException {
        try {
            if (connectionString == null) {
                connectionString = messages.getRawMessage(CONFIG, ZOOKEEPER_CONN);
            }
            if (StringUtils.isEmpty(connectionString)) {
                logger.error("<Fatal> Error zookeeper.connection.string property not defined in config properties file");
                alertLog.error("<Fatal> Error zookeeper.connection.string property not defined in config properties file");
            }
            client = CuratorFrameworkFactory.newClient(connectionString, SESSION_TIMEOUT, CONNECTION_TIMEOUT,
                    new RetryForeverPolicy(60000, 60000, RETRY_INTERVAL));
            client.getConnectionStateListenable().addListener(this);
            client.start();
            //hold out here until we are connected
            client.getZookeeperClient().blockUntilConnectedOrTimedOut();
            if (client.getState() != CuratorFrameworkState.STARTED) {
                logger.error("<Fatal> Could not establish a connection to zookeeper after Connection Timeout: " + CONNECTION_TIMEOUT);
                throw new Exception("zookeeper startup timed out");
            }
            if (logger.isDebugEnabled())
                logger.debug("Successfully Establised a Connection with Zookeeper.");
            closed = false;
        } catch (Throwable t) {
            logger.error("Problem Creating Curator Client", t);
            throw new RuntimeException("Problem Creating Connection with Zookeeper", t);
        }
    }

    public CuratorFramework getClient() {
        if (!closed) {
            return client;
        } else {
            connectClient();
            return client;
        }
    }

    /**
     * Should Only be Called by the Container when the Application is Shutdown
     */
    public void Close() {
        CloseableUtils.closeQuietly(client);
        client = null;
        closed = true;
        logger.info("Closed....");
    }

    /**
     * Gets access to the Singleton Instance.
     * @return The CuratorClient
     * @throws Exception if the Client cannot be created
     */
    public static CuratorClient getInstance() {
        if (instance == null) {
            synchronized (logger) {
                if (instance == null) {
                    //Use a temporary pool variable and set it at last to the static "pool" object to avoid thread contention.
                    instance = new CuratorClient();
                    if (logger.isDebugEnabled())
                        logger.debug("Created new Instace of the CuratorClient that is used by the application");
                }
            }
        }
        return instance;
    }

    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        if (newState == ConnectionState.CONNECTED) {
            logger.info("Zookeeper Connection State: CONNECTED");
        } else if (newState == ConnectionState.RECONNECTED) {
            logger.info("Zookeeper Connection State: RECONNECTED");
        } else if (newState == ConnectionState.SUSPENDED) {
            logger.info("Zookeeper Connection State: SUSPENDED");
        } else if (newState == ConnectionState.LOST) {
            logger.error("Zookeeper Connection State: LOST");
        }
    }
}

From: Jordan Zimmerman [mailto:[email protected]]
Sent: Tuesday, July 19, 2016 3:13 PM
To: [email protected]
Subject: Re: Help with SharedValue FailOver

Can you post the code for CuratorClient.getInstance()?

On Jul 19, 2016, at 2:04 PM, Cantrell, Curtis <[email protected]> wrote:

I’m a little lost with using the SharedValue. I have upgraded to Curator 3.2.0. When I start the class, the SharedValue connects to ZK and reads the value. If I update the value from another client, the change is read OK. But I cannot get the SharedValue to fail over to another ZK server.
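The getInstance() method in the CuratorClient code uses double-checked locking, but the static `instance` field is not declared `volatile`. Without it, the Java memory model allows another thread to see a non-null reference to a CuratorClient whose constructor has not finished. Declaring the field `volatile` is the smallest fix; another common option is the initialization-on-demand holder idiom. The sketch below shows that idiom with a hypothetical `LazyClientHolder` class standing in for CuratorClient; the construction counter exists only to make the lazy, single-instance behavior observable.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Sketch of the initialization-on-demand holder idiom. LazyClientHolder is a
 * hypothetical stand-in for CuratorClient. The JVM initializes the nested
 * Holder class exactly once, lazily and thread-safely, the first time
 * getInstance() reads Holder.INSTANCE, so no explicit locking is needed.
 */
final class LazyClientHolder {

    // Counts constructor calls so the single, lazy construction can be verified.
    static final AtomicInteger CONSTRUCTIONS = new AtomicInteger();

    private LazyClientHolder() {
        // In CuratorClient, this is where connectClient() would run.
        CONSTRUCTIONS.incrementAndGet();
    }

    private static final class Holder {
        static final LazyClientHolder INSTANCE = new LazyClientHolder();
    }

    static LazyClientHolder getInstance() {
        return Holder.INSTANCE;
    }
}
```

One caveat for this particular class: CuratorClient’s constructor throws a RuntimeException when it cannot connect. With the holder idiom, that failure surfaces as an ExceptionInInitializerError, and every later call gets a NoClassDefFoundError, so the connection can never be retried. If retrying matters, keeping double-checked locking with a `volatile` field is the better fit.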
I have registered a ValueChangedListener, and when I kill the ZooKeeper server it is connected to, I can see that stateChanged is invoked and “Connection Interuption” is logged. My biggest question is: why do I never get a RECONNECTED? The SharedValue also stops receiving updates from ZooKeeper. The background EventThread simply hangs, retrying... (This is my own RetryForeverPolicy, so I can log inside of it.)

Here is the log output:

<Jul 19, 2016 14:50:55 PM EDT><INFO><Curator-ConnectionStateManager-0> com.bkfs.curator.CuratorClient : Zookeeper Connection State: SUSPENDED
<Jul 19, 2016 14:50:55 PM EDT><DEBUG><Curator-ConnectionStateManager-0> com.bkfs.mg.rules.TestSharedValue : Connection Interuption
<Jul 19, 2016 14:51:25 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 0, ElapsedTimeMs: 37
<Jul 19, 2016 14:51:27 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 1, ElapsedTimeMs: 2038
<Jul 19, 2016 14:51:29 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 2, ElapsedTimeMs: 4038
<Jul 19, 2016 14:51:31 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 3, ElapsedTimeMs: 6076
<Jul 19, 2016 14:51:33 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 4, ElapsedTimeMs: 8047
<Jul 19, 2016 14:51:35 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 5, ElapsedTimeMs: 10107
<Jul 19, 2016 14:51:37 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 6, ElapsedTimeMs: 12078
<Jul 19, 2016 14:51:39 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 7, ElapsedTimeMs: 14139

Here is my test code:
package com.bkfs.mg.rules;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.shared.SharedValue;
import org.apache.curator.framework.recipes.shared.SharedValueListener;
import org.apache.curator.framework.recipes.shared.SharedValueReader;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.utils.CloseableUtils;

import com.bkfs.curator.CuratorClient;
import com.lps.mg.log.DeferLogger;

public class TestSharedValue {

    private final static DeferLogger logger = DeferLogger.getLogger(TestSharedValue.class);

    private SharedValue sharedUpdateValue;
    private static final String SHARED_PATH = "/sharedValues/enginePool/lastUpdate";
    private long lastUpdated = 0;

    public void close() {
        CloseableUtils.closeQuietly(sharedUpdateValue);
        sharedUpdateValue = null;
    }

    public TestSharedValue() {
        CuratorFramework client = CuratorClient.getInstance().getClient();
        sharedUpdateValue = new SharedValue(client, SHARED_PATH, "0".toString().getBytes());
        sharedUpdateValue.getListenable().addListener(new ValueChangedListener());
        try {
            sharedUpdateValue.start();
        } catch (Exception e) {
            logger.error("There was a problem Starting the sharedValue after a Reconnect.", e);
        }
        String value = new String(sharedUpdateValue.getValue());
        lastUpdated = Long.valueOf(value);
    }

    public long getLastUpdated() {
        return lastUpdated;
    }

    public class ValueChangedListener implements SharedValueListener {

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (client.getConnectionStateErrorPolicy().isErrorState(newState)) {
                logger.debug("Connection Interuption");
            } else if (newState == ConnectionState.RECONNECTED) {
                logger.debug("Connection Restored");
            }
        }

        @Override
        public void valueHasChanged(SharedValueReader valueReader, byte[] newValue) throws Exception {
            String value = new String(newValue);
            long newLong = new Long(value).longValue();
            if (getLastUpdated() != newLong) {
                TestSharedValue.this.lastUpdated = newLong;
            }
        }
    }
}

Thank you,
Curtis

The information contained in this message is proprietary and/or confidential. If you are not the intended recipient, please: (i) delete the message and all copies; (ii) do not disclose, distribute or use the message in any manner; and (iii) notify the sender immediately. In addition, please be aware that any message addressed to our domain is subject to archiving and review by persons other than the intended recipient. Thank you.
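TestSharedValue converts between byte[] and long with `new String(byte[])` and `getBytes()`, which use the platform default charset, and parses with `new Long(String)` where `Long.parseLong` would do. A minimal sketch of an explicit codec follows, assuming the shared value is always stored as a decimal string. `LongValueCodec`, `encode`, and `decode` are hypothetical names; the encoded bytes would be what the code passes to the SharedValue constructor as the seed value or to SharedValue.setValue().

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper: stores a long as a decimal UTF-8 string in a ZooKeeper
 * node and reads it back, independent of the platform default charset.
 */
final class LongValueCodec {

    private LongValueCodec() {}

    /** Encodes the value as the UTF-8 bytes of its decimal representation. */
    static byte[] encode(long value) {
        return Long.toString(value).getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Decodes bytes written by encode(). Returns {@code fallback} if the bytes
     * are null, empty, or not a valid decimal long, instead of throwing.
     */
    static long decode(byte[] bytes, long fallback) {
        if (bytes == null || bytes.length == 0) {
            return fallback;
        }
        try {
            return Long.parseLong(new String(bytes, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            return fallback;
        }
    }
}
```

Inside valueHasChanged() this would read as `long newLong = LongValueCodec.decode(newValue, getLastUpdated());`. Returning a fallback keeps a malformed node value from throwing out of the listener; logging and rethrowing instead is an equally reasonable choice if bad data should be loud.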
