Author: ssmiweve
Date: 2007-08-20 19:42:59 +0200 (Mon, 20 Aug 2007)
New Revision: 5634

Modified:
   trunk/core-api/src/main/java/no/sesat/search/run/RunningQueryImpl.java
   trunk/core-api/src/test/java/no/sesat/search/mode/command/AllSearchCommandsTest.java
   trunk/core-api/src/test/java/no/sesat/search/result/test/MockupSearchCommand.java
   trunk/search-command-config-spi/src/main/java/no/sesat/search/mode/SearchMode.java
   trunk/search-command-config-spi/src/main/java/no/sesat/search/mode/config/CommandConfig.java
   trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/command/AbstractSearchCommand.java
   trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/command/SearchCommand.java
   trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/AbstractSearchCommandExecutor.java
   trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/ParallelSearchCommandExecutor.java
   trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/SearchCommandExecutor.java
   trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/SequentialSearchCommandExecutor.java
Log:
SEARCH-2147 - Kernel Security and Quality-of-Service


Modified: trunk/core-api/src/main/java/no/sesat/search/run/RunningQueryImpl.java
===================================================================
--- trunk/core-api/src/main/java/no/sesat/search/run/RunningQueryImpl.java      2007-08-20 15:55:06 UTC (rev 5633)
+++ trunk/core-api/src/main/java/no/sesat/search/run/RunningQueryImpl.java      2007-08-20 17:42:59 UTC (rev 5634)
@@ -17,7 +17,6 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -43,6 +42,7 @@
 import no.sesat.search.mode.SearchCommandFactory;
 import no.sesat.search.mode.config.SearchConfiguration;
 import no.sesat.search.mode.SearchMode;
+import no.sesat.search.mode.executor.SearchCommandExecutor;
 import no.sesat.search.mode.executor.SearchCommandExecutorFactory;
 import no.sesat.search.query.parser.AbstractQueryParserContext;
 import no.sesat.search.query.Query;
@@ -233,8 +233,7 @@
             //  Increment it onwards to SEARCH_COMMAND_CONSTRUCTION.
             dataModelFactory.assignControlLevel(datamodel, 
ControlLevel.SEARCH_COMMAND_CONSTRUCTION);
 
-            final Collection<Callable<ResultList<? extends ResultItem>>> 
commands
-                    = new ArrayList<Callable<ResultList<? extends 
ResultItem>>>();
+            final Collection<SearchCommand> commands = new 
ArrayList<SearchCommand>();
 
             final boolean isRss = parameters.get(PARAM_OUTPUT) != null 
                     && parameters.get(PARAM_OUTPUT).getString().equals("rss");
@@ -358,28 +357,23 @@
             // DataModel's ControlLevel will be SEARCH_COMMAND_CONSTRUCTION
             //  Increment it onwards to SEARCH_COMMAND_CONSTRUCTION.
             dataModelFactory.assignControlLevel(datamodel, 
ControlLevel.SEARCH_COMMAND_EXECUTION);
+            
+            Map<Future<ResultList<? extends ResultItem>>,SearchCommand> 
results = null;
 
-            final Map<Future<ResultList<? extends 
ResultItem>>,Callable<ResultList<? extends ResultItem>>> results =
-                    SearchCommandExecutorFactory
-                    .getController(context.getSearchMode().getExecutor())
-                    .invokeAll(commands, TIMEOUT);
+            try{
+                
+                final SearchCommandExecutor executor = 
SearchCommandExecutorFactory
+                    .getController(context.getSearchMode().getExecutor());  
+                
+                results = executor.waitForAll(executor.invokeAll(commands), 
TIMEOUT);
 
-            // Give the commands a chance to finish its work
-            //  Note the current time and subtract any elapsed time from the 
timeout value
-            //   (as the timeout value is intended overall and not for each).
-            final long invokedAt = System.currentTimeMillis();
-            for (Future<ResultList<? extends ResultItem>> task : 
results.keySet()) {
-                try{
-                    task.get(TIMEOUT - (System.currentTimeMillis() - 
invokedAt), TimeUnit.MILLISECONDS);
-
-                }catch(TimeoutException te){
-                    LOG.error(ERR_COMMAND_TIMEOUT + task);
-                }
+            }catch(TimeoutException te){
+                LOG.error(ERR_COMMAND_TIMEOUT + te.getMessage());
             }
 
-            // Ensure any cancellations are properly handled
-            for(Callable<ResultList<? extends ResultItem>> command : commands){
-                allCancelled &= ((SearchCommand)command).handleCancellation();
+            // Check that we have atleast one valid execution
+            for(SearchCommand command : commands){
+                allCancelled &= command.isCancelled();
             }
 
             // DataModel's ControlLevel will be SEARCH_COMMAND_CONSTRUCTION
@@ -399,8 +393,7 @@
                             if (searchResult != null) {
 
                                 // Information we need about and for the 
enrichment
-                                final SearchConfiguration config
-                                        = 
((SearchCommand)results.get(task)).getSearchConfiguration();
+                                final SearchConfiguration config = 
results.get(task).getSearchConfiguration();
 
                                 final String name = config.getName();
                                 final SearchTab.EnrichmentHint eHint

Modified: trunk/core-api/src/test/java/no/sesat/search/mode/command/AllSearchCommandsTest.java
===================================================================
--- trunk/core-api/src/test/java/no/sesat/search/mode/command/AllSearchCommandsTest.java        2007-08-20 15:55:06 UTC (rev 5633)
+++ trunk/core-api/src/test/java/no/sesat/search/mode/command/AllSearchCommandsTest.java        2007-08-20 17:42:59 UTC (rev 5634)
@@ -15,16 +15,11 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
 import no.sesat.search.mode.SearchCommandFactory;
 import no.sesat.search.mode.config.SearchConfiguration;
 import no.sesat.search.mode.executor.SearchCommandExecutorFactory;
-import no.sesat.search.result.ResultItem;
 import no.sesat.search.run.RunningQuery;
-import no.sesat.search.result.ResultList;
 import no.sesat.search.site.SiteKeyedFactoryInstantiationException;
 import no.sesat.search.site.Site;
 import no.sesat.search.site.SiteContext;
@@ -126,8 +121,8 @@
         final RunningTestQuery rq = new RunningTestQuery(rqCxt, query);
         rqCxt.getDataModel().getJunkYard().getValues().put("query", rq);
 
-        final Collection<Callable<ResultList<? extends ResultItem>>> commands 
-                = new ArrayList<Callable<ResultList<? extends ResultItem>>>();
+        final Collection<SearchCommand> commands 
+                = new ArrayList<SearchCommand>();
 
         final SearchCommandFactory.Context commandFactoryContext = new 
SearchCommandFactory.Context() {
             public Site getSite() {
@@ -154,8 +149,7 @@
         }
         try{
 
-            
SearchCommandExecutorFactory.getController(rqCxt.getSearchMode().getExecutor())
-                    .invokeAll(commands, Integer.MAX_VALUE);
+            
SearchCommandExecutorFactory.getController(rqCxt.getSearchMode().getExecutor()).invokeAll(commands);
             
         } catch (InterruptedException ex) {
             throw new AssertionError(ex);

Modified: trunk/core-api/src/test/java/no/sesat/search/result/test/MockupSearchCommand.java
===================================================================
--- trunk/core-api/src/test/java/no/sesat/search/result/test/MockupSearchCommand.java   2007-08-20 15:55:06 UTC (rev 5633)
+++ trunk/core-api/src/test/java/no/sesat/search/result/test/MockupSearchCommand.java   2007-08-20 17:42:59 UTC (rev 5634)
@@ -97,4 +97,8 @@
     public boolean handleCancellation() {
         return false;
     }
+    
+    public boolean isCancelled(){
+        return false;
+    }
 }

Modified: trunk/search-command-config-spi/src/main/java/no/sesat/search/mode/SearchMode.java
===================================================================
--- trunk/search-command-config-spi/src/main/java/no/sesat/search/mode/SearchMode.java  2007-08-20 15:55:06 UTC (rev 5633)
+++ trunk/search-command-config-spi/src/main/java/no/sesat/search/mode/SearchMode.java  2007-08-20 17:42:59 UTC (rev 5634)
@@ -41,9 +41,8 @@
         /**
          *
          */
-        @Controller("ParallelSearchCommandExecutor")
+        @Controller("ThrottledSearchCommandExecutor")
         PARALLEL;
-
         /**
          *
          */

Modified: trunk/search-command-config-spi/src/main/java/no/sesat/search/mode/config/CommandConfig.java
===================================================================
--- trunk/search-command-config-spi/src/main/java/no/sesat/search/mode/config/CommandConfig.java        2007-08-20 15:55:06 UTC (rev 5633)
+++ trunk/search-command-config-spi/src/main/java/no/sesat/search/mode/config/CommandConfig.java        2007-08-20 17:42:59 UTC (rev 5634)
@@ -208,6 +208,7 @@
     }
 
     /** {@inheritDoc} **/
+    @Override
     public String toString(){
         return getClass().getSimpleName() + " [" + name + "]";
     }

Modified: trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/command/AbstractSearchCommand.java
===================================================================
--- trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/command/AbstractSearchCommand.java      2007-08-20 15:55:06 UTC (rev 5633)
+++ trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/command/AbstractSearchCommand.java      2007-08-20 17:42:59 UTC (rev 5634)
@@ -250,7 +250,7 @@
         if (!completed) {
             LOG.error(ERR_HANDLING_CANCELLATION
                     + getSearchConfiguration().getName()
-                    + " [" + getClass().getSimpleName() + "]");
+                    + " [" + getClass().getSimpleName() + ']');
 
             if (null != thread) {
                 thread.interrupt();
@@ -260,6 +260,10 @@
         }
         return !completed;
     }
+    
+    public synchronized boolean isCancelled(){
+        return null == thread && !completed;
+    }
 
     // AbstractReflectionVisitor overrides 
----------------------------------------------
 

Modified: trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/command/SearchCommand.java
===================================================================
--- trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/command/SearchCommand.java      2007-08-20 15:55:06 UTC (rev 5633)
+++ trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/command/SearchCommand.java      2007-08-20 17:42:59 UTC (rev 5634)
@@ -43,4 +43,7 @@
      * @return if cleaning was actually performed
      **/
     boolean handleCancellation();
+    
+    boolean isCancelled();
+    
 }

Modified: trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/AbstractSearchCommandExecutor.java
===================================================================
--- trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/AbstractSearchCommandExecutor.java     2007-08-20 15:55:06 UTC (rev 5633)
+++ trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/AbstractSearchCommandExecutor.java     2007-08-20 17:42:59 UTC (rev 5634)
@@ -10,9 +10,12 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import no.sesat.search.mode.command.SearchCommand;
 import no.sesat.search.result.ResultItem;
 import no.sesat.search.result.ResultList;
 import org.apache.log4j.Logger;
@@ -46,30 +49,54 @@
     // Public --------------------------------------------------------
 
 
-    public Map<Future<ResultList<? extends ResultItem>>,Callable<ResultList<? 
extends ResultItem>>> invokeAll(
-            Collection<Callable<ResultList<? extends ResultItem>>> callables, 
-            int timeoutInMillis) throws InterruptedException  {
+    public Map<Future<ResultList<? extends ResultItem>>,SearchCommand> 
invokeAll(
+            final Collection<SearchCommand> callables) throws 
InterruptedException  {
 
         LOG.debug(DEBUG_INVOKEALL + getClass().getSimpleName());
         
-        final Map<Future<ResultList<? extends 
ResultItem>>,Callable<ResultList<? extends ResultItem>>> results 
-                = new 
HashMap<Future<ResultList<?>>,Callable<ResultList<?>>>(); 
+        final Map<Future<ResultList<? extends ResultItem>>,SearchCommand> 
results 
+                = new HashMap<Future<ResultList<?>>,SearchCommand>(); 
         
-        final ExecutorService es = getExecutorService();
-                
-        for (Callable<ResultList<?>> c : callables) {
+        for (SearchCommand c : callables) {
+            
+            final ExecutorService es = getExecutorService(c);
             results.put(es.submit(c), c);
         }
+
+        return results;
+    }
+    
+    public Map<Future<ResultList<? extends ResultItem>>,SearchCommand> 
waitForAll(
+            final Map<Future<ResultList<? extends ResultItem>>,SearchCommand> 
results,
+            final int timeoutInMillis) throws InterruptedException, 
TimeoutException, ExecutionException{
         
+        // Give the commands a chance to finish its work
+        //  Note the current time and subtract any elapsed time from the 
timeout value
+        //   (as the timeout value is intended overall and not for each).
+        final long invokedAt = System.currentTimeMillis();
+        for (Future<ResultList<? extends ResultItem>> task : results.keySet()) 
{
+
+            task.get(timeoutInMillis - (System.currentTimeMillis() - 
invokedAt), TimeUnit.MILLISECONDS);
+        }
+        
+        // Ensure any cancellations are properly handled
+        for(SearchCommand command : results.values()){
+            command.handleCancellation();
+        }
+        
         return results;
     }
 
     public void stop() {
         
         LOG.warn("Shutting down thread pool");
-        getExecutorService().shutdownNow();
+        for(ExecutorService service : getExecutorServices()){
+            service.shutdownNow();
+        }
     }
 
-    protected abstract ExecutorService getExecutorService();
+    protected abstract ExecutorService getExecutorService(SearchCommand 
searchCommand);
+    
+    protected abstract Collection<ExecutorService> getExecutorServices();
 
 }

Modified: trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/ParallelSearchCommandExecutor.java
===================================================================
--- trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/ParallelSearchCommandExecutor.java     2007-08-20 15:55:06 UTC (rev 5633)
+++ trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/ParallelSearchCommandExecutor.java     2007-08-20 17:42:59 UTC (rev 5634)
@@ -8,10 +8,12 @@
 package no.sesat.search.mode.executor;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Vector;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -19,6 +21,7 @@
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import no.sesat.search.mode.command.SearchCommand;
 import no.sesat.search.result.ResultItem;
 import no.sesat.search.result.ResultList;
 
@@ -28,7 +31,7 @@
  * @author <a href="mailto:[EMAIL PROTECTED]">Magnus Eklund</a>
  * @version <tt>$Id$</tt>
  */
-public final class ParallelSearchCommandExecutor extends 
AbstractSearchCommandExecutor {
+final class ParallelSearchCommandExecutor extends 
AbstractSearchCommandExecutor {
 
     private static final ExecutorService EXECUTOR = 
Executors.newCachedThreadPool();
             // Alternative to find memory leakages
@@ -45,39 +48,42 @@
     }
     
     @Override
-    public Map<Future<ResultList<? extends ResultItem>>,Callable<ResultList<? 
extends ResultItem>>> invokeAll(
-            Collection<Callable<ResultList<? extends ResultItem>>> callables, 
-            int timeoutInMillis) throws InterruptedException  {
+    public Map<Future<ResultList<? extends ResultItem>>,SearchCommand> 
invokeAll(
+            Collection<SearchCommand> callables) throws InterruptedException  {
 
         
-        if(LOG.isDebugEnabled() && getExecutorService() instanceof 
ThreadPoolExecutor){
-            
-            final ThreadPoolExecutor tpe = 
(ThreadPoolExecutor)getExecutorService();
-            LOG.debug(DEBUG_POOL_COUNT + tpe.getActiveCount() + '/' + 
tpe.getPoolSize());
-            
-            if(tpe instanceof 
ParallelSearchCommandExecutor.DebugThreadPoolExecutor){
-                
-                final ParallelSearchCommandExecutor.DebugThreadPoolExecutor 
dtpe 
-                        = 
(ParallelSearchCommandExecutor.DebugThreadPoolExecutor)tpe;
-                
-                LOG.debug("Still executing...");
-                
-                synchronized( dtpe.EXECUTING ){
-                    for(Runnable r : dtpe.EXECUTING){
-                        try {
-                            LOG.debug(" " + ((FutureTask)r).get());
-                            
-                        } catch (InterruptedException ex) {
-                            LOG.debug(ex);
-                        } catch (ExecutionException ex) {
-                            LOG.debug(ex);
+        if(LOG.isDebugEnabled()){
+            for(SearchCommand command : callables){
+                if( getExecutorService(command) instanceof ThreadPoolExecutor){
+
+                    final ThreadPoolExecutor tpe = 
(ThreadPoolExecutor)getExecutorService(command);
+                    LOG.debug(DEBUG_POOL_COUNT + tpe.getActiveCount() + '/' + 
tpe.getPoolSize());
+
+                    if(tpe instanceof 
ParallelSearchCommandExecutor.DebugThreadPoolExecutor){
+
+                        final 
ParallelSearchCommandExecutor.DebugThreadPoolExecutor dtpe 
+                                = 
(ParallelSearchCommandExecutor.DebugThreadPoolExecutor)tpe;
+
+                        LOG.debug("Still executing...");
+
+                        synchronized( dtpe.EXECUTING ){
+                            for(Runnable r : dtpe.EXECUTING){
+                                try {
+                                    LOG.debug(" " + ((FutureTask)r).get());
+
+                                } catch (InterruptedException ex) {
+                                    LOG.debug(ex);
+                                } catch (ExecutionException ex) {
+                                    LOG.debug(ex);
+                                }
+                            }
                         }
                     }
                 }
             }
         }
         
-        return super.invokeAll(callables, timeoutInMillis);
+        return super.invokeAll(callables);
     }    
 
     /**
@@ -85,13 +91,19 @@
      * @return 
      */
     @Override
-    protected ExecutorService getExecutorService(){
+    protected ExecutorService getExecutorService(final SearchCommand command){
         return EXECUTOR;
     }
     
+    @Override
+    protected Collection<ExecutorService> getExecutorServices(){
+        
+        return null;
+    }
+            
     static class DebugThreadPoolExecutor extends ThreadPoolExecutor{
         
-        final Vector<Runnable> EXECUTING = new Vector<Runnable>();
+        final Collection<Runnable> EXECUTING = new 
ConcurrentSkipListSet<Runnable>();
         
         DebugThreadPoolExecutor(int corePoolSize,
                               int maximumPoolSize,
@@ -102,12 +114,14 @@
             super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
workQueue);
         }
         
+        @Override
         protected void beforeExecute(final Thread t, final Runnable r) {
             super.beforeExecute(t, r);
             
             EXECUTING.add(r);
         }
 
+        @Override
         protected void afterExecute(final Runnable r, final Throwable t) {
             super.afterExecute(r, t);
             

Modified: trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/SearchCommandExecutor.java
===================================================================
--- trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/SearchCommandExecutor.java     2007-08-20 15:55:06 UTC (rev 5633)
+++ trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/SearchCommandExecutor.java     2007-08-20 17:42:59 UTC (rev 5634)
@@ -9,8 +9,10 @@
 
 import java.util.Collection;
 import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import no.sesat.search.mode.command.SearchCommand;
 import no.sesat.search.result.ResultItem;
 import no.sesat.search.result.ResultList;
 
@@ -36,8 +38,11 @@
      * @return the list of Futures holding the results.
      * @throws InterruptedException
      */
-    Map<Future<ResultList<? extends ResultItem>>,Callable<ResultList<? extends 
ResultItem>>> invokeAll(
-            Collection<Callable<ResultList<? extends ResultItem>>> callables, 
-            int timeoutInMillis) throws InterruptedException;
+    Map<Future<ResultList<? extends ResultItem>>,SearchCommand> invokeAll(
+            Collection<SearchCommand> callables) throws InterruptedException;
+    
+    Map<Future<ResultList<? extends ResultItem>>,SearchCommand> waitForAll(
+            final Map<Future<ResultList<? extends ResultItem>>,SearchCommand> 
results,
+            final int timeoutInMillis) throws InterruptedException, 
TimeoutException, ExecutionException;
 
 }

Modified: trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/SequentialSearchCommandExecutor.java
===================================================================
--- trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/SequentialSearchCommandExecutor.java   2007-08-20 15:55:06 UTC (rev 5633)
+++ trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/SequentialSearchCommandExecutor.java   2007-08-20 17:42:59 UTC (rev 5634)
@@ -8,8 +8,10 @@
  */
 package no.sesat.search.mode.executor;
 
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import no.sesat.search.mode.command.SearchCommand;
 
 /**
  * A simple SearchCommandExecutor that executes the tasks sequentially
@@ -17,11 +19,17 @@
  * @author <a href="mailto:[EMAIL PROTECTED]">Magnus Eklund</a>
  * @version <tt>$Revision$</tt>
  */
-public final class SequentialSearchCommandExecutor extends 
AbstractSearchCommandExecutor {
+final class SequentialSearchCommandExecutor extends 
AbstractSearchCommandExecutor {
 
     private static final ExecutorService EXECUTOR = 
Executors.newSingleThreadExecutor();
+    
+    public SequentialSearchCommandExecutor(){}
 
-    protected ExecutorService getExecutorService() {
+    protected ExecutorService getExecutorService(final SearchCommand command) {
         return EXECUTOR;
     }
+    
+    protected List<ExecutorService> getExecutorServices() {
+        return null;
+    }
 }

_______________________________________________
Kernel-commits mailing list
[email protected]
http://sesat.no/mailman/listinfo/kernel-commits

Reply via email to