Author: psharples
Date: Thu Mar 24 12:02:02 2011
New Revision: 1084921

URL: http://svn.apache.org/viewvc?rev=1084921&view=rev
Log:
Queueing framework for sequential db access of preferences and shared data 
updates

Added:
    incubator/wookie/trunk/src/org/apache/wookie/queues/
    incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueConsumer.java   
(with props)
    incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueHandler.java   
(with props)
    incubator/wookie/trunk/src/org/apache/wookie/queues/QueueManager.java   
(with props)
    incubator/wookie/trunk/src/org/apache/wookie/queues/beans/
    incubator/wookie/trunk/src/org/apache/wookie/queues/beans/IQueuedBean.java  
 (with props)
    incubator/wookie/trunk/src/org/apache/wookie/queues/beans/impl/
    incubator/wookie/trunk/src/org/apache/wookie/queues/impl/
    incubator/wookie/trunk/src/org/apache/wookie/queues/util/
    
incubator/wookie/trunk/src/org/apache/wookie/queues/util/QueueWatcherTask.java  
 (with props)

Added: incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueConsumer.java?rev=1084921&view=auto
==============================================================================
--- incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueConsumer.java 
(added)
+++ incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueConsumer.java Thu 
Mar 24 12:02:02 2011
@@ -0,0 +1,48 @@
+/*
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wookie.queues;
+
+import org.apache.wookie.queues.beans.IQueuedBean;
+
+/**
+ * Contract for the consumer side of a queue: the worker that is handed
+ * queued requests one at a time through {@link #process(IQueuedBean)}.
+ *
+ * <p>The lifecycle methods ({@link #setName(String)},
+ * {@link #setDaemon(boolean)}, {@link #start()} and {@link #interrupt()})
+ * have the same names and signatures as the corresponding methods of
+ * {@link java.lang.Thread}.
+ *
+ * @author Paul Sharples
+ */
+public interface IQueueConsumer {
+
+       /**
+        * Set the name of the queue.
+        * @param string the name to use
+        */
+       void setName(String string);
+
+       /**
+        * Choose whether the consumer runs as a daemon.
+        * @param b <code>true</code> to run as a daemon
+        */
+       void setDaemon(boolean b);
+
+       /**
+        * Start the consumer.
+        */
+       void start();
+
+       /**
+        * Stop the consumer; interruption is the mechanism used to stop the thread.
+        */
+       void interrupt();
+
+       /**
+        * Process a single queued request.
+        * @param bean the queued request to process
+        */
+       void process(IQueuedBean bean);
+}

Propchange: 
incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueConsumer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueHandler.java?rev=1084921&view=auto
==============================================================================
--- incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueHandler.java 
(added)
+++ incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueHandler.java Thu 
Mar 24 12:02:02 2011
@@ -0,0 +1,58 @@
+/*
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wookie.queues;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.wookie.queues.beans.IQueuedBean;
+/**
+ * A handler that owns one queue of pending requests together with the
+ * consumer that works through it. QueueWatcherTask polls a handler through
+ * this interface and calls {@link #destroy()} once the queue looks idle.
+ * 
+ * @author Paul Sharples
+ *
+ */
+public interface IQueueHandler {
+       /**
+        * Get the identifier of the queue
+        * @return the queue identifier
+        */
+       String getQueueIdentifer();
+       
+       /**
+        * return the current count of requests added to queue.
+        * QueueWatcherTask treats a value below 1, combined with an empty queue,
+        * as the signal to destroy this handler.
+        * @return the current count
+        */
+       int getThreadTerminationCount();
+       
+       /**
+        * update the current count of requests added to queue.
+        * QueueWatcherTask sets it back to 0 after every check that does not
+        * end in destroying the handler.
+        * @param num the new count
+        */
+       void setThreadTerminationCount(int num);
+       
+       /**
+        * return the queue
+        * @return the queue holding the pending requests
+        */
+       BlockingQueue<IQueuedBean> getQueue();
+       
+       /**
+        * cleanup. Called by QueueWatcherTask after it has cancelled its own timer.
+        */
+       void destroy();
+       
+       /**
+        * return the consumer thread
+        * @return the consumer attached to this handler
+        */
+       IQueueConsumer getConsumer();
+}

Propchange: 
incubator/wookie/trunk/src/org/apache/wookie/queues/IQueueHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/wookie/trunk/src/org/apache/wookie/queues/QueueManager.java
URL: 
http://svn.apache.org/viewvc/incubator/wookie/trunk/src/org/apache/wookie/queues/QueueManager.java?rev=1084921&view=auto
==============================================================================
--- incubator/wookie/trunk/src/org/apache/wookie/queues/QueueManager.java 
(added)
+++ incubator/wookie/trunk/src/org/apache/wookie/queues/QueueManager.java Thu 
Mar 24 12:02:02 2011
@@ -0,0 +1,128 @@
+/*
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wookie.queues;
+
+import java.util.HashMap;
+import org.apache.log4j.Logger;
+import org.apache.wookie.queues.beans.IQueuedBean;
+import org.apache.wookie.queues.beans.impl.QueuedBean;
+import org.apache.wookie.queues.impl.PreferenceQueueHandler;
+import org.apache.wookie.queues.impl.SharedDataQueueHandler;
+/**
+ * QueueManager (one instance per Wookie) queues requests to update
+ * preference and shared data records so that no concurrent modifications are
+ * made to the SAME record at the SAME time in the database. This can happen
+ * particularly with shared data requests (imagine 100s of clients trying to
+ * update a chat log at the same time).
+ *
+ * <p>Two HashMaps (one for preferences, the other for shared data) hold the
+ * queues for accessing a particular preference/shared data entry. A
+ * preference queue is made unique by giving it a name made of the instance
+ * key and the preference name concatenated together. A shared data queue
+ * uses the shared data key and the shared data name concatenated together.
+ *
+ * <p>Each map value is a queue handler. A queue handler is responsible for
+ * its own queue, a consumer thread (which processes each request and commits
+ * it to the database) and a watcher task which destroys the queue resources
+ * when needed. The queues themselves are short lived and are designed to be
+ * destroyed after a short period of inactivity.
+ *
+ * <p>Every method that touches the maps is <code>static synchronized</code>,
+ * so all map access is serialized on the class lock.
+ *
+ * @author Paul Sharples
+ * @version $Id$
+ */
+public class QueueManager {
+
+       public static final Logger logger = Logger.getLogger(QueueManager.class);
+       // preference queue handlers, keyed by instance key + "-" + preference name
+       private static final HashMap<String, PreferenceQueueHandler> preferenceQueueList =
+                       new HashMap<String, PreferenceQueueHandler>();
+       // shared data queue handlers, keyed by shared data key + "-" + shared data name
+       private static final HashMap<String, SharedDataQueueHandler> sharedDataQueueList =
+                       new HashMap<String, SharedDataQueueHandler>();
+       protected static QueueManager instance = null;
+
+       /**
+        * Non public constructor; use {@link #getInstance()}.
+        */
+       private QueueManager(){}
+
+       /**
+        * Should only ever be one queuemanager. The instance is created on the
+        * first call; the method is synchronized so that concurrent first calls
+        * cannot create two instances.
+        * @return the instance
+        */
+       public static synchronized QueueManager getInstance(){
+               if (instance == null){
+                       instance = new QueueManager();
+               }
+               return instance;
+       }
+
+       /**
+        * Add a request for setting a preference to the correct queue, creating
+        * and registering the queue handler first if none exists for this
+        * instance key/preference name pair.
+        * @param id_key the instance key
+        * @param key the name of the preference
+        * @param value the value to store
+        */
+       public static synchronized void queueSetPreferenceRequest(String id_key, String key, String value){
+               // the append flag is always false for preference requests
+               IQueuedBean preferenceBean = new QueuedBean(id_key, key, value, false);
+               String queueKey = id_key + "-" + key;
+               // NOTE(review): a handler that is being destroyed may still be in the
+               // map at this point; whether a request added to it can be lost depends
+               // on the handler implementation, which is not visible here - verify.
+               PreferenceQueueHandler queueHandler = preferenceQueueList.get(queueKey);
+               if(queueHandler == null){
+                       queueHandler = new PreferenceQueueHandler(queueKey);
+                       preferenceQueueList.put(queueKey, queueHandler);
+               }
+               queueHandler.addBeanToQueue(preferenceBean);
+       }
+
+       /**
+        * Add a request for setting a named shareddata value to the correct
+        * queue, creating and registering the queue handler first if none exists
+        * for this shared data key/name pair.
+        * @param id_key the instance key
+        * @param sharedDataKey the shared data key; used instead of the instance
+        *        key to build the queue name
+        * @param key the name of the shared data entry
+        * @param value the value to store
+        * @param append passed on to the queued bean; whether the value should be
+        *        appended to the existing content
+        */
+       public static synchronized void queueSetSharedDataRequest(String id_key, String sharedDataKey, String key, String value, boolean append){
+               IQueuedBean sharedDataBean = new QueuedBean(id_key, key, value, append);
+               String queueKey = sharedDataKey + "-" + key; // use shareddatakey here instead of instancekey
+               // NOTE(review): same caveat as for preference queues - a handler that
+               // is being destroyed may still be registered here; verify.
+               SharedDataQueueHandler queueHandler = sharedDataQueueList.get(queueKey);
+               if(queueHandler == null){
+                       queueHandler = new SharedDataQueueHandler(queueKey);
+                       sharedDataQueueList.put(queueKey, queueHandler);
+               }
+               queueHandler.addBeanToQueue(sharedDataBean);
+       }
+
+       /**
+        * Remove the shared data queue from the manager. Does nothing if no
+        * queue is registered under the key.
+        * @param key the queue name the handler was registered under
+        */
+       public static synchronized void removeSharedDataQueueFromManager(String key){
+               sharedDataQueueList.remove(key);
+       }
+
+       /**
+        * Remove the preference queue from the manager. Does nothing if no
+        * queue is registered under the key.
+        * @param key the queue name the handler was registered under
+        */
+       public static synchronized void removePreferenceQueueFromManager(String key){
+               preferenceQueueList.remove(key);
+       }
+}

Propchange: 
incubator/wookie/trunk/src/org/apache/wookie/queues/QueueManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: 
incubator/wookie/trunk/src/org/apache/wookie/queues/beans/IQueuedBean.java
URL: 
http://svn.apache.org/viewvc/incubator/wookie/trunk/src/org/apache/wookie/queues/beans/IQueuedBean.java?rev=1084921&view=auto
==============================================================================
--- incubator/wookie/trunk/src/org/apache/wookie/queues/beans/IQueuedBean.java 
(added)
+++ incubator/wookie/trunk/src/org/apache/wookie/queues/beans/IQueuedBean.java 
Thu Mar 24 12:02:02 2011
@@ -0,0 +1,45 @@
+/*
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wookie.queues.beans;
+/**
+ * QueuedBean signatures: the data carried by one queued request to update a
+ * preference or shared data entry.
+ * 
+ * @author Paul Sharples
+ *
+ */
+public interface IQueuedBean {
+       /**
+        * get the instance key
+        * @return the instance key
+        */
+       String getId_key();
+       
+       /**
+        * get the name of the preference/shared data entity to be updated
+        * @return the entity name
+        */
+       String getKey();
+       
+       /**
+        * get the value of the preference/shared data entity to be updated
+        * @return the value to store
+        */
+       String getValue();
+       
+       /**
+        * If shared data, then determine if the value should be appended to
+        * the existing content. QueueManager always passes false for
+        * preference requests.
+        * @return true if the value should be appended to the existing content
+        */
+       boolean append();
+}

Propchange: 
incubator/wookie/trunk/src/org/apache/wookie/queues/beans/IQueuedBean.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: 
incubator/wookie/trunk/src/org/apache/wookie/queues/util/QueueWatcherTask.java
URL: 
http://svn.apache.org/viewvc/incubator/wookie/trunk/src/org/apache/wookie/queues/util/QueueWatcherTask.java?rev=1084921&view=auto
==============================================================================
--- 
incubator/wookie/trunk/src/org/apache/wookie/queues/util/QueueWatcherTask.java 
(added)
+++ 
incubator/wookie/trunk/src/org/apache/wookie/queues/util/QueueWatcherTask.java 
Thu Mar 24 12:02:02 2011
@@ -0,0 +1,65 @@
+/*
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wookie.queues.util;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.wookie.queues.IQueueHandler;
+/**
+ * Watches a given queue and will stop the consumer thread when needed and 
then cleanup
+ * 
+ * @author Paul Sharples
+ *
+ */
+public class QueueWatcherTask extends TimerTask {
+
+       // parent object - so we can clean up when finished
+       private IQueueHandler parent;
+       // local timer
+       private Timer timer;
+       
+       /**
+        * start the timer (5 second interval)
+        */
+       public void init(){
+                timer = new Timer();            
+                timer.scheduleAtFixedRate(this, 5000L, 5000L);
+       }
+       
+       public QueueWatcherTask(IQueueHandler parentHandler) {
+               parent = parentHandler;
+               init();
+       }
+
+       /**
+        * This method looks at the parent queue and sees if it is empty. if it 
is then it will wait a further 
+        * 5 seconds by setting the threadterminationcount to zero and then 
checking again after 5 seconds
+        * This is just to make sure the queue has been empty for at least 5 
seconds before destroying it
+        * and doesn't catch it the split second it happens to be empty
+        */
+       @Override
+       public void run() {     
+               //System.out.println("(" + parent.getQueueIdentifer() + ") 
tcount="+parent.getThreadTerminationCount()+" queue 
size="+parent.getQueue().size());
+                if(parent.getThreadTerminationCount() < 1 && 
parent.getQueue().isEmpty()){
+                        timer.cancel();
+                        parent.destroy();
+                        this.cancel();
+                }
+                else{
+                        parent.setThreadTerminationCount(0);
+                }
+       }
+
+}

Propchange: 
incubator/wookie/trunk/src/org/apache/wookie/queues/util/QueueWatcherTask.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain


Reply via email to