> encountered a problem You haven't said what the problem is. Also, this is probably more appropriate for [email protected]
-Jordan > On May 14, 2020, at 5:06 AM, dong ma <[email protected]> wrote: > > hi,I recently encountered a problem. We use zookeeper to manage the status > of some user jobs. For example, when a user job is converted from NEW to > RUNNING state, we will update its `RUNNING` state to the / job / states / $ > {jobId} node. At the same time, we will also use PathChildrenCache to > monitor the directory. We correspond to multiple zookeeper clusters to > monitor the status of different clusters. The initialization code is as > follows: > > for(Map.Entry<String, String> zkEntry : zkStringMap.entrySet()) { > CuratorFramework curator = CuratorFrameworkFactory.builder() > .connectString(...) > .sessionTimeoutMs(60000) > .connectionTimeoutMs(15000) > .retryPolicy(new ExponentialBackoffRetry(5000, 3)) > .namespace("job/states") > .build(); > curator.start(); > > PathChildrenCache cache = new PathChildrenCache(curator, "/", true); > > cache.getListenable().addListener(new JobStateListener(zkEntry.getKey(), > stateHandler)); > > cache.start(); > } > > > > > if (node not exists) { > > jobStateCurator.create().withMode(CreateMode.EPHEMERAL).forPath(jobPath, > newJobStatus.toString().getBytes()); > } else { > jobStateCurator.setData().forPath(jobPath, > newJobStatus.toString().getBytes(charset)); > } > > if(node > > } else { > String finalJobPath = "/final-" + jobId.toString(); > LOG.info("Job switch state to final state {}", newJobStatus.toString()); > > jobStateCurator.create().forPath(finalJobPath, > newJobStatus.toString().getBytes(charset)); > int retry = 0; > Stat stat; > while((stat = jobStateCurator.checkExists().forPath(finalJobPath)) == null) > { > LOG.info("check " + finalJobPath + " not exists, retry after 50 ms, times " > + retry++); > Thread.sleep(50); > } > > LOG.info("Success create zk Node in final state for " + jobId.toString() ); > jobStateCurator.close(); > } > > if (!newJobStatus.isGloballyTerminalState()) { > > LOG.info("Job switch state to {}", newJobStatus.toString()); > if (jobStateCurator.checkExists().forPath(jobPath) == null) { > jobStateCurator.create().withMode(CreateMode.EPHEMERAL).forPath(jobPath, > newJobStatus.toString().getBytes()); > } else { > jobStateCurator.setData().forPath(jobPath, > newJobStatus.toString().getBytes(charset)); > } > } else { > String finalJobPath = "/final-" + jobId.toString(); > LOG.info("Job switch state to final state {}", newJobStatus.toString()); > > jobStateCurator.create().forPath(finalJobPath, > newJobStatus.toString().getBytes(charset)); > int retry = 0; > Stat stat; > while((stat = jobStateCurator.checkExists().forPath(finalJobPath)) == null) > { > LOG.info("check " + finalJobPath + " not exists, retry after 50 ms, times " > + retry++); > Thread.sleep(50); > } > > LOG.info("Success create zk Node in final state for " + jobId.toString() ); > jobStateCurator.close(); > }
