Hmm. I was using a fixed thread pool so that I have that many threads running in parallel. As I mentioned in my question above, I don't want the events to be handled sequentially; instead, I want EventA, B and C to be called at the same time.
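
To make concrete what I mean by "called at the same time", here is a minimal sketch that uses only java.util.concurrent and no Curator at all. The class name ParallelEvents, the method countOverlapping and the one-second timeout are made up for illustration, and each handler just waits on a latch instead of doing a real restart.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ParallelEvents {

    /**
     * Submits eventCount handlers to the pool and returns how many of them
     * observed every handler as started before their own timeout expired.
     * (Hypothetical helper, not part of Curator.)
     */
    public static int countOverlapping(ExecutorService pool, int eventCount) throws Exception {
        CountDownLatch allStarted = new CountDownLatch(eventCount);
        List<Future<Boolean>> results = new ArrayList<>();
        for (int i = 0; i < eventCount; i++) {
            Callable<Boolean> handler = () -> {
                // Announce that this handler is running.
                allStarted.countDown();
                // True only if all handlers have started within one second.
                return allStarted.await(1, TimeUnit.SECONDS);
            };
            results.add(pool.submit(handler));
        }
        int overlapping = 0;
        for (Future<Boolean> result : results) {
            if (result.get()) {
                overlapping++;
            }
        }
        return overlapping;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService single = Executors.newSingleThreadExecutor();
        ExecutorService fixed = Executors.newFixedThreadPool(3);
        try {
            System.out.println("single thread: " + countOverlapping(single, 3));
            System.out.println("fixed pool:    " + countOverlapping(fixed, 3));
        } finally {
            // Shut the executors down so the JVM can exit.
            single.shutdown();
            fixed.shutdown();
        }
    }
}
```

With a pool of three threads the three handlers overlap, while the single-threaded executor runs them one after another, which is the serialized behaviour I am trying to avoid.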
Do you think I should use anything else?

On Wed, Jul 23, 2014 at 9:07 PM, Jordan Zimmerman <[email protected]> wrote:

> Yes. Though, I don’t know why you’d use a fixed thread pool.
>
> From: Check Peck <[email protected]>
> Reply: [email protected]
> Date: July 23, 2014 at 11:06:42 PM
> To: user <[email protected]>
> Cc: Cameron McKenzie <[email protected]>
> Subject: Re: How to use PathChildrenCache properly for keeping a watch
> on three znodes on Zookeeper?
>
> Thanks Jordan. That is some useful information. If I need to use the
> ThreadUtils class, then I will be using it like this, right?
>
> cache.getListenable().addListener(listener,
>     ThreadUtils.newFixedThreadPool(15, "ServerMonitoring"));
>
> On Wed, Jul 23, 2014 at 3:12 PM, Jordan Zimmerman <[email protected]> wrote:
>
>> FYI
>>
>> Curator has the ThreadUtils class that makes it easier to make thread
>> pools that are daemons. Also, Tech Note 1 describes the single thread
>> issue: https://cwiki.apache.org/confluence/display/CURATOR/TN1
>>
>> -JZ
>>
>> From: Cameron McKenzie <[email protected]>
>> Reply: [email protected]
>> Date: July 23, 2014 at 5:11:09 PM
>> To: [email protected]
>> Subject: Re: How to use PathChildrenCache properly for keeping a watch
>> on three znodes on Zookeeper?
>>
>> That should be fine, remember to shut down executors when your system
>> is shutting down though.
>>
>> Keep in mind though that the delivery of events to your childEvent()
>> method will no longer be serialised (i.e. where you would previously get
>> EventA, EventB, EventC one after another, you will now get EventA, B and C
>> called at the same time). This may be fine for you, but something to keep in
>> mind.
>>
>> If this is an issue, then instead of passing the executor to the
>> addListener call, you can call your executor directly from the childEvent()
>> method whenever you need to do something long running.
>>
>> On Thu, Jul 24, 2014 at 8:05 AM, Check Peck <[email protected]> wrote:
>>
>>> Thanks a lot Cameron. That makes a lot of sense. So if I need to do it
>>> properly, I should declare my executor like this at the top of my
>>> class:
>>>
>>> private static ExecutorService service =
>>>     Executors.newFixedThreadPool(15);
>>>
>>> and then use the above "ExecutorService" like below:
>>>
>>> cache.getListenable().addListener(listener, service);
>>>
>>> Did I get everything right with the above code?
>>>
>>> On Wed, Jul 23, 2014 at 2:49 PM, Cameron McKenzie <[email protected]> wrote:
>>>
>>>> When you call cache.getListenable().addListener(listener), if you
>>>> don't provide an executor service, then it will use the default Curator
>>>> one, which will only have a single thread. This means that your listener
>>>> will only be called by a single thread, so if you do anything in that
>>>> listener that blocks (e.g. restarting the server) then you're not going to
>>>> get any other events until after your blocking event has finished.
>>>>
>>>> You could either provide an executor that allows more than a single
>>>> thread (and thus your childEvent method could get called concurrently
>>>> for different events), or you could use your own executor to do the
>>>> restarts, which would allow the Curator event thread to keep processing
>>>> stuff.
>>>>
>>>> On Thu, Jul 24, 2014 at 6:33 AM, Check Peck <[email protected]> wrote:
>>>>
>>>>> I am using the Curator library for Zookeeper. I am using Zookeeper to
>>>>> monitor whether my app servers are up or not. If they are not up or are
>>>>> shut down, then I bring them up. I need to keep a watch on three of my
>>>>> znodes on Zookeeper: "/test/proc/phx/server", "/test/proc/slc/server"
>>>>> and "/test/proc/lvs/server". I have a znode structure like below.
>>>>>
>>>>> /test/proc
>>>>>     /phx
>>>>>         /server
>>>>>             /h1
>>>>>             /h2
>>>>>             /h3
>>>>>             /h4
>>>>>             /h5
>>>>>     /slc
>>>>>         /server
>>>>>             /h1
>>>>>             /h2
>>>>>             /h3
>>>>>             /h4
>>>>>             /h5
>>>>>     /lvs
>>>>>         /server
>>>>>             /h1
>>>>>             /h2
>>>>>             /h3
>>>>>             /h4
>>>>>             /h5
>>>>>
>>>>> As you can see above, for "/test/proc/phx/server" we have 5 hosts
>>>>> starting with "h", and similarly for slc and lvs. All those hosts
>>>>> starting with "h" are ephemeral znodes. As soon as any server dies,
>>>>> let's say the h4 machine in PHX went down, the "h4" ephemeral znode
>>>>> gets deleted from "/test/proc/phx/server" and then I will try to
>>>>> restart the h4 machine in the PHX datacenter. Similarly with SLC and LVS.
>>>>>
>>>>> Below is my code by which I am keeping a watch and restarting the
>>>>> servers if any machine goes down in any datacenter. With the below code,
>>>>> what I am seeing is that if three machines go down in the same
>>>>> datacenter, it restarts those three one by one. Meaning, let's say h1,
>>>>> h3 and h5 went down in the PHX datacenter, then first it will restart h1,
>>>>> and as soon as h1 is done it will restart h3 and then h5. So it is always
>>>>> waiting for one to finish before restarting another host. I am not sure
>>>>> why. Shouldn't those three be restarted instantly, since it's a
>>>>> background thread?
>>>>>
>>>>> Also, sometimes what I am seeing is that if all the hosts go down at
>>>>> the same instant, it doesn't restart anything. Maybe the thread is
>>>>> getting stuck?
>>>>>
>>>>> Does my code below look right for the way I am keeping a watch on the
>>>>> three datacenters PHX ("/test/proc/phx/server"), SLC
>>>>> ("/test/proc/slc/server") and LVS ("/test/proc/lvs/server")?
>>>>>
>>>>> List<String> datacenters = Arrays.asList("PHX", "SLC", "LVS");
>>>>> for (String dc : datacenters) {
>>>>>     // in this example we will cache data. Notice that this is optional.
>>>>>     PathChildrenCache cache = new PathChildrenCache(zookClient.getClient(),
>>>>>             "/test/proc" + "/" + dc + "/" + "server", true);
>>>>>     cache.start();
>>>>>
>>>>>     addListener(cache);
>>>>> }
>>>>>
>>>>> private static void addListener(PathChildrenCache cache) {
>>>>>
>>>>>     PathChildrenCacheListener listener = new PathChildrenCacheListener() {
>>>>>         public void childEvent(CuratorFramework client,
>>>>>                 PathChildrenCacheEvent event) throws Exception {
>>>>>             switch (event.getType()) {
>>>>>             case CHILD_ADDED: {
>>>>>                 if (zookClient.isLeader()) {
>>>>>                     String path = ZKPaths.getPathAndNode(event.getData().getPath()).getPath();
>>>>>                     String node = ZKPaths.getNodeFromPath(event.getData().getPath());
>>>>>                     String datacenter = path.split("/")[3];
>>>>>
>>>>>                     System.out.println("Node added: Path= " + path
>>>>>                             + ", Actual Node= " + node + ", Datacenter= " + datacenter);
>>>>>                 }
>>>>>                 // break outside the if, so a non-leader does not fall through
>>>>>                 break;
>>>>>             }
>>>>>
>>>>>             case CHILD_UPDATED: {
>>>>>                 if (zookClient.isLeader()) {
>>>>>                     String path = ZKPaths.getPathAndNode(event.getData().getPath()).getPath();
>>>>>                     String node = ZKPaths.getNodeFromPath(event.getData().getPath());
>>>>>                     String datacenter = path.split("/")[3];
>>>>>
>>>>>                     System.out.println("Node updated: Path= " + path
>>>>>                             + ", Actual Node= " + node + ", Datacenter= " + datacenter);
>>>>>                 }
>>>>>                 break;
>>>>>             }
>>>>>
>>>>>             case CHILD_REMOVED: {
>>>>>                 if (zookClient.isLeader()) {
>>>>>                     String path = ZKPaths.getPathAndNode(event.getData().getPath()).getPath();
>>>>>                     String node = ZKPaths.getNodeFromPath(event.getData().getPath());
>>>>>                     String datacenter = path.split("/")[3];
>>>>>
>>>>>                     System.out.println("Node removed: Path= " + path
>>>>>                             + ", Actual Node= " + node + ", Datacenter= " + datacenter);
>>>>>
>>>>>                     // restart machine which goes down
>>>>>                     // I am assuming as soon as any machine went down, call
>>>>>                     // will come here instantly without waiting for anything?
>>>>>                 }
>>>>>                 break;
>>>>>             }
>>>>>             default:
>>>>>                 break;
>>>>>             }
>>>>>         }
>>>>>     };
>>>>>     cache.getListenable().addListener(listener);
>>>>> }
