Author: psharples
Date: Thu Mar 24 12:02:02 2011
New Revision: 1084921
URL: http://svn.apache.org/viewvc?rev=1084921&view=rev
Log:
Queueing framework for sequential db access of preferences and shared data
updates
Added:
incubator/wookie/trunk/src/org/apache/wookie/queues/
incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueConsumer.java
(with props)
incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueHandler.java
(with props)
incubator/wookie/trunk/src/org/apache/wookie/queues/QueueManager.java
(with props)
incubator/wookie/trunk/src/org/apache/wookie/queues/beans/
incubator/wookie/trunk/src/org/apache/wookie/queues/beans/IQueuedBean.java
(with props)
incubator/wookie/trunk/src/org/apache/wookie/queues/beans/impl/
incubator/wookie/trunk/src/org/apache/wookie/queues/impl/
incubator/wookie/trunk/src/org/apache/wookie/queues/util/
incubator/wookie/trunk/src/org/apache/wookie/queues/util/QueueWatcherTask.java
(with props)
Added: incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueConsumer.java?rev=1084921&view=auto
==============================================================================
--- incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueConsumer.java
(added)
+++ incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueConsumer.java Thu
Mar 24 12:02:02 2011
@@ -0,0 +1,48 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wookie.queues;
+
+import org.apache.wookie.queues.beans.IQueuedBean;
+
+/**
+ *
+ * @author Paul Sharples
+ *
+ */
+public interface IQueueConsumer {
+
+ /**
+ * set the name of the queue
+ * @param string
+ */
+ void setName(String string);
+
+ /**
+ * run as daemon
+ * @param b
+ */
+ void setDaemon(boolean b);
+
+ /**
+ * start it
+ */
+ void start();
+
+ /**
+ * use interrupt to stop the thread
+ */
+ void interrupt();
+
+ abstract void process(IQueuedBean bean);
+}
Propchange:
incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueConsumer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueHandler.java
URL:
http://svn.apache.org/viewvc/incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueHandler.java?rev=1084921&view=auto
==============================================================================
--- incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueHandler.java
(added)
+++ incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueHandler.java Thu
Mar 24 12:02:02 2011
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wookie.queues;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.wookie.queues.beans.IQueuedBean;
+/**
+ *
+ * @author Paul Sharples
+ *
+ */
+public interface IQueueHandler {
+ /**
+ * Get the identifier of the queue
+ * @return
+ */
+ String getQueueIdentifer();
+
+ /**
+ * return the current count of requests added to queue
+ * @return
+ */
+ int getThreadTerminationCount();
+
+ /**
+ * update the current count of requests added to queue
+ * @param num
+ */
+ void setThreadTerminationCount(int num);
+
+ /**
+ * return the queue
+ * @return
+ */
+ BlockingQueue<IQueuedBean> getQueue();
+
+ /**
+ * cleanup
+ */
+ void destroy();
+
+ /**
+ * return the consumer thread
+ */
+ IQueueConsumer getConsumer();
+}
Propchange:
incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/wookie/trunk/src/org/apache/wookie/queues/QueueManager.java
URL:
http://svn.apache.org/viewvc/incubator/wookie/trunk/src/org/apache/wookie/queues/QueueManager.java?rev=1084921&view=auto
==============================================================================
--- incubator/wookie/trunk/src/org/apache/wookie/queues/QueueManager.java
(added)
+++ incubator/wookie/trunk/src/org/apache/wookie/queues/QueueManager.java Thu
Mar 24 12:02:02 2011
@@ -0,0 +1,128 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wookie.queues;
+
+import java.util.HashMap;
+import org.apache.log4j.Logger;
+import org.apache.wookie.queues.beans.IQueuedBean;
+import org.apache.wookie.queues.beans.impl.QueuedBean;
+import org.apache.wookie.queues.impl.PreferenceQueueHandler;
+import org.apache.wookie.queues.impl.SharedDataQueueHandler;
+/**
+ * QueueManager (One instance per wookie) is designed to queue requests to
update
+ * preferences and shareddata records so that there are no concurrent
+ * modifications made to the SAME record at the SAME time in the database,
this can happen
+ * particularly with shared data requests (imagine 100s of clients trying to
update a chat log at the same time)
+ * Therefore two hashmaps (one for preferences, the other for shareddata) keep
a list of queues for
+ * accessing a particular preference/shareddata. In the case of preferences, a
queue is made unique
+ * by assigning it a queue name of instancekey and preference name
concatenated together. In the
+ * case of shared data, the shareddata key and shareddata name are
concatenated together.
+ *
+ * The Hashtables in each case contains a list of QueueHandlers. A
QueueHandler is responsible for its own queue,
+ * a consumer thread (which processes each request and commits it to the
database) and a watcher task which will destroy the queue
+ * resources when needed. The queues themselves are short lived and are
designed to be destroyed after a short period of inactivity.
+ *
+ * @author Paul Sharples
+ * @version $Id$
+ *
+ */
+public class QueueManager {
+
+ public static Logger logger = Logger.getLogger(QueueManager.class);
+ // list of preference queues
+ private static HashMap<String, PreferenceQueueHandler>
preferenceQueueList = new HashMap<String, PreferenceQueueHandler>();
+ // list of shared data queues
+ private static HashMap<String, SharedDataQueueHandler>
sharedDataQueueList = new HashMap<String, SharedDataQueueHandler>();
+ protected static QueueManager instance = null;
+
+ /**
+ * Non public constructor
+ */
+ private QueueManager(){};
+
+ /**
+ * Should only ever be one queuemanager
+ * @return the instance
+ */
+ public static synchronized QueueManager getInstance(){
+ if (instance == null){
+ //logger.info("QUEUE MANAGER
CONSTRUCTOR***********SHOULD ONLY EVER BE CALLED ONCE************");
+ instance = new QueueManager();
+ }
+ return instance;
+ }
+
+ /**
+ * Add a request for setting a preference to the correct queue
+ * @param id_key
+ * @param key
+ * @param value
+ */
+ public static synchronized void queueSetPreferenceRequest(String
id_key, String key, String value){
+ IQueuedBean preferenceBean = new QueuedBean(id_key, key, value,
false);
+ String queueKey = id_key + "-" + key;
+ PreferenceQueueHandler queueHandler;
+ if(preferenceQueueList.containsKey(queueKey)){
+ queueHandler = (PreferenceQueueHandler)
preferenceQueueList.get(queueKey);
+ }
+ else{
+ queueHandler = new PreferenceQueueHandler(queueKey);
+ preferenceQueueList.put(queueKey, queueHandler);
+ }
+ queueHandler.addBeanToQueue(preferenceBean);
+ }
+
+ /**
+ * Add a request for setting a named shareddata value to the correct
queue
+ * @param id_key
+ * @param sharedDataKey
+ * @param key
+ * @param value
+ * @param append
+ */
+ public static synchronized void queueSetSharedDataRequest(String
id_key, String sharedDataKey, String key, String value, boolean append){
+ IQueuedBean sharedDataBean = new QueuedBean(id_key, key, value,
append);
+ String queueKey = sharedDataKey + "-" + key; // use
shareddatakey here instead of instancekey
+ SharedDataQueueHandler queueHandler;
+ if(sharedDataQueueList.containsKey(queueKey)){
+ queueHandler = (SharedDataQueueHandler)
sharedDataQueueList.get(queueKey);
+ }
+ else{
+ queueHandler = new SharedDataQueueHandler(queueKey);
+ sharedDataQueueList.put(queueKey, queueHandler);
+ }
+ queueHandler.addBeanToQueue(sharedDataBean);
+ }
+
+ /**
+ * Remove the shared data queue from the manager
+ * @param key
+ */
+ public static synchronized void removeSharedDataQueueFromManager(String
key){
+ if(sharedDataQueueList.containsKey(key)){
+ sharedDataQueueList.remove(key);
+ }
+ }
+
+ /**
+ * Remove the preference queue from the manager
+ * @param key
+ */
+
+ public static synchronized void removePreferenceQueueFromManager(String
key){
+ if(preferenceQueueList.containsKey(key)){
+ preferenceQueueList.remove(key);
+ }
+ }
+}
Propchange:
incubator/wookie/trunk/src/org/apache/wookie/queues/QueueManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
incubator/wookie/trunk/src/org/apache/wookie/queues/beans/IQueuedBean.java
URL:
http://svn.apache.org/viewvc/incubator/wookie/trunk/src/org/apache/wookie/queues/beans/IQueuedBean.java?rev=1084921&view=auto
==============================================================================
--- incubator/wookie/trunk/src/org/apache/wookie/queues/beans/IQueuedBean.java
(added)
+++ incubator/wookie/trunk/src/org/apache/wookie/queues/beans/IQueuedBean.java
Thu Mar 24 12:02:02 2011
@@ -0,0 +1,45 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wookie.queues.beans;
+/**
+ * QueuedBean signatures
+ *
+ * @author Paul Sharples
+ *
+ */
+public interface IQueuedBean {
+ /**
+ * get the instance key
+ * @return
+ */
+ String getId_key();
+
+ /**
+ * get the name of the preference/shared data entity to be updated
+ * @return
+ */
+ String getKey();
+
+ /**
+ * get the value of the preference/shared data entity to be updated
+ * @return
+ */
+ String getValue();
+
+ /**
+ * If shared data, then determine if the value should be appended to
the existing content
+ * @return
+ */
+ boolean append();
+}
Propchange:
incubator/wookie/trunk/src/org/apache/wookie/queues/beans/IQueuedBean.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
incubator/wookie/trunk/src/org/apache/wookie/queues/util/QueueWatcherTask.java
URL:
http://svn.apache.org/viewvc/incubator/wookie/trunk/src/org/apache/wookie/queues/util/QueueWatcherTask.java?rev=1084921&view=auto
==============================================================================
---
incubator/wookie/trunk/src/org/apache/wookie/queues/util/QueueWatcherTask.java
(added)
+++
incubator/wookie/trunk/src/org/apache/wookie/queues/util/QueueWatcherTask.java
Thu Mar 24 12:02:02 2011
@@ -0,0 +1,65 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wookie.queues.util;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.wookie.queues.IQueueHandler;
+/**
+ * Watches a given queue and will stop the consumer thread when needed and
then cleanup
+ *
+ * @author Paul Sharples
+ *
+ */
+public class QueueWatcherTask extends TimerTask {
+
+ // parent object - so we can clean up when finished
+ private IQueueHandler parent;
+ // local timer
+ private Timer timer;
+
+ /**
+ * start the timer (5 second interval)
+ */
+ public void init(){
+ timer = new Timer();
+ timer.scheduleAtFixedRate(this, 5000L, 5000L);
+ }
+
+ public QueueWatcherTask(IQueueHandler parentHandler) {
+ parent = parentHandler;
+ init();
+ }
+
+ /**
+ * This method looks at the parent queue and sees if it is empty. if it
is then it will wait a further
+ * 5 seconds by setting the threadterminationcount to zero and then
checking again after 5 seconds
+ * This is just to make sure the queue has been empty for at least 5
seconds before destroying it
+ * and doesn't catch it the split second it happens to be empty
+ */
+ @Override
+ public void run() {
+ //System.out.println("(" + parent.getQueueIdentifer() + ")
tcount="+parent.getThreadTerminationCount()+" queue
size="+parent.getQueue().size());
+ if(parent.getThreadTerminationCount() < 1 &&
parent.getQueue().isEmpty()){
+ timer.cancel();
+ parent.destroy();
+ this.cancel();
+ }
+ else{
+ parent.setThreadTerminationCount(0);
+ }
+ }
+
+}
Propchange:
incubator/wookie/trunk/src/org/apache/wookie/queues/util/QueueWatcherTask.java
------------------------------------------------------------------------------
svn:mime-type = text/plain