Can you post the code for CuratorClient.getInstance()?
> On Jul 19, 2016, at 2:04 PM, Cantrell, Curtis <[email protected]>
> wrote:
>
> I’m a little lost with using the SharedValue. I have upgraded to Curator
> 3.2.0.
>
> When I start the class, the sharedValue connects to ZK and reads the value.
> If I update the value from another client, it is read ok. But I cannot get
> the SharedValue to failover to another zk server.
>
> I have registered a ValueChangeListener, and when I kill the zookeeper that
> it is connected to, I can see the stateChanged is invoked and “Connection
> Iteruption” is logged.
>
> I think my biggest question is Why do I never get a RECONNECTED. Nor will
> the sharedValue receive updates from zookeeper anylonger.
>
> The background EventThread simple is hung retrying….. (This is my own
> RetryForeverPolicy so I can log inside of it)
>
> Here is the log output…
>
> <Jul 19, 2016 14:50:55 PM EDT><INFO><Curator-ConnectionStateManager-0>
> com.bkfs.curator.CuratorClient : Zookeeper Connection State: SUSPENDED
> <Jul 19, 2016 14:50:55 PM EDT><DEBUG><Curator-ConnectionStateManager-0>
> com.bkfs.mg.rules.TestSharedValue : Connection Interuption
> <Jul 19, 2016 14:51:25 PM EDT><DEBUG><Default : 2-EventThread>
> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 0,
> ElapsedTimeMs: 37
> <Jul 19, 2016 14:51:27 PM EDT><DEBUG><Default : 2-EventThread>
> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 1,
> ElapsedTimeMs: 2038
> <Jul 19, 2016 14:51:29 PM EDT><DEBUG><Default : 2-EventThread>
> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 2,
> ElapsedTimeMs: 4038
> <Jul 19, 2016 14:51:31 PM EDT><DEBUG><Default : 2-EventThread>
> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 3,
> ElapsedTimeMs: 6076
> <Jul 19, 2016 14:51:33 PM EDT><DEBUG><Default : 2-EventThread>
> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 4,
> ElapsedTimeMs: 8047
> <Jul 19, 2016 14:51:35 PM EDT><DEBUG><Default : 2-EventThread>
> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 5,
> ElapsedTimeMs: 10107
> <Jul 19, 2016 14:51:37 PM EDT><DEBUG><Default : 2-EventThread>
> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 6,
> ElapsedTimeMs: 12078
> <Jul 19, 2016 14:51:39 PM EDT><DEBUG><Default : 2-EventThread>
> com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 7,
> ElapsedTimeMs: 14139
>
> Here is my test code..
>
> package com.bkfs.mg.rules;
>
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.recipes.shared.SharedValue;
> import org.apache.curator.framework.recipes.shared.SharedValueListener;
> import org.apache.curator.framework.recipes.shared.SharedValueReader;
> import org.apache.curator.framework.state.ConnectionState;
> import org.apache.curator.utils.CloseableUtils;
> import com.bkfs.curator.CuratorClient;
> import com.lps.mg.log.DeferLogger;
>
>
> public class TestSharedValue {
>
> private final static DeferLogger logger =
> DeferLogger.getLogger(TestSharedValue.class);
>
> private SharedValue sharedUpdateValue;
>
> private static final String SHARED_PATH =
> "/sharedValues/enginePool/lastUpdate";
>
> private long lastUpdated = 0;
>
> public void close() {
> CloseableUtils.closeQuietly(sharedUpdateValue);
> sharedUpdateValue = null;
> }
>
> public TestSharedValue() {
>
> CuratorFramework client = CuratorClient.getInstance().getClient();
>
> sharedUpdateValue = new SharedValue(client, SHARED_PATH,
> "0".toString().getBytes());
> sharedUpdateValue.getListenable().addListener(new
> ValueChangedListener());
>
> try {
> sharedUpdateValue.start();
> } catch (Exception e) {
> logger.error("There was a problem Starting the sharedValue
> after a Reconnect.",e);
> }
>
> String value = new String(sharedUpdateValue.getValue());
> lastUpdated = Long.valueOf(value);
> }
>
> public long getLastUpdated() {
> return lastUpdated;
> }
>
> public class ValueChangedListener implements SharedValueListener {
>
> @Override
> public void stateChanged(CuratorFramework client, ConnectionState
> newState) {
>
> if (
> client.getConnectionStateErrorPolicy().isErrorState(newState) ) {
> logger.debug("Connection Interuption");
> } else
> if (newState == ConnectionState.RECONNECTED) {
> logger.debug("Connection Restored");
> }
> }
>
> @Override
> public void valueHasChanged(SharedValueReader valueReader, byte[]
> newValue) throws Exception {
>
> String value = new String(newValue);
> long newLong = new Long(value).longValue();
>
> if (getLastUpdated() != newLong) {
> TestSharedValue.this.lastUpdated = newLong;
> }
> }
> }
> }
>
> Thank you,
> Curtis
>
>
>
> The information contained in this message is proprietary and/or confidential.
> If you are not the intended recipient, please: (i) delete the message and all
> copies; (ii) do not disclose, distribute or use the message in any manner;
> and (iii) notify the sender immediately. In addition, please be aware that
> any message addressed to our domain is subject to archiving and review by
> persons other than the intended recipient. Thank you.