Hi, I recently ran into a problem. We use ZooKeeper to manage the status
of some user jobs. For example, when a user job moves from the NEW state to
the RUNNING state, we write `RUNNING` to the /job/states/${jobId} node.
At the same time, we use PathChildrenCache to monitor that directory. We
work with multiple ZooKeeper clusters and monitor job status in each of
them. The initialization code is as follows:
for (Map.Entry<String, String> zkEntry : zkStringMap.entrySet()) {
    // One Curator client per ZooKeeper cluster, rooted at the job/states namespace
    CuratorFramework curator = CuratorFrameworkFactory.builder()
            .connectString(...)
            .sessionTimeoutMs(60000)
            .connectionTimeoutMs(15000)
            .retryPolicy(new ExponentialBackoffRetry(5000, 3))
            .namespace("job/states")
            .build();
    curator.start();
    // Watch the children of the namespace root ("/" here means /job/states), caching node data
    PathChildrenCache cache = new PathChildrenCache(curator, "/", true);
    cache.getListenable().addListener(new JobStateListener(zkEntry.getKey(), stateHandler));
    cache.start();
}
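The loop above registers one JobStateListener per cluster, keyed by zkEntry.getKey(), but that class is not shown. As a rough, hypothetical sketch of the bookkeeping such a listener might do, the code below keeps the last seen state per cluster and per child node. So that it runs without Curator on the classpath, it defines its own EventType enum as an assumed stand-in for the CHILD_ADDED / CHILD_UPDATED / CHILD_REMOVED values of Curator's PathChildrenCacheEvent.Type; JobStateTable, onEvent and stateOf are illustrative names, not part of the original code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of per-cluster state bookkeeping for a job-state listener.
final class JobStateTable {
    // Assumed stand-in for the child-event values of Curator's PathChildrenCacheEvent.Type.
    enum EventType { CHILD_ADDED, CHILD_UPDATED, CHILD_REMOVED }

    // cluster key -> (child node name -> last seen state)
    private final Map<String, Map<String, String>> states = new ConcurrentHashMap<>();

    // Apply one child event reported by the cache of the given cluster.
    void onEvent(String cluster, EventType type, String path, String data) {
        // Keep only the last path segment, e.g. "/job1" -> "job1".
        String node = path.substring(path.lastIndexOf('/') + 1);
        Map<String, String> clusterStates =
                states.computeIfAbsent(cluster, k -> new ConcurrentHashMap<>());
        if (type == EventType.CHILD_REMOVED) {
            clusterStates.remove(node);
        } else {
            // ConcurrentHashMap rejects null values, so store missing data as "".
            clusterStates.put(node, data == null ? "" : data);
        }
    }

    // Last known state of a node in a cluster, or null if none is known.
    String stateOf(String cluster, String node) {
        Map<String, String> clusterStates = states.get(cluster);
        return clusterStates == null ? null : clusterStates.get(node);
    }
}
```

In a real PathChildrenCacheListener, childEvent(client, event) could forward event.getType(), event.getData().getPath() and the decoded event.getData().getData() bytes to onEvent; connection-level events carry no child data and would need separate handling.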
The code that updates a job's state is as follows:
if (!newJobStatus.isGloballyTerminalState()) {
    // Non-terminal state: keep the job's current state in an ephemeral node
    LOG.info("Job switch state to {}", newJobStatus.toString());
    if (jobStateCurator.checkExists().forPath(jobPath) == null) {
        jobStateCurator.create().withMode(CreateMode.EPHEMERAL).forPath(jobPath,
                newJobStatus.toString().getBytes(charset));
    } else {
        jobStateCurator.setData().forPath(jobPath,
                newJobStatus.toString().getBytes(charset));
    }
} else {
    // Globally terminal state: record it in a persistent "final-" node
    String finalJobPath = "/final-" + jobId.toString();
    LOG.info("Job switch state to final state {}", newJobStatus.toString());
    jobStateCurator.create().forPath(finalJobPath,
            newJobStatus.toString().getBytes(charset));
    // Poll until the final node is visible before closing the client
    int retry = 0;
    while (jobStateCurator.checkExists().forPath(finalJobPath) == null) {
        LOG.info("check {} not exists, retry after 50 ms, times {}", finalJobPath, retry++);
        Thread.sleep(50);
    }
    LOG.info("Success create zk Node in final state for {}", jobId);
    // Closing the client ends its ZooKeeper session, so the ephemeral jobPath node is deleted
    jobStateCurator.close();
}
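A job in a terminal state is represented by a persistent /final-${jobId} node, while its ephemeral node disappears once jobStateCurator.close() ends the session, so a consumer of the cache can briefly see both nodes for the same job. The sketch below is a minimal, hypothetical way to collapse a snapshot of the children into one effective state per job, letting the final- node win. It assumes the ephemeral node is named after the bare jobId (the definition of jobPath is not shown) and that node data has already been decoded to a String; JobStateResolver and resolve are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: collapse a snapshot of child node name -> state into
// jobId -> effective state. Assumes live nodes are named "<jobId>" (jobPath is
// not shown in the post) and terminal states live in "final-<jobId>" nodes.
final class JobStateResolver {
    private static final String FINAL_PREFIX = "final-";

    static Map<String, String> resolve(Map<String, String> children) {
        Map<String, String> result = new HashMap<>();
        // First pass: terminal states from "final-<jobId>" nodes always win.
        for (Map.Entry<String, String> e : children.entrySet()) {
            if (e.getKey().startsWith(FINAL_PREFIX)) {
                result.put(e.getKey().substring(FINAL_PREFIX.length()), e.getValue());
            }
        }
        // Second pass: live states only for jobs that have no final node.
        for (Map.Entry<String, String> e : children.entrySet()) {
            if (!e.getKey().startsWith(FINAL_PREFIX)) {
                result.putIfAbsent(e.getKey(), e.getValue());
            }
        }
        return result;
    }
}
```

Such a snapshot could be built from cache.getCurrentData(), which returns the cached ChildData list. Note that this naming scheme would misread a job whose ID itself starts with "final-".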