After more than 2 months, I finally found the time to move our
distributed locking mechanism over to JCluster. JCluster is very cool
by the way. Using JCluster is actually simpler than going straight
through JGroups, IMO.
The source files aren't currently in the best shape for a patch but I
wanted to get the code out and get people looking at it to make sure
there aren't any glaring problems with the distributed locking. We
haven't had a chance to really test the JCluster version but the logic
hasn't really changed from our JGroups version which we have tested
pretty well (although not as thoroughly as we would like.)
To compile these files you need JCluster and to run them you need
JGroups and the JGroups JCluster plug-in. I put all the .jar's in
JAMES/lib and everything seems to be working fine.
JCluster is available at (both .jar's are required):
http://jcluster-plugin.sourceforge.net/
JGroups is available at (use the latest version):
http://sourceforge.net/project/showfiles.php?group_id=6081
After getting more familiar with the James source, I believe it would be
better to make the distributed locking a block to make it more easily
configurable. There are conceivably a lot of settings that could be
changed and/or tweaked for distributed locking. Unfortunately, I'm not
sure how to go about creating a new block from scratch (although it
doesn't seem like it would be that difficult.)
-Mike
/***********************************************************************
* Copyright (c) 2000-2004 Avalon Digital Marketing Systems, Inc. *
* All rights reserved. *
* ------------------------------------------------------------------- *
* Licensed under the Apache License, Version 2.0 (the "License"); you *
* may not use this file except in compliance with the License. You *
* may obtain a copy of the License at: *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, software *
* distributed under the License is distributed on an "AS IS" BASIS, *
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
* implied. See the License for the specific language governing *
* permissions and limitations under the License. *
***********************************************************************/
package net.sf.jamesha.locking;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import net.jcluster.ClusterConnection;
import net.jcluster.ClusterDriver;
import net.jcluster.ClusterException;
import net.jcluster.ClusterInvalidStateException;
import net.jcluster.Message;
import net.jcluster.MessageListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * {@link Lock} implementation that coordinates locks between the members of a
 * JCluster group.
 *
 * <p>Locks held on the local host are kept in {@link #locks}, keyed by the locked
 * object and mapped to the owning thread. Before a lock is granted, every other
 * group member is asked on topic <code>LOCK</code> whether it holds the key; the
 * answers arrive on topic <code>REPLY</code> and are tallied in a
 * {@link ReplyWait} registered in {@link #replyWaits}.
 *
 * @author Mike Heath
 */
public class DistributedLock implements Lock {

    static final Log LOG = LogFactory.getLog(DistributedLock.class);

    /** Driver used when the caller does not name one. */
    public static final String DEFAULT_DRIVER_CLASS = "net.jcluster.plugins.jgroups.ClusterDriverImpl";

    /** Group joined when the caller does not configure one. */
    public static final String DEFAULT_GROUP_NAME = "James";

    /** JCluster configuration used when the caller does not configure one. */
    public static final String DEFAULT_CONFIG = "default.xml";

    /** Maximum time, in milliseconds, to wait for a remote lock check to come back. */
    public static final long REMOTE_LOCK_TIMEOUT = 30000;

    private static final String TOPIC_LOCK = "LOCK";

    private static final String TOPIC_REPLY = "REPLY";

    /**
     * JCluster connection.
     */
    private ClusterConnection connection;

    /** Locally held locks: key -> owning thread. */
    protected Map locks = Collections.synchronizedMap(new HashMap());

    /** Remote lock checks in progress: key -> {@link ReplyWait}. */
    protected Map replyWaits = Collections.synchronizedMap(new HashMap());

    /**
     * Connects to the cluster, joins the group and installs the listeners that
     * answer lock requests and collect replies.
     *
     * @param driverClass    fully qualified name of the {@link ClusterDriver} to
     *                       instantiate; <code>null</code> or empty selects
     *                       {@link #DEFAULT_DRIVER_CLASS}.
     * @param jclusterConfig configuration handed to the driver when creating the connection.
     * @param groupName      name of the group to join.
     * @throws InstantiationException if the driver class cannot be instantiated.
     * @throws IllegalAccessException if the driver class or its constructor is not accessible.
     * @throws ClassNotFoundException if the driver class cannot be found.
     * @throws IOException            declared for compatibility with existing callers.
     * @throws RuntimeException       wrapping any {@link ClusterException} raised while connecting.
     */
    public DistributedLock(String driverClass, String jclusterConfig, String groupName)
            throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
        if (driverClass == null || driverClass.length() == 0) {
            driverClass = DEFAULT_DRIVER_CLASS;
        }
        ClusterDriver driver = (ClusterDriver) Class.forName(driverClass).newInstance();
        try {
            connection = driver.createConnection(jclusterConfig);
            connection.join(groupName);

            // Answer lock requests coming from other members.
            connection.setMessageListener(TOPIC_LOCK, new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        // Our own broadcast comes back to us as well; it must not be answered.
                        if (connection.getLocalAddress().equals(message.getSource())) {
                            return;
                        }
                        LOG.debug("Received lock request");
                        Serializable key = (Serializable) message.getPayload();
                        Reply reply = new Reply();
                        reply.key = key;
                        reply.locked = isLockedLocal(key);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(reply.locked ? "Have lock locally" : "Do not have lock");
                        }
                        connection.send(message.getSource(), reply, TOPIC_REPLY);
                    } catch (ClusterInvalidStateException e) {
                        throw new RuntimeException(e);
                    }
                }
            });

            // Collect the answers to our own lock requests.
            connection.setMessageListener(TOPIC_REPLY, new MessageListener() {
                public void onMessage(Message message) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Received a reply from: " + message.getSource());
                    }
                    Reply reply = (Reply) message.getPayload();
                    // The tally is updated under the same monitor the waiting thread
                    // uses, so the update and the wake-up are seen together.
                    synchronized (DistributedLock.this) {
                        ReplyWait replyWait = (ReplyWait) replyWaits.get(reply.key);
                        if (replyWait == null) {
                            LOG.error("Could not find any thread waiting on remote lock check for key: " + reply.key);
                            return;
                        }
                        replyWait.responses++;
                        replyWait.locked |= reply.locked;
                        DistributedLock.this.notifyAll();
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(reply.locked ? "Remote host has lock" : "Remote host does not have lock");
                    }
                }
            });
        } catch (ClusterException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Check if the object is locked on the local host.
     *
     * @param key The object to check the lock for.
     * @return True if the object is locked, false otherwise.
     */
    protected boolean isLockedLocal(Serializable key) {
        return locks.containsKey(key);
    }

    /**
     * Check if the object is locked on a remote host.
     *
     * <p>Broadcasts a lock request and blocks until every other member has
     * answered or {@link #REMOTE_LOCK_TIMEOUT} milliseconds have elapsed. If a
     * check for the same key is already in progress on this host, the call waits
     * for that check's outcome instead of broadcasting again, so replies are never
     * counted against more than one request.
     *
     * @param key The object to check the lock for.
     * @return True if the object is locked, false otherwise (including when this
     *         host is the only group member).
     * @throws RuntimeException if not all members answer in time, if the waiting
     *                          thread is interrupted, or if the cluster call fails.
     */
    protected boolean isLockedRemote(Serializable key) {
        boolean ownReplyWait = false;
        try {
            synchronized (this) {
                ReplyWait replyWait = (ReplyWait) replyWaits.get(key);
                if (replyWait == null) {
                    replyWait = new ReplyWait();
                    // Everyone but ourselves is expected to answer.
                    replyWait.expectedResponses = connection.getMembership().size() - 1;
                    if (replyWait.expectedResponses <= 0) {
                        return false;
                    }
                    replyWaits.put(key, replyWait);
                    ownReplyWait = true;
                    connection.sendAll(key, TOPIC_LOCK);
                }
                long deadline = System.currentTimeMillis() + REMOTE_LOCK_TIMEOUT;
                while (replyWait.responses < replyWait.expectedResponses) {
                    // Recomputed on every pass: wake-ups for other keys and spurious
                    // wake-ups must not extend the overall wait.
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        throw new RuntimeException("Remote wait lock timeout");
                    }
                    wait(remaining);
                }
                return replyWait.locked;
            }
        } catch (RuntimeException e) {
            throw e;
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (ownReplyWait) {
                replyWaits.remove(key);
            }
        }
    }

    /**
     * Check if the object is locked on this host or on any other group member.
     *
     * @param key The object to check the lock for.
     * @return True if the object is locked, false otherwise.
     */
    public boolean isLocked(Serializable key) {
        return isLockedLocal(key) || isLockedRemote(key);
    }

    /**
     * Tries to lock <code>key</code> for the calling thread.
     *
     * @param key The object to get the lock on.
     * @return True if the calling thread now holds (or already held) the lock,
     *         false if another local thread or a remote host holds it.
     * @throws RuntimeException if the remote check fails; the key is then left
     *                          unlocked on this host.
     */
    public boolean lock(Serializable key) {
        Object owner;
        synchronized (this) {
            owner = locks.get(key);
            if (owner == null) {
                // Claim the key locally first so that remote requests arriving
                // while we ask the other members are answered with "locked".
                locks.put(key, getCallingThread());
            }
        }
        if (owner != null) {
            return owner == getCallingThread();
        }
        boolean acquired = false;
        try {
            acquired = !isLockedRemote(key);
            return acquired;
        } finally {
            if (!acquired) {
                // Remote host holds the key, or the check failed: withdraw the claim.
                locks.remove(key);
            }
        }
    }

    /**
     * Releases the lock on <code>key</code>.
     *
     * @param key The object to release the lock on.
     * @return True if the calling thread held the lock and released it, or if the
     *         key is locked neither locally nor remotely; false if another local
     *         thread or a remote host holds it.
     */
    public boolean unLock(Serializable key) {
        Object owner;
        synchronized (this) {
            owner = locks.get(key);
        }
        if (owner == null) {
            return !isLockedRemote(key);
        }
        if (owner == getCallingThread()) {
            locks.remove(key);
            return true;
        }
        return false;
    }

    /** Returns the thread used to identify the lock owner. */
    private Thread getCallingThread() {
        return Thread.currentThread();
    }
}
/**
 * Answer to a lock request: tells the requesting host whether the replying
 * host holds a lock on a given key.
 *
 * @author Mike Heath
 */
class Reply implements Serializable {

    /** True when the replying host has the key locked. */
    boolean locked;

    /** Key the lock request was made for. */
    Serializable key;
}
/**
 * Tally shared between the thread that collects lock replies and the thread
 * that is waiting for the outcome of a remote lock check.
 *
 * @author Mike Heath
 */
class ReplyWait {

    /** True once any reply reported the key as locked. */
    boolean locked;

    /** Replies received so far. */
    int responses;

    /** Replies needed before the check is complete. */
    int expectedResponses;
}
/***********************************************************************
* Copyright (c) 2000-2004 Avalon Digital Marketing Systems, Inc. *
* All rights reserved. *
* ------------------------------------------------------------------- *
* Licensed under the Apache License, Version 2.0 (the "License"); you *
* may not use this file except in compliance with the License. You *
* may obtain a copy of the License at: *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, software *
* distributed under the License is distributed on an "AS IS" BASIS, *
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
* implied. See the License for the specific language governing *
* permissions and limitations under the License. *
***********************************************************************/
package net.sf.jamesha.locking;
import java.io.Serializable;
/**
 * Keyed lock: callers lock, query and release arbitrary serializable keys.
 *
 * @author Mike Heath
 */
public interface Lock {

    /**
     * Tells whether <code>key</code> is currently locked.
     *
     * @param key The object whose lock state is queried.
     * @return True if the object is locked, false otherwise.
     */
    boolean isLocked(Serializable key);

    /**
     * Attempts to lock <code>key</code>.
     *
     * <p>Returns true when <code>key</code> was not locked, or was already locked
     * by the current thread. Returns false when it is locked by a different
     * thread or on a second host.
     *
     * @param key The object to get the lock on.
     * @return True if the lock was obtained, false otherwise.
     */
    boolean lock(Serializable key);

    /**
     * Releases the lock on <code>key</code>.
     *
     * <p>If the thread does not have the lock, return false.
     *
     * @param key The object to release the lock on.
     * @return True if unlocked successfully, false otherwise.
     */
    boolean unLock(Serializable key);
}
/***********************************************************************
* Copyright (c) 2000-2004 Avalon Digital Marketing Systems, Inc. *
* All rights reserved. *
* ------------------------------------------------------------------- *
* Licensed under the Apache License, Version 2.0 (the "License"); you *
* may not use this file except in compliance with the License. You *
* may obtain a copy of the License at: *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, software *
* distributed under the License is distributed on an "AS IS" BASIS, *
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
* implied. See the License for the specific language governing *
* permissions and limitations under the License. *
***********************************************************************/
package net.sf.jamesha.repository;
import net.sf.jamesha.locking.DistributedLock;
import net.sf.jamesha.locking.Lock;
import org.apache.avalon.framework.configuration.Configuration;
import org.apache.avalon.framework.configuration.ConfigurationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.james.mailrepository.JDBCMailRepository;
/**
 * Based off of James' JDBCMailRepository but using distributed locking.
 *
 * <p>Lock and unlock requests are delegated to a {@link DistributedLock} that
 * is created when the repository is configured.
 *
 * @author Mike Heath
 */
public class DistributedJDBCMailRepository extends JDBCMailRepository {

    private static final Log LOG = LogFactory.getLog(DistributedJDBCMailRepository.class);

    /** Cluster-wide lock; created in {@link #configure(Configuration)}. */
    private Lock lock;

    /**
     * Configures the distributed repository.
     *
     * <p>In addition to the attributes read by the superclass, the config attributes are:
     * <ul>
     * <li><b>groupName</b> - The cluster group name used for the distributed locking.
     *     Defaults to {@link DistributedLock#DEFAULT_GROUP_NAME}.</li>
     * <li><b>jclusterConfig</b> - The JCluster configuration used for the distributed locking.
     *     Defaults to {@link DistributedLock#DEFAULT_CONFIG}.</li>
     * </ul>
     *
     * @param conf the repository configuration.
     * @throws ConfigurationException if the superclass rejects the configuration or
     *                                the distributed lock cannot be created.
     * @see org.apache.avalon.framework.configuration.Configurable#configure(org.apache.avalon.framework.configuration.Configuration)
     */
    public void configure(Configuration conf) throws ConfigurationException {
        super.configure(conf);

        String groupName = conf.getAttribute("groupName", null);
        if (groupName == null) {
            LOG.warn("Using default group name for mail repository: " + this.repositoryName);
            groupName = DistributedLock.DEFAULT_GROUP_NAME;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Using group name '" + groupName + "' in repository: " + this.repositoryName);
        }

        String jclusterConfig = conf.getAttribute("jclusterConfig", null);
        if (jclusterConfig == null) {
            jclusterConfig = DistributedLock.DEFAULT_CONFIG;
        }

        try {
            // A null driver class makes DistributedLock fall back to its default driver.
            lock = new DistributedLock(null, jclusterConfig, groupName);
        } catch (Exception e) {
            // Report through the exception type this method declares, keeping the cause.
            throw new ConfigurationException("Unable to create distributed lock for group '" + groupName
                    + "' using JCluster config '" + jclusterConfig + "'", e);
        }
    }

    /**
     * Locks <code>key</code> through the distributed lock.
     *
     * @param key the key to lock.
     * @return the result of {@link Lock#lock(java.io.Serializable)}.
     */
    public synchronized boolean lock(String key) {
        return lock.lock(key);
    }

    /**
     * Unlocks <code>key</code> through the distributed lock.
     *
     * @param key the key to unlock.
     * @return the result of {@link Lock#unLock(java.io.Serializable)}.
     */
    public synchronized boolean unlock(String key) {
        return lock.unLock(key);
    }
}
/***********************************************************************
* Copyright (c) 2000-2004 Avalon Digital Marketing Systems, Inc. *
* All rights reserved. *
* ------------------------------------------------------------------- *
* Licensed under the Apache License, Version 2.0 (the "License"); you *
* may not use this file except in compliance with the License. You *
* may obtain a copy of the License at: *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, software *
* distributed under the License is distributed on an "AS IS" BASIS, *
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
* implied. See the License for the specific language governing *
* permissions and limitations under the License. *
***********************************************************************/
package net.sf.jamesha.repository;
import org.apache.avalon.framework.configuration.Configuration;
import org.apache.avalon.framework.configuration.ConfigurationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.james.mailrepository.JDBCSpoolRepository;
import net.sf.jamesha.locking.DistributedLock;
/**
 * Based off of James JDBCSpoolRepository but has support for distributed locking.
 *
 * <p>Lock and unlock requests are delegated to a {@link DistributedLock} that
 * is created when the spool is configured.
 *
 * @author Mike Heath
 */
public class DistributedJDBCSpoolRepository extends JDBCSpoolRepository {

    private static final Log LOG = LogFactory.getLog(DistributedJDBCSpoolRepository.class);

    /** Cluster-wide lock; created in {@link #configure(Configuration)}. */
    private DistributedLock lock;

    /**
     * Configures the distributed spool.
     *
     * <p>In addition to the attributes read by the superclass, the config attributes are:
     * <ul>
     * <li><b>groupName</b> - The cluster group name used for the distributed locking.
     *     Defaults to {@link DistributedLock#DEFAULT_GROUP_NAME}.</li>
     * <li><b>jclusterConfig</b> - The JCluster configuration used for the distributed locking.
     *     Defaults to {@link DistributedLock#DEFAULT_CONFIG}.</li>
     * </ul>
     *
     * @param conf the spool configuration.
     * @throws ConfigurationException if the superclass rejects the configuration or
     *                                the distributed lock cannot be created.
     * @see org.apache.avalon.framework.configuration.Configurable#configure(org.apache.avalon.framework.configuration.Configuration)
     */
    public void configure(Configuration conf) throws ConfigurationException {
        super.configure(conf);

        String groupName = conf.getAttribute("groupName", null);
        if (groupName == null) {
            LOG.warn("Using default group name for spool: " + this.repositoryName);
            groupName = DistributedLock.DEFAULT_GROUP_NAME;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Using group name '" + groupName + "' in spool: " + this.repositoryName);
        }

        String jclusterConfig = conf.getAttribute("jclusterConfig", null);
        if (jclusterConfig == null) {
            jclusterConfig = DistributedLock.DEFAULT_CONFIG;
        }

        try {
            // A null driver class makes DistributedLock fall back to its default driver.
            lock = new DistributedLock(null, jclusterConfig, groupName);
        } catch (Exception e) {
            // Report through the exception type this method declares, keeping the cause.
            throw new ConfigurationException("Unable to create distributed lock for group '" + groupName
                    + "' using JCluster config '" + jclusterConfig + "'", e);
        }
    }

    /**
     * Locks <code>key</code> through the distributed lock.
     *
     * @param key the key to lock.
     * @return the result of {@link DistributedLock#lock(java.io.Serializable)}.
     */
    public synchronized boolean lock(String key) {
        return lock.lock(key);
    }

    /**
     * Unlocks <code>key</code> through the distributed lock.
     *
     * @param key the key to unlock.
     * @return the result of {@link DistributedLock#unLock(java.io.Serializable)}.
     */
    public synchronized boolean unlock(String key) {
        return lock.unLock(key);
    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]