Curator’s lock revoking mechanism is really pretty simple. All it does is write a value to the lock’s node, as described in the ZK docs (http://zookeeper.apache.org/doc/trunk/recipes.html#sc_revocableSharedLocks). All the listeners/watchers on that node are then notified. That’s it. Nothing more. So, really, it’s a very simple inter-process communication channel. If it’s not working for you, I suggest a more robust implementation. For example, you might dedicate a ZNode as a kind of communication channel for releasing locks. In each thread, allocate a NodeCache on that ZNode and watch for revoke messages. The node’s value could be a JSON object of some kind that gives details of which lock to revoke, etc. Alternatively, if you can think of some improvements we can make to Curator’s revoker, a PR would be appreciated.
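
To make the channel idea concrete, here is a minimal, JVM-local sketch. It is not Curator code and all names in it are hypothetical: a BlockingQueue stands in for the dedicated ZNode plus its NodeCache watch, and java.util.concurrent ReentrantLock stands in for InterProcessMutex (it likewise refuses an unlock from a thread that does not own the lock). The point it illustrates is that the revoke message names one lock, and the thread that owns that lock releases it itself while keeping its other locks.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: stand-ins only, no ZooKeeper or Curator involved.
class RevokeChannelSketch {

    /** Returns the topics the worker still held after handling one revoke message. */
    static Set<String> runDemo() {
        // Stand-in for the dedicated ZNode: each message names the lock to revoke.
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();

        // Stand-ins for one InterProcessMutex per topic.
        Map<String, ReentrantLock> locks = new LinkedHashMap<>();
        locks.put("topicA", new ReentrantLock());
        locks.put("topicB", new ReentrantLock());

        Set<String> stillHeld = new TreeSet<>();
        CountDownLatch acquired = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            // The worker thread acquires every lock, so it is the owner of each.
            locks.values().forEach(ReentrantLock::lock);
            acquired.countDown();
            try {
                // Stand-in for the NodeCache watch: wait for a revoke message.
                String topic = channel.poll(5, TimeUnit.SECONDS);
                ReentrantLock requested = (topic == null) ? null : locks.get(topic);
                if (requested != null && requested.isHeldByCurrentThread()) {
                    // Released by the owning thread, so no ownership exception.
                    requested.unlock();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Record which locks survived the revoke, then clean up.
            locks.forEach((name, held) -> {
                if (held.isHeldByCurrentThread()) {
                    stillHeld.add(name);
                    held.unlock();
                }
            });
        });

        try {
            worker.start();
            acquired.await();        // wait until the worker owns both locks
            channel.put("topicA");   // another party asks for topicA only
            worker.join();           // join makes stillHeld safely visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return stillHeld;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [topicB]
    }
}
```

In a real deployment the queue would be replaced by the watched ZNode, and the message would need enough detail (for example the lock path) for each process to decide whether it owns the lock being asked for.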
-Jordan

> On Oct 29, 2015, at 1:41 PM, tathagata roy <[email protected]> wrote:
>
> Okay let me try to explain the problem statement in detail
>
> We have a clustered environment where each tomcat server in the cluster can
> process a stream of messages that comes in a MQTT topic. Once one server
> subscribes to the topic it will hold a lock in zookeeper so that other
> servers cannot acquire a lock and as a result does not subscribe to the same
> topic. If one server fails, it releases all the locks and we have a
> TreeCacheListener which is notified when the locks are released so that other
> servers in the cluster can now try to acquire the locks which server 1
> released. All this has been taken care of and running successfully. Note each
> server can have multiple threads and each thread can subscribe to multiple
> topics. Hence each thread has acquired a lock for each topic its subscribed
> to.
>
> Now what happens when the 1st server starts again. The idea is to maintain
> some kind of load balance so that all servers have approx same num of topics
> subscribed to and none of them are overloaded. So when 1st server is
> restarted it should try to acquire a few topics and therefore ask other
> servers which holds the lock to release it.
>
> I am using InterProcessMutex since that implements Revocable. and hence the
> makeRevocable(). Now if any of the thread in server 1 wants to acquire a lock
> which has been acquired by server 2, it invokes Revoker.attemptRevoke(path).
> The server 2 which has lock, is invoked in the callback method of
> RevocationListener
>
>     lock.makeRevocable(new RevocationListener<InterProcessMutex>(){
>
>         @Override
>         public void revocationRequested(InterProcessMutex lock1) {
>             try {
>                 if(lock.isAcquiredInThisProcess()){
>                     lock.release();
>                 }
>
>             } catch (Exception e) {
>                 // TODO Auto-generated catch block
>                 e.printStackTrace();
>             }
>
>         }
>
>     });
>
> The problem is this another async thread and when lock.release() is invoked
> it throws error java.lang.IllegalMonitorStateException: You do not own the
> lock.
>
> I can't even interrupt the thread which holds the lock because that thread
> can hold other locks which is dont want to be revoked.
>
> You suggested to use, InterProcessSemaphoreMutex but that doesnot implement
> Revocable because i guess its a non reentrant lock, and on calling
> lock.release() on InterProcessSemaphoreMutex which has been acquired by
> another server, it throws java.lang.IllegalStateException: Not acquired.
>
> So i am stuck here. Hope the problem is clear now
>
> On Thu, Oct 29, 2015 at 1:59 PM, Jordan Zimmerman <[email protected] <mailto:[email protected]>> wrote:
>> Also if i just catch the error it does not solve the problem of actually
>> releasing the lock taken by another thread in another jvm.
> I don’t understand. I assume every process has a RevocationListener right?
> What’s the problem?
>
> -JZ
>
>> On Oct 29, 2015, at 12:51 PM, tathagata roy <[email protected] <mailto:[email protected]>> wrote:
>>
>> I think there is a disconnect. Let me know if my understanding is correct
>>
>> InterProcessSemaphoreMutex's isAcquiredInThisProcess() will return true if
>> the lock has been acquired by any thread in that JVM process. So how will
>> the second thread which is trying to acquire a lock on the path return true
>> if its on another jvm? Also if i just catch the error it does not solve the
>> problem of actually releasing the lock taken by another thread in another
>> jvm.
>>
>> On Wed, Oct 28, 2015 at 9:08 PM, Jordan Zimmerman <[email protected] <mailto:[email protected]>> wrote:
>> Well, InterProcessSemaphoreMutex has the isAcquiredInThisProcess() method.
>> Also, you could just catch IllegalStateException and log it. If you continue
>> to have trouble, I can write an example for you when I have some time.
>>
>> -JZ
>>
>>> On Oct 28, 2015, at 4:46 PM, tathagata roy <[email protected] <mailto:[email protected]>> wrote:
>>>
>>> Jordan,
>>>
>>> I tried using InterProcessSemaphoreMutex but that too has problems which
>>> are
>>>
>>> If the 2nd thread creates a lock object on the basepath and without
>>> acquiring it and tries to release it, the Exception
>>> java.lang.IllegalStateException: Not acquired is thrown. And the 2nd thread
>>> is not able to acquire the lock in itself without releasing it.
>>>
>>> Also if we go by the first approach you suggested of some kind of
>>> intercommunication between threads, that would be a problem in real time as
>>> instead of the threads being in the same JVM, the threads can be in
>>> separate JVMs. So we cant notify threads running on a different JVM.
>>>
>>> Do you think we can achieve this using the native zookeeper api?
>>>
>>> Note: i am using version 2.8.0
>>>
>>> Thanks for all your help and suggestions
>>>
>>> On Wed, Oct 28, 2015 at 4:01 PM, tathagata roy <[email protected] <mailto:[email protected]>> wrote:
>>> Thanks I will try that out
>>>
>>> On Wed, Oct 28, 2015 at 4:00 PM, Jordan Zimmerman <[email protected] <mailto:[email protected]>> wrote:
>>> Another thing is to use InterProcessSemaphoreMutex instead. It is a
>>> non-reentrant lock and can be unlocked from any thread.
>>>
>>> -JZ
>>>
>>>> On Oct 28, 2015, at 2:55 PM, tathagata roy <[email protected] <mailto:[email protected]>> wrote:
>>>>
>>>> Thanks Jordan for the response.
>>>>
>>>> I cannot use the strategy because the thread can hold multiple locks , and
>>>> if i do that it will release all the locks. The example i have put in the
>>>> question is just a sample i was trying but in the actual project a single
>>>> thread holds multiple locks.
>>>>
>>>> Is there any other way i can do that?
>>>>
>>>> On Wed, Oct 28, 2015 at 3:43 PM, Jordan Zimmerman <[email protected] <mailto:[email protected]>> wrote:
>>>> Your RevocationListener should interrupt the thread that holds the lock.
>>>> Then, that thread can release the lock.
>>>>
>>>> -Jordan
>>>>
>>>>> On Oct 28, 2015, at 2:19 PM, tathagata roy <[email protected] <mailto:[email protected]>> wrote:
>>>>>
>>>>> All,
>>>>>
>>>>> I am trying to test the revocable Locking in Apache Curator. I have two
>>>>> threads which tries to acquire a lock. If the first test acquires the
>>>>> lock, the second thread can ask the first thread to release the lock so
>>>>> that the 2nd thread can acquire it
>>>>>
>>>>>     RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMills, maxRetries);
>>>>>     CuratorFramework client = CuratorFrameworkFactory.newClient(hosts, retryPolicy);
>>>>>     client.start();
>>>>>
>>>>>     final InterProcessMutex lock = new InterProcessMutex(client, lockBasePath);
>>>>>
>>>>>     Collection<String> nodes = lock.getParticipantNodes();
>>>>>
>>>>>     lock.makeRevocable(new RevocationListener<InterProcessMutex>(){
>>>>>
>>>>>         @Override
>>>>>         public void revocationRequested(InterProcessMutex lock1) {
>>>>>             try {
>>>>>                 if(lock.isAcquiredInThisProcess()){
>>>>>                     lock.release();
>>>>>                 }
>>>>>
>>>>>             } catch (Exception e) {
>>>>>                 // TODO Auto-generated catch block
>>>>>                 e.printStackTrace();
>>>>>             }
>>>>>
>>>>>         }
>>>>>
>>>>>     });
>>>>>
>>>>>     if(nodes!=null && !nodes.isEmpty()){
>>>>>         Revoker.attemptRevoke(client, nodes.iterator().next());
>>>>>     }
>>>>>
>>>>>     if (lock.acquire(waitTimeSeconds, TimeUnit.SECONDS)) {
>>>>>         try {
>>>>>             doSomeWork(lockName);
>>>>>         } finally {
>>>>>             lock.release();
>>>>>         }
>>>>>     } else {
>>>>>         System.err.printf("%s timed out after %d seconds waiting to acquire lock on %s\n",
>>>>>             lockName, waitTimeSeconds, lockPath);
>>>>>     }
>>>>>
>>>>> The problem is, when the 2nd thread calls the attemptRevoke, the callback
>>>>> async method is called on the first process, but since its a call back
>>>>> method that's a third thread, and if that invokes the lock.release it
>>>>> throws an Exception
>>>>>
>>>>> java.lang.IllegalMonitorStateException: You do not own the lock
>>>>>
>>>>> That is as per the api
>>>>>
>>>>> release() Perform one release of the mutex if the calling thread is the
>>>>> same thread that acquired it.
>>>>>
>>>>> So basically this is never possible because callbacks will always be
>>>>> another thread. Is my understanding right? Is there any other way to
>>>>> achieve this?
>>>>>
>>>>> Thanks for any suggestions
>>>>>
>>>>> -Tatha
