I'm a little lost using SharedValue. I have upgraded to Curator 3.2.0.
When I start the class, the SharedValue connects to ZooKeeper and reads the
value. If I update the value from another client, the change is read
correctly. But I cannot get the SharedValue to fail over to another ZooKeeper
server.
I have registered a ValueChangedListener, and when I kill the ZooKeeper server
that it is connected to, I can see that stateChanged is invoked and
"Connection Interuption" is logged.
My biggest question is: why do I never get a RECONNECTED? The SharedValue also
no longer receives updates from ZooKeeper.
The background EventThread is simply hung retrying. (This is my own
RetryForeverPolicy, so I can log inside of it.)
Here is the log output:
<Jul 19, 2016 14:50:55 PM EDT><INFO><Curator-ConnectionStateManager-0>
com.bkfs.curator.CuratorClient : Zookeeper Connection State: SUSPENDED
<Jul 19, 2016 14:50:55 PM EDT><DEBUG><Curator-ConnectionStateManager-0>
com.bkfs.mg.rules.TestSharedValue : Connection Interuption
<Jul 19, 2016 14:51:25 PM EDT><DEBUG><Default : 2-EventThread>
com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 0, ElapsedTimeMs:
37
<Jul 19, 2016 14:51:27 PM EDT><DEBUG><Default : 2-EventThread>
com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 1, ElapsedTimeMs:
2038
<Jul 19, 2016 14:51:29 PM EDT><DEBUG><Default : 2-EventThread>
com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 2, ElapsedTimeMs:
4038
<Jul 19, 2016 14:51:31 PM EDT><DEBUG><Default : 2-EventThread>
com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 3, ElapsedTimeMs:
6076
<Jul 19, 2016 14:51:33 PM EDT><DEBUG><Default : 2-EventThread>
com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 4, ElapsedTimeMs:
8047
<Jul 19, 2016 14:51:35 PM EDT><DEBUG><Default : 2-EventThread>
com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 5, ElapsedTimeMs:
10107
<Jul 19, 2016 14:51:37 PM EDT><DEBUG><Default : 2-EventThread>
com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 6, ElapsedTimeMs:
12078
<Jul 19, 2016 14:51:39 PM EDT><DEBUG><Default : 2-EventThread>
com.bkfs.curator.RetryForeverPolicy : Retrying Again. Count: 7, ElapsedTimeMs:
14139
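For anyone trying to reproduce the "Retrying Again" lines above, a
retry-forever policy that logs each attempt could look roughly like the sketch
below. It is an illustrative stand-in, not the actual
com.bkfs.curator.RetryForeverPolicy. The class name LoggingRetryForever and the
fixed sleep interval are assumptions (2000 ms is only inferred from the roughly
two-second spacing in the log). The two interfaces are local stand-ins that
mirror the shape of Curator's RetryPolicy and RetrySleeper so the sketch
compiles on its own.

```java
import java.util.concurrent.TimeUnit;

// Local stand-ins mirroring the shape of org.apache.curator.RetrySleeper and
// org.apache.curator.RetryPolicy; with Curator on the classpath, drop these
// two interfaces and import the real ones instead.
interface RetrySleeper {
    void sleepFor(long time, TimeUnit unit) throws InterruptedException;
}

interface RetryPolicy {
    boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);
}

// Hypothetical policy: logs every attempt, sleeps a fixed interval, and keeps
// retrying until the thread is interrupted.
class LoggingRetryForever implements RetryPolicy {
    private final long sleepMs;

    LoggingRetryForever(long sleepMs) {
        this.sleepMs = sleepMs;
    }

    @Override
    public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper) {
        System.out.println("Retrying Again. Count: " + retryCount
                + ", ElapsedTimeMs: " + elapsedTimeMs);
        try {
            sleeper.sleepFor(sleepMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop retrying so shutdown is not blocked.
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }
}
```

A policy of this shape only decides whether a failed operation is retried and
how long to wait between attempts.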
Here is my test code:
package com.bkfs.mg.rules;

import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.shared.SharedValue;
import org.apache.curator.framework.recipes.shared.SharedValueListener;
import org.apache.curator.framework.recipes.shared.SharedValueReader;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.utils.CloseableUtils;

import com.bkfs.curator.CuratorClient;
import com.lps.mg.log.DeferLogger;

public class TestSharedValue {
    private static final DeferLogger logger = DeferLogger.getLogger(TestSharedValue.class);
    private static final String SHARED_PATH = "/sharedValues/enginePool/lastUpdate";

    private SharedValue sharedUpdateValue;
    // Updated from a listener callback and read by callers, hence volatile.
    private volatile long lastUpdated = 0;

    public void close() {
        CloseableUtils.closeQuietly(sharedUpdateValue);
        sharedUpdateValue = null;
    }

    public TestSharedValue() {
        CuratorFramework client = CuratorClient.getInstance().getClient();
        // The seed value "0" is only used if the node does not exist yet.
        sharedUpdateValue = new SharedValue(client, SHARED_PATH,
                "0".getBytes(StandardCharsets.UTF_8));
        sharedUpdateValue.getListenable().addListener(new ValueChangedListener());
        try {
            sharedUpdateValue.start();
        } catch (Exception e) {
            logger.error("There was a problem Starting the sharedValue after a Reconnect.", e);
        }
        String value = new String(sharedUpdateValue.getValue(), StandardCharsets.UTF_8);
        lastUpdated = Long.parseLong(value);
    }

    public long getLastUpdated() {
        return lastUpdated;
    }

    public class ValueChangedListener implements SharedValueListener {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            // Log connection loss and recovery; other states are ignored.
            if (client.getConnectionStateErrorPolicy().isErrorState(newState)) {
                logger.debug("Connection Interuption");
            } else if (newState == ConnectionState.RECONNECTED) {
                logger.debug("Connection Restored");
            }
        }

        @Override
        public void valueHasChanged(SharedValueReader valueReader, byte[] newValue)
                throws Exception {
            // The node stores the timestamp as a decimal string.
            long newLong = Long.parseLong(new String(newValue, StandardCharsets.UTF_8));
            if (getLastUpdated() != newLong) {
                TestSharedValue.this.lastUpdated = newLong;
            }
        }
    }
}
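Separately from the reconnect problem, the bytes-to-long conversion in the
constructor and in valueHasChanged could be pulled into one small helper so
that both places use the same charset and tolerate an empty or malformed node
value. The sketch below is only an illustration under those assumptions:
SharedLongCodec and its fallback parameter are hypothetical names, not part of
Curator.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Curator): converts between the znode
// payload (a decimal string in UTF-8) and a long.
final class SharedLongCodec {
    private SharedLongCodec() {
    }

    // Encodes the value the same way the test code writes its seed value.
    static byte[] encode(long value) {
        return Long.toString(value).getBytes(StandardCharsets.UTF_8);
    }

    // Decodes the payload, returning the fallback when the payload is
    // missing, empty, or not a valid decimal number.
    static long decode(byte[] bytes, long fallback) {
        if (bytes == null || bytes.length == 0) {
            return fallback;
        }
        try {
            return Long.parseLong(new String(bytes, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            return fallback;
        }
    }
}
```

With a helper like this, the listener could call
SharedLongCodec.decode(newValue, getLastUpdated()) and keep the previous value
when the node content cannot be parsed.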
Thank you,
Curtis