I'm a little lost using SharedValue. I have upgraded to Curator 3.2.0.

When I start the class, the SharedValue connects to ZooKeeper and reads the 
value. If I update the value from another client, the change is read 
correctly. But I cannot get the SharedValue to fail over to another ZooKeeper 
server.

I have registered a ValueChangedListener, and when I kill the ZooKeeper server 
that it is connected to, I can see that stateChanged is invoked and 
"Connection Interuption" is logged.

I think my biggest question is: why do I never get a RECONNECTED? The 
SharedValue also no longer receives updates from ZooKeeper.

The background EventThread simply hangs, retrying indefinitely. (This is my 
own RetryForeverPolicy, so that I can log inside of it.)
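
For context, a retry policy of this kind can be sketched roughly as below. This is not the actual com.bkfs.curator.RetryForeverPolicy: the class name LoggingRetryForever and the 2000 ms pause (guessed from the spacing of the retry messages in the log output) are assumptions. The two small interfaces are local stand-ins with the same method shape as Curator's RetryPolicy and RetrySleeper, declared only so the sketch compiles without the Curator jar.

```java
import java.util.concurrent.TimeUnit;

// Local stand-ins mirroring the shape of Curator's RetrySleeper and RetryPolicy.
// They are NOT the Curator types; they only keep this sketch self-contained.
interface RetrySleeper {
    void sleepFor(long time, TimeUnit unit) throws InterruptedException;
}

interface RetryPolicy {
    boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);
}

// Hypothetical policy: logs every attempt, pauses for a fixed interval,
// and always allows another retry unless the thread is interrupted.
class LoggingRetryForever implements RetryPolicy {
    private final long retryIntervalMs;

    LoggingRetryForever(long retryIntervalMs) {
        if (retryIntervalMs <= 0) {
            throw new IllegalArgumentException("retryIntervalMs must be positive");
        }
        this.retryIntervalMs = retryIntervalMs;
    }

    // Builds the log line in the same format as the log output in this message.
    static String describe(int retryCount, long elapsedTimeMs) {
        return "Retrying Again. Count: " + retryCount
                + ", ElapsedTimeMs: " + elapsedTimeMs;
    }

    @Override
    public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) {
        System.out.println(describe(retryCount, elapsedTimeMs));
        try {
            sleeper.sleepFor(retryIntervalMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and give up instead of spinning.
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }
}
```

With the real library the class would implement org.apache.curator.RetryPolicy instead of the stand-in interface. Note that a policy like this never gives up on its own, so the calling thread keeps retrying for as long as the connection stays down.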

Here is the log output...

<Jul 19, 2016 14:50:55 PM EDT><INFO><Curator-ConnectionStateManager-0> com.bkfs.curator.CuratorClient : Zookeeper Connection State: SUSPENDED
<Jul 19, 2016 14:50:55 PM EDT><DEBUG><Curator-ConnectionStateManager-0> com.bkfs.mg.rules.TestSharedValue : Connection Interuption
<Jul 19, 2016 14:51:25 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 0, ElapsedTimeMs: 37
<Jul 19, 2016 14:51:27 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 1, ElapsedTimeMs: 2038
<Jul 19, 2016 14:51:29 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 2, ElapsedTimeMs: 4038
<Jul 19, 2016 14:51:31 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 3, ElapsedTimeMs: 6076
<Jul 19, 2016 14:51:33 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 4, ElapsedTimeMs: 8047
<Jul 19, 2016 14:51:35 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 5, ElapsedTimeMs: 10107
<Jul 19, 2016 14:51:37 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 6, ElapsedTimeMs: 12078
<Jul 19, 2016 14:51:39 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 7, ElapsedTimeMs: 14139

Here is my test code..

package com.bkfs.mg.rules;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.shared.SharedValue;
import org.apache.curator.framework.recipes.shared.SharedValueListener;
import org.apache.curator.framework.recipes.shared.SharedValueReader;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.utils.CloseableUtils;
import com.bkfs.curator.CuratorClient;
import com.lps.mg.log.DeferLogger;


public class TestSharedValue  {

      private final static DeferLogger logger = DeferLogger.getLogger(TestSharedValue.class);

      private SharedValue sharedUpdateValue;

      private static final String SHARED_PATH = "/sharedValues/enginePool/lastUpdate";

      // Written from the Curator listener thread and read by callers, so keep it volatile.
      private volatile long lastUpdated = 0;

      public void close() {
            CloseableUtils.closeQuietly(sharedUpdateValue);
            sharedUpdateValue = null;
      }

      public TestSharedValue() {

            CuratorFramework client = CuratorClient.getInstance().getClient();

            // Seed value "0" is used only if the node does not exist yet.
            sharedUpdateValue = new SharedValue(client, SHARED_PATH, "0".getBytes());
            sharedUpdateValue.getListenable().addListener(new ValueChangedListener());

            try {
                  sharedUpdateValue.start();
            } catch (Exception e) {
                  logger.error("There was a problem Starting the sharedValue after a Reconnect.",e);
            }

            String value = new String(sharedUpdateValue.getValue());
            lastUpdated = Long.parseLong(value);
      }

      public long getLastUpdated() {
            return lastUpdated;
      }

      public class ValueChangedListener implements SharedValueListener {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {

                  if (client.getConnectionStateErrorPolicy().isErrorState(newState)) {
                        logger.debug("Connection Interuption");
                  } else if (newState == ConnectionState.RECONNECTED) {
                        logger.debug("Connection Restored");
                  }
            }

            @Override
            public void valueHasChanged(SharedValueReader valueReader, byte[] newValue) throws Exception {

                  String value = new String(newValue);
                  long newLong = Long.parseLong(value);

                  // Only record the value when it actually differs from the cached one.
                  if (getLastUpdated() != newLong) {
                        TestSharedValue.this.lastUpdated = newLong;
                  }
            }
      }
}
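
The constructor and the listener both turn the stored bytes into a long using the platform default charset. A minimal sketch of a small shared helper for that conversion, assuming the value is always stored as a decimal string in UTF-8 (the class name SharedValueCodec is hypothetical and not part of Curator):

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: converts between the byte[] held by the shared node
// and the long timestamp that the test class caches.
class SharedValueCodec {

    // Decodes a decimal string such as "0" or "1468954255000" into a long.
    // Throws NumberFormatException if the bytes are not a valid decimal number.
    static long decode(byte[] raw) {
        return Long.parseLong(new String(raw, StandardCharsets.UTF_8).trim());
    }

    // Encodes a long as the decimal string bytes expected by decode().
    static byte[] encode(long value) {
        return Long.toString(value).getBytes(StandardCharsets.UTF_8);
    }
}
```

Using an explicit charset keeps the result the same on every JVM, regardless of the default encoding each client runs with.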

Thank you,
Curtis


