No, all three of the ZK servers are being passed in the connection string.
I’m having no trouble with my LeaderSelector failing over properly. But when
I added a simple SharedValue, the EventThread went crazy. That is why I
created this simple test class. I have no idea what I’m doing wrong with this
SharedValue.

I am on Curator 3.2.0 and ZK 3.5.1-alpha. I am moving the CuratorFramework
client to a single connection per application, like you told me to earlier,
so I have been doing some refactoring.

Unfortunately, the SharedValue will not work.   At least, I cannot get it to 
work.

I guess tomorrow, I will try to roll back a few versions in Curator unless 
someone has a better idea.

Thank you,
Curtis


From: Jordan Zimmerman [mailto:[email protected]]
Sent: Tuesday, July 19, 2016 3:41 PM
To: [email protected]
Subject: Re: Help with SharedValue FailOver

My guess is that you’re only passing one of the ZK instances in the connection 
string. You should be passing them all. A typical connection string is:

            "10.2.3.4:2181,10.2.3.5:2181,10.2.3.6:2181"
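
As a quick sanity check on the point above, here is a minimal sketch that splits a connection string into its host:port entries and warns when only one server is listed. The class and method names (ConnectionStringCheck, parseServers) are hypothetical and not part of Curator or ZooKeeper; the sketch assumes the usual comma-separated format with an optional chroot suffix such as "/app" after the last host.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not a Curator or ZooKeeper API.
public class ConnectionStringCheck {

    /** Splits a ZooKeeper-style connection string into its host:port entries. */
    public static List<String> parseServers(String connectionString) {
        // An optional chroot suffix (e.g. "/app") follows the last host; drop it.
        int chroot = connectionString.indexOf('/');
        String hosts = chroot >= 0 ? connectionString.substring(0, chroot) : connectionString;

        List<String> servers = new ArrayList<>();
        for (String entry : hosts.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                servers.add(trimmed);
            }
        }
        return servers;
    }

    public static void main(String[] args) {
        List<String> servers = parseServers("10.2.3.4:2181,10.2.3.5:2181,10.2.3.6:2181");
        System.out.println(servers.size() + " servers: " + servers);
        if (servers.size() < 2) {
            // With a single entry the client has no other server to try.
            System.out.println("Warning: only one server listed; the client cannot fail over.");
        }
    }
}
```

Logging the parsed list once at startup is a cheap way to confirm that the value actually read from the properties file contains every ensemble member.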

On Jul 19, 2016, at 2:36 PM, Cantrell, Curtis 
<[email protected]<mailto:[email protected]>> wrote:

package com.bkfs.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.CloseableUtils;

import com.lps.mg.log.DeferLogger;
import com.lps.mg.utils.MessageResources;
import com.lps.mg.utils.StringUtils;

public class CuratorClient implements ConnectionStateListener {

      /** Log4J Logger for this class     */
      private final static DeferLogger logger = 
DeferLogger.getLogger(CuratorClient.class);
      private static DeferLogger alertLog = 
DeferLogger.getLogger("SpectrumAlert");

      protected static MessageResources messages = 
MessageResources.getInstance();

      /** ZooKeeper properties in the config.properties file*/
      protected static final String ZOOKEEPER_CONN = 
"zookeeper.connection.string";

      protected static final String CONFIG         = "config";

      /** An internal reference to this singleton */
      private static CuratorClient instance = null;


      /** The connection String that Curator uses to connect to zookeeper. 
(e.g. 10.48.136.128:2181) */
      private String connectionString = null;

      /** The Zookeeper connection */
      private CuratorFramework client = null;

      protected static int SESSION_TIMEOUT = 30000;
      protected static int CONNECTION_TIMEOUT = 15000;
      protected static int RETRY_INTERVAL = 2000;

      /** Have I been Closed */
      private boolean closed = false;

      /**
      * Private Constructor for Singleton
      * @throws Exception if no Connection could be established with zookeeper 
after 15 seconds.
       */
      protected CuratorClient() throws RuntimeException  {
            connectClient();
      }


      private void connectClient() throws RuntimeException {
            try {
                  if (connectionString == null) {
                        connectionString = messages.getRawMessage(CONFIG, 
ZOOKEEPER_CONN);
                  }

                  if (StringUtils.isEmpty(connectionString))      {
                        logger.error("<Fatal> Error zookeeper.connection.string property not defined in config properties file");
                        alertLog.error("<Fatal> Error zookeeper.connection.string property not defined in config properties file");
                  }

                  client = CuratorFrameworkFactory.newClient(connectionString,
                              SESSION_TIMEOUT, CONNECTION_TIMEOUT,
                              new RetryForeverPolicy(60000, 60000, RETRY_INTERVAL));
                  client.getConnectionStateListenable().addListener(this);
                  client.start();

                  //hold out here until we are connected
                  client.getZookeeperClient().blockUntilConnectedOrTimedOut();
                  if (client.getState() != CuratorFrameworkState.STARTED) {
                        logger.error("<Fatal> Could not establish a connection to zookeeper after Connection Timeout: " + CONNECTION_TIMEOUT);
                        throw new Exception("zookeeper startup timed out");
                  }

                  if (logger.isDebugEnabled()) logger.debug("Successfully Established a Connection with Zookeeper.");

                  closed = false;

            } catch (Throwable t) {
                  logger.error("Problem Creating Curator Client", t);
                  throw new RuntimeException("Problem Creating Connection with Zookeeper", t);
            }
      }



      public CuratorFramework getClient() {
            if (!closed) {
                  return client;
            } else {
                  connectClient();
                  return client;
            }
      }

      /**
      * Should Only be Called by the Container when the Application is Shutdown
      */
      public void Close() {
            CloseableUtils.closeQuietly(client);
            client = null;
            closed = true;
            logger.info("Closed....");
      }



      /**
      * Gets access to the Singleton Instance.
      * @return The CuratorClient
      * @throws Exception if the Client cannot be created
      */
      public static CuratorClient getInstance()  {

            if (instance == null) {

                  synchronized(logger) {

                        if (instance == null) {

                              // Create the singleton inside the lock so that only one thread constructs it.
                              instance = new CuratorClient();
                              if (logger.isDebugEnabled())
                                    logger.debug("Created new Instance of the CuratorClient that is used by the application");
                        }
                  }

            }
            return instance;
      }


      @Override
      public void stateChanged(CuratorFramework client, ConnectionState 
newState) {

            if (newState == ConnectionState.CONNECTED) {
                  logger.info("Zookeeper Connection State: CONNECTED");
            }
            else if (newState == ConnectionState.RECONNECTED) {
                  logger.info("Zookeeper Connection State: RECONNECTED");
            }
            else if (newState == ConnectionState.SUSPENDED) {
                  logger.info("Zookeeper Connection State: SUSPENDED");
            }
            else if (newState == ConnectionState.LOST) {
                  logger.error("Zookeeper Connection State: LOST");
            }
      }


}


From: Jordan Zimmerman [mailto:[email protected]]
Sent: Tuesday, July 19, 2016 3:13 PM
To: [email protected]<mailto:[email protected]>
Subject: Re: Help with SharedValue FailOver

Can you post the code for CuratorClient.getInstance()?

On Jul 19, 2016, at 2:04 PM, Cantrell, Curtis 
<[email protected]<mailto:[email protected]>> wrote:

I’m a little lost with using the SharedValue. I have upgraded to Curator
3.2.0.

When I start the class, the SharedValue connects to ZK and reads the value.
If I update the value from another client, the update is read OK. But I cannot
get the SharedValue to fail over to another ZK server.

I have registered a ValueChangedListener, and when I kill the ZooKeeper server
that it is connected to, I can see that stateChanged is invoked and
“Connection Interuption” is logged.

I think my biggest question is: why do I never get a RECONNECTED? Nor does the
SharedValue receive updates from ZooKeeper any longer.

The background EventThread simply hangs, retrying... (This is my own
RetryForeverPolicy, so that I can log inside of it.)

Here is the log output…

<Jul 19, 2016 14:50:55 PM EDT><INFO><Curator-ConnectionStateManager-0> com.bkfs.curator.CuratorClient : Zookeeper Connection State: SUSPENDED
<Jul 19, 2016 14:50:55 PM EDT><DEBUG><Curator-ConnectionStateManager-0> com.bkfs.mg.rules.TestSharedValue : Connection Interuption
<Jul 19, 2016 14:51:25 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 0, ElapsedTimeMs: 37
<Jul 19, 2016 14:51:27 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 1, ElapsedTimeMs: 2038
<Jul 19, 2016 14:51:29 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 2, ElapsedTimeMs: 4038
<Jul 19, 2016 14:51:31 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 3, ElapsedTimeMs: 6076
<Jul 19, 2016 14:51:33 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 4, ElapsedTimeMs: 8047
<Jul 19, 2016 14:51:35 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 5, ElapsedTimeMs: 10107
<Jul 19, 2016 14:51:37 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 6, ElapsedTimeMs: 12078
<Jul 19, 2016 14:51:39 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 7, ElapsedTimeMs: 14139
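
For readers following along: the RetryForeverPolicy class itself was not posted, so the following is only a guess at its general shape, a sketch of a policy that logs each attempt, sleeps for a fixed interval, and always asks for another retry. The class name LoggingRetryForever and the nested Sleeper interface are hypothetical stand-ins so the example runs without Curator on the classpath. The allowRetry(retryCount, elapsedTimeMs, sleeper) signature is modeled on Curator's RetryPolicy interface, and the 2000 ms interval matches RETRY_INTERVAL and the roughly two-second gaps in the log.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; the real RetryForeverPolicy was not shown in this thread.
public class LoggingRetryForever {

    /** Local stand-in with the same shape as Curator's RetrySleeper. */
    public interface Sleeper {
        void sleepFor(long time, TimeUnit unit) throws InterruptedException;
    }

    private final long retryIntervalMs;

    public LoggingRetryForever(long retryIntervalMs) {
        this.retryIntervalMs = retryIntervalMs;
    }

    /** Logs the attempt, sleeps for the fixed interval, then requests another retry. */
    public boolean allowRetry(int retryCount, long elapsedTimeMs, Sleeper sleeper) {
        System.out.println(format(retryCount, elapsedTimeMs));
        try {
            sleeper.sleepFor(retryIntervalMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop retrying if the thread is interrupted.
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    /** Builds the same message text that appears in the log output. */
    public static String format(int retryCount, long elapsedTimeMs) {
        return "Retrying Again. Count: " + retryCount + ", ElapsedTimeMs: " + elapsedTimeMs;
    }

    public static void main(String[] args) {
        LoggingRetryForever policy = new LoggingRetryForever(2000);
        // A no-op sleeper keeps the demo instant instead of waiting 2 s per attempt.
        Sleeper noSleep = (time, unit) -> { };
        for (int i = 0; i < 3; i++) {
            policy.allowRetry(i, i * 2000L, noSleep);
        }
    }
}
```

A policy like this never gives up by itself, so the steady stream of retry lines only shows that the operation keeps failing, not why.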

Here is my test code..

package com.bkfs.mg.rules;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.shared.SharedValue;
import org.apache.curator.framework.recipes.shared.SharedValueListener;
import org.apache.curator.framework.recipes.shared.SharedValueReader;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.utils.CloseableUtils;
import com.bkfs.curator.CuratorClient;
import com.lps.mg.log.DeferLogger;


public class TestSharedValue  {

      private final static DeferLogger logger = 
DeferLogger.getLogger(TestSharedValue.class);

      private SharedValue sharedUpdateValue;

      private static final String SHARED_PATH = 
"/sharedValues/enginePool/lastUpdate";

      private long lastUpdated = 0;

      public void close() {
            CloseableUtils.closeQuietly(sharedUpdateValue);
            sharedUpdateValue = null;
      }

      public TestSharedValue() {

            CuratorFramework client = CuratorClient.getInstance().getClient();

            sharedUpdateValue = new SharedValue(client, SHARED_PATH, "0".getBytes());
            sharedUpdateValue.getListenable().addListener(new ValueChangedListener());

            try {
                  sharedUpdateValue.start();
            } catch (Exception e) {
                  logger.error("There was a problem Starting the sharedValue after a Reconnect.", e);
            }

            String value = new String(sharedUpdateValue.getValue());
            lastUpdated = Long.valueOf(value);
      }

      public long getLastUpdated() {
            return lastUpdated;
      }

      public class ValueChangedListener implements SharedValueListener {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState 
newState) {

                  if ( 
client.getConnectionStateErrorPolicy().isErrorState(newState) )   {
                        logger.debug("Connection Interuption");
                  } else
                  if (newState == ConnectionState.RECONNECTED) {
                        logger.debug("Connection Restored");
                  }
            }

            @Override
            public void valueHasChanged(SharedValueReader valueReader, byte[] 
newValue)      throws Exception {

                  String value = new String(newValue);
                  long newLong = Long.parseLong(value);

                  if (getLastUpdated() != newLong) {
                        TestSharedValue.this.lastUpdated = newLong;
                  }
            }
      }
}

Thank you,
Curtis



The information contained in this message is proprietary and/or confidential. 
If you are not the intended recipient, please: (i) delete the message and all 
copies; (ii) do not disclose, distribute or use the message in any manner; and 
(iii) notify the sender immediately. In addition, please be aware that any 
message addressed to our domain is subject to archiving and review by persons 
other than the intended recipient. Thank you.
