> encountered a problem

You haven't said what the problem is. Also, this is probably more appropriate 
for [email protected]

-Jordan

> On May 14, 2020, at 5:06 AM, dong ma <[email protected]> wrote:
> 
> Hi, I recently encountered a problem. We use ZooKeeper to manage the status
> of some user jobs. For example, when a user job moves from the NEW state to
> the RUNNING state, we write `RUNNING` to the /job/states/${jobId} node. We
> also use PathChildrenCache to watch that directory. We connect to multiple
> ZooKeeper clusters so that we can monitor job status in each of them. The
> initialization code is as follows:
> 
> for (Map.Entry<String, String> zkEntry : zkStringMap.entrySet()) {
>     CuratorFramework curator = CuratorFrameworkFactory.builder()
>             .connectString(...)
>             .sessionTimeoutMs(60000)
>             .connectionTimeoutMs(15000)
>             .retryPolicy(new ExponentialBackoffRetry(5000, 3))
>             .namespace("job/states")
>             .build();
>     curator.start();
>
>     PathChildrenCache cache = new PathChildrenCache(curator, "/", true);
>
>     cache.getListenable().addListener(
>             new JobStateListener(zkEntry.getKey(), stateHandler));
>
>     cache.start();
> }
> 
> if (!newJobStatus.isGloballyTerminalState()) {
>     // Non-terminal state: keep the status in an ephemeral node.
>     LOG.info("Job switch state to {}", newJobStatus.toString());
>     if (jobStateCurator.checkExists().forPath(jobPath) == null) {
>         jobStateCurator.create()
>                 .withMode(CreateMode.EPHEMERAL)
>                 .forPath(jobPath, newJobStatus.toString().getBytes(charset));
>     } else {
>         jobStateCurator.setData()
>                 .forPath(jobPath, newJobStatus.toString().getBytes(charset));
>     }
> } else {
>     // Terminal state: write a separate "final-" node, then close the client.
>     String finalJobPath = "/final-" + jobId.toString();
>     LOG.info("Job switch state to final state {}", newJobStatus.toString());
>
>     jobStateCurator.create()
>             .forPath(finalJobPath, newJobStatus.toString().getBytes(charset));
>
>     // Poll every 50 ms until the new node is visible.
>     int retry = 0;
>     while (jobStateCurator.checkExists().forPath(finalJobPath) == null) {
>         LOG.info("check " + finalJobPath
>                 + " not exists, retry after 50 ms, times " + retry++);
>         Thread.sleep(50);
>     }
>
>     LOG.info("Success create zk Node in final state for " + jobId.toString());
>     jobStateCurator.close();
> }
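
The quoted code polls checkExists() in a loop with no upper bound, so it
never returns if the node never becomes visible. A minimal sketch of a
bounded variant follows, in plain Java with no Curator dependency. The class
name BoundedPoll, the method waitUntil, and its parameters are hypothetical
names chosen for illustration; they are not part of Curator or of the
original poster's code.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper (not a Curator API): polls a check a bounded number of times. */
public final class BoundedPoll {
    private BoundedPoll() {}

    /**
     * Calls check until it returns true or maxAttempts calls have been made,
     * sleeping sleepMillis between attempts.
     *
     * @return true if the check succeeded, false if the attempts ran out
     */
    public static boolean waitUntil(Callable<Boolean> check, int maxAttempts, long sleepMillis)
            throws Exception {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (Boolean.TRUE.equals(check.call())) {
                return true;
            }
            // No point sleeping after the last failed attempt.
            if (attempt < maxAttempts) {
                Thread.sleep(sleepMillis);
            }
        }
        return false;
    }
}
```

Under these assumptions, the loop in the quoted code could become
BoundedPoll.waitUntil(() -> jobStateCurator.checkExists().forPath(finalJobPath) != null, 20, 50),
with the caller deciding what to do when it returns false. The limit of 20
attempts is an arbitrary illustrative value.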
