Author: ssmiweve
Date: 2008-12-01 14:06:26 +0100 (Mon, 01 Dec 2008)
New Revision: 7005

Modified:
   
branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java
Log:
Issue SKER4321:  (Load reducing filter that handles only current and last 
request from client) 
 second attempt. this time correctly LIFO. smaller methods. ReentrantLock 
instead of Semaphore.

Modified: 
branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java
===================================================================
--- 
branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java
 2008-11-30 21:30:59 UTC (rev 7004)
+++ 
branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java
 2008-12-01 13:06:26 UTC (rev 7005)
@@ -32,10 +32,15 @@
 import java.util.Properties;
 import java.util.UUID;
 import java.text.MessageFormat;
+import java.util.Deque;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.servlet.Filter;
 import javax.servlet.FilterChain;
 import javax.servlet.FilterConfig;
@@ -89,7 +94,7 @@
     private static final String UNKNOWN = "unknown";
 
     private static final String USER_REQUEST_QUEUE = "userRequestQueue";
-    private static final String USER_REQUEST_SEMAPHORE = 
"userRequestSemaphore";
+    private static final String USER_REQUEST_LOCK = "userRequestSemaphore";
     private static final long WAIT_TIME = 5000;
     private static final int REQUEST_QUEUE_SIZE = 5;
 
@@ -397,81 +402,112 @@
 
     // Private -------------------------------------------------------
 
-    @SuppressWarnings("unchecked")
     private static void doChainFilter(
             final FilterChain chain,
             final ServletRequest request,
             final ServletResponse response) throws IOException, 
ServletException {
 
         if (request instanceof HttpServletRequest) {
-            final HttpSession session = ((HttpServletRequest) 
request).getSession();
+            doChainFilter(chain, (HttpServletRequest)request, 
(HttpServletResponse)response);
+        } else {
+            chain.doFilter(request, response);
+        }
+    }
 
-            // fetch the user's queue
-            BlockingQueue<ServletRequest> queue
-                    = (BlockingQueue<ServletRequest>) 
session.getAttribute(USER_REQUEST_QUEUE);
+    private static void doChainFilter(
+            final FilterChain chain,
+            final HttpServletRequest request,
+            final HttpServletResponse response) throws IOException, 
ServletException {
 
-            // construct queue if necessary
-            if (null == queue) {
-                queue = new 
ArrayBlockingQueue<ServletRequest>(REQUEST_QUEUE_SIZE);
-                session.setAttribute(USER_REQUEST_QUEUE, queue);
-                session.setAttribute(USER_REQUEST_SEMAPHORE, new Semaphore(1));
-            }
+        final HttpSession session = request.getSession();
 
-            // queue has a time limit. start counting.
-            final long start = System.currentTimeMillis();
+        // fetch the user's deque
+        final Deque<ServletRequest> deque = getUsersDeque(session);
 
-            // attempt to join queue
-            if (queue.offer(request)) {
+        // lock to execute
+        final ReentrantLock lock = (ReentrantLock) 
session.getAttribute(USER_REQUEST_LOCK);
 
-                // a semphore is used for waiting within the queue. it lets us 
sleep peacefully until front of queue.
-                final Semaphore semaphore = (Semaphore) 
session.getAttribute(USER_REQUEST_SEMAPHORE);
+        // deque has a time limit. start counting.
+        long timeLeft = WAIT_TIME;
 
-                try {
-                    while(queue.peek() != request){
+        // attempt to join deque
+        if (deque.offerFirst(request)) {
+            timeLeft = tryLock(request, deque, lock, timeLeft);
+        }
 
-                        final long timeLeft = WAIT_TIME - 
(System.currentTimeMillis() - start);
+        if(lock.isHeldByCurrentThread()){
 
-                        // let's sleep. sleeping too long results in 409 
response.
-                        if(!semaphore.tryAcquire(timeLeft, 
TimeUnit.MILLISECONDS)){
+            try{
+                // waiting is over. and we can execute
+                chain.doFilter(request, response);
 
-                            LOG.warn(" -- response 409 (Timeout: Waited " + 
(WAIT_TIME - timeLeft) + " ms. )");
-                            ((HttpServletResponse) 
response).sendError(HttpServletResponse.SC_CONFLICT);
-                            return; // response is set
+            }finally{
+                // take out of deque first
+                deque.remove(request);
 
-                        }else if(queue.peek() != request){
-                            // we've acquire the semaphore but we're not at 
front of queue.
-                            // mix up. and probably not possible in the jvm?
-                            // release the semaphore and go back to sleep
-                            semaphore.release();
-                            LOG.error("acquired semaphore without being at 
front of queue: " + queue);
-                        }
-                    }
+                // release the lock, waiting up the next request
+                lock.unlock();
+            }
+        }else{
+            // we failed to execute. return 409 response.
+            if (response instanceof HttpServletResponse) {
 
-                    // waiting is over. we can execute.
-                    chain.doFilter(request, response);
+                LOG.warn(" -- response 409 " +
+                        (0 < timeLeft
+                        ? "(More then " + REQUEST_QUEUE_SIZE + " requests 
already in queue)"
+                        : "(Timeout: Waited " + WAIT_TIME + " ms)"));
 
-                }catch(InterruptedException ie){
-                    LOG.error("Failed using session's semaphore", ie);
+                response.sendError(HttpServletResponse.SC_CONFLICT);
+            }
+        }
+    }
 
-                }finally {
+    private static Deque<ServletRequest> getUsersDeque(final HttpSession 
session){
 
-                    // take out of queue first so (queue.peek() != request) is 
true for the next request.
-                    queue.remove();
-                    // release the semaphore, waiting up the next request.
-                    semaphore.release();
-                }
+        @SuppressWarnings("unchecked")
+        Deque<ServletRequest> deque = (BlockingDeque<ServletRequest>) 
session.getAttribute(USER_REQUEST_QUEUE);
 
-            }else{
-                // the queue is too long to join. immediately return 409 
response.
-                if (response instanceof HttpServletResponse) {
-                    LOG.warn(" -- response 409 (More then " + 
REQUEST_QUEUE_SIZE + " request in queue)");
-                    ((HttpServletResponse) 
response).sendError(HttpServletResponse.SC_CONFLICT);
+        // construct deque if necessary
+        if (null == deque) {
+            // it may be possible for duplicates across threads to be 
constructed here
+            deque = new 
LinkedBlockingDeque<ServletRequest>(REQUEST_QUEUE_SIZE);
+            session.setAttribute(USER_REQUEST_QUEUE, deque);
+            session.setAttribute(USER_REQUEST_LOCK, new ReentrantLock());
+        }
+
+        return deque;
+    }
+
+    private static long tryLock(
+            final HttpServletRequest request,
+            final Deque<ServletRequest> deque,
+            final Lock lock,
+            long timeLeft){
+
+        final long start = System.currentTimeMillis();
+
+        try {
+            do{
+                timeLeft = WAIT_TIME - (System.currentTimeMillis() - start);
+
+                // let's sleep. sleeping too long results in 409 response
+                if(0 >= timeLeft || !lock.tryLock(timeLeft, 
TimeUnit.MILLISECONDS)){
+                    // we timed out or got the lock. waiting is over
+                    break;
+
+                }else if(deque.peek() != request){
+                    // we've acquired the lock but we're not at front of deque
+                    // release the lock and try again
+                    lock.unlock();
                 }
-            }
+            }while(deque.peek() != request);
 
-        } else {
-            chain.doFilter(request, response);
+
+        }catch(InterruptedException ie){
+            LOG.error("Failed using user's lock", ie);
         }
+
+        return timeLeft;
     }
 
     private void doBeforeProcessing(final ServletRequest request, final 
ServletResponse response)

_______________________________________________
Kernel-commits mailing list
Kernel-commits@sesat.no
http://sesat.no/mailman/listinfo/kernel-commits

Reply via email to