Author: ssmiweve Date: 2008-12-01 14:06:26 +0100 (Mon, 01 Dec 2008) New Revision: 7005
Modified: branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java Log: Issue SKER4321: (Load reducing filter that handles only current and last request from client) second attempt. this time correctly LIFO. smaller methods. ReentrantLock instead of Semaphore. Modified: branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java =================================================================== --- branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java 2008-11-30 21:30:59 UTC (rev 7004) +++ branches/2.18/war/src/main/java/no/sesat/search/http/filters/SiteLocatorFilter.java 2008-12-01 13:06:26 UTC (rev 7005) @@ -32,10 +32,15 @@ import java.util.Properties; import java.util.UUID; import java.text.MessageFormat; +import java.util.Deque; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; @@ -89,7 +94,7 @@ private static final String UNKNOWN = "unknown"; private static final String USER_REQUEST_QUEUE = "userRequestQueue"; - private static final String USER_REQUEST_SEMAPHORE = "userRequestSemaphore"; + private static final String USER_REQUEST_LOCK = "userRequestSemaphore"; private static final long WAIT_TIME = 5000; private static final int REQUEST_QUEUE_SIZE = 5; @@ -397,81 +402,112 @@ // Private ------------------------------------------------------- - @SuppressWarnings("unchecked") private static void doChainFilter( final FilterChain chain, final ServletRequest request, final ServletResponse response) throws IOException, ServletException { if (request instanceof HttpServletRequest) { - final HttpSession session = ((HttpServletRequest) request).getSession(); + doChainFilter(chain, (HttpServletRequest)request, (HttpServletResponse)response); + } else { + chain.doFilter(request, response); + } + } - // fetch the user's queue - BlockingQueue<ServletRequest> queue - = (BlockingQueue<ServletRequest>) session.getAttribute(USER_REQUEST_QUEUE); + private static void doChainFilter( + final FilterChain chain, + final HttpServletRequest request, + final HttpServletResponse response) throws IOException, ServletException { - // construct queue if necessary - if (null == queue) { - queue = new ArrayBlockingQueue<ServletRequest>(REQUEST_QUEUE_SIZE); - session.setAttribute(USER_REQUEST_QUEUE, queue); - session.setAttribute(USER_REQUEST_SEMAPHORE, new Semaphore(1)); - } + final HttpSession session = request.getSession(); - // queue has a time limit. start counting. - final long start = System.currentTimeMillis(); + // fetch the user's deque + final Deque<ServletRequest> deque = getUsersDeque(session); - // attempt to join queue - if (queue.offer(request)) { + // lock to execute + final ReentrantLock lock = (ReentrantLock) session.getAttribute(USER_REQUEST_LOCK); - // a semphore is used for waiting within the queue. it lets us sleep peacefully until front of queue. - final Semaphore semaphore = (Semaphore) session.getAttribute(USER_REQUEST_SEMAPHORE); + // deque has a time limit. start counting. + long timeLeft = WAIT_TIME; - try { - while(queue.peek() != request){ + // attempt to join deque + if (deque.offerFirst(request)) { + timeLeft = tryLock(request, deque, lock, timeLeft); + } - final long timeLeft = WAIT_TIME - (System.currentTimeMillis() - start); + if(lock.isHeldByCurrentThread()){ - // let's sleep. sleeping too long results in 409 response. - if(!semaphore.tryAcquire(timeLeft, TimeUnit.MILLISECONDS)){ + try{ + // waiting is over. and we can execute + chain.doFilter(request, response); - LOG.warn(" -- response 409 (Timeout: Waited " + (WAIT_TIME - timeLeft) + " ms. )"); - ((HttpServletResponse) response).sendError(HttpServletResponse.SC_CONFLICT); - return; // response is set + }finally{ + // take out of deque first + deque.remove(request); - }else if(queue.peek() != request){ - // we've acquire the semaphore but we're not at front of queue. - // mix up. and probably not possible in the jvm? - // release the semaphore and go back to sleep - semaphore.release(); - LOG.error("acquired semaphore without being at front of queue: " + queue); - } - } + // release the lock, waiting up the next request + lock.unlock(); + } + }else{ + // we failed to execute. return 409 response. + if (response instanceof HttpServletResponse) { - // waiting is over. we can execute. - chain.doFilter(request, response); + LOG.warn(" -- response 409 " + + (0 < timeLeft + ? "(More then " + REQUEST_QUEUE_SIZE + " requests already in queue)" + : "(Timeout: Waited " + WAIT_TIME + " ms)")); - }catch(InterruptedException ie){ - LOG.error("Failed using session's semaphore", ie); + response.sendError(HttpServletResponse.SC_CONFLICT); + } + } + } - }finally { + private static Deque<ServletRequest> getUsersDeque(final HttpSession session){ - // take out of queue first so (queue.peek() != request) is true for the next request. - queue.remove(); - // release the semaphore, waiting up the next request. - semaphore.release(); - } + @SuppressWarnings("unchecked") + Deque<ServletRequest> deque = (BlockingDeque<ServletRequest>) session.getAttribute(USER_REQUEST_QUEUE); - }else{ - // the queue is too long to join. immediately return 409 response. - if (response instanceof HttpServletResponse) { - LOG.warn(" -- response 409 (More then " + REQUEST_QUEUE_SIZE + " request in queue)"); - ((HttpServletResponse) response).sendError(HttpServletResponse.SC_CONFLICT); + // construct deque if necessary + if (null == deque) { + // it may be possible for duplicates across threads to be constructed here + deque = new LinkedBlockingDeque<ServletRequest>(REQUEST_QUEUE_SIZE); + session.setAttribute(USER_REQUEST_QUEUE, deque); + session.setAttribute(USER_REQUEST_LOCK, new ReentrantLock()); + } + + return deque; + } + + private static long tryLock( + final HttpServletRequest request, + final Deque<ServletRequest> deque, + final Lock lock, + long timeLeft){ + + final long start = System.currentTimeMillis(); + + try { + do{ + timeLeft = WAIT_TIME - (System.currentTimeMillis() - start); + + // let's sleep. sleeping too long results in 409 response + if(0 >= timeLeft || !lock.tryLock(timeLeft, TimeUnit.MILLISECONDS)){ + // we timed out or got the lock. waiting is over + break; + + }else if(deque.peek() != request){ + // we've acquired the lock but we're not at front of deque + // release the lock and try again + lock.unlock(); } - } + }while(deque.peek() != request); - } else { - chain.doFilter(request, response); + + }catch(InterruptedException ie){ + LOG.error("Failed using user's lock", ie); } + + return timeLeft; } private void doBeforeProcessing(final ServletRequest request, final ServletResponse response) _______________________________________________ Kernel-commits mailing list Kernel-commits@sesat.no http://sesat.no/mailman/listinfo/kernel-commits