Author: ssmiweve
Date: 2007-08-20 19:42:59 +0200 (Mon, 20 Aug 2007)
New Revision: 5634
Modified:
trunk/core-api/src/main/java/no/sesat/search/run/RunningQueryImpl.java
trunk/core-api/src/test/java/no/sesat/search/mode/command/AllSearchCommandsTest.java
trunk/core-api/src/test/java/no/sesat/search/result/test/MockupSearchCommand.java
trunk/search-command-config-spi/src/main/java/no/sesat/search/mode/SearchMode.java
trunk/search-command-config-spi/src/main/java/no/sesat/search/mode/config/CommandConfig.java
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/command/AbstractSearchCommand.java
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/command/SearchCommand.java
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/AbstractSearchCommandExecutor.java
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/ParallelSearchCommandExecutor.java
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/SearchCommandExecutor.java
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/SequentialSearchCommandExecutor.java
Log:
SEARCH-2147 - Kernel Security and Quality-of-Service
Modified: trunk/core-api/src/main/java/no/sesat/search/run/RunningQueryImpl.java
===================================================================
--- trunk/core-api/src/main/java/no/sesat/search/run/RunningQueryImpl.java
2007-08-20 15:55:06 UTC (rev 5633)
+++ trunk/core-api/src/main/java/no/sesat/search/run/RunningQueryImpl.java
2007-08-20 17:42:59 UTC (rev 5634)
@@ -17,7 +17,6 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -43,6 +42,7 @@
import no.sesat.search.mode.SearchCommandFactory;
import no.sesat.search.mode.config.SearchConfiguration;
import no.sesat.search.mode.SearchMode;
+import no.sesat.search.mode.executor.SearchCommandExecutor;
import no.sesat.search.mode.executor.SearchCommandExecutorFactory;
import no.sesat.search.query.parser.AbstractQueryParserContext;
import no.sesat.search.query.Query;
@@ -233,8 +233,7 @@
// Increment it onwards to SEARCH_COMMAND_CONSTRUCTION.
dataModelFactory.assignControlLevel(datamodel,
ControlLevel.SEARCH_COMMAND_CONSTRUCTION);
- final Collection<Callable<ResultList<? extends ResultItem>>>
commands
- = new ArrayList<Callable<ResultList<? extends
ResultItem>>>();
+ final Collection<SearchCommand> commands = new
ArrayList<SearchCommand>();
final boolean isRss = parameters.get(PARAM_OUTPUT) != null
&& parameters.get(PARAM_OUTPUT).getString().equals("rss");
@@ -358,28 +357,23 @@
// DataModel's ControlLevel will be SEARCH_COMMAND_CONSTRUCTION
// Increment it onwards to SEARCH_COMMAND_CONSTRUCTION.
dataModelFactory.assignControlLevel(datamodel,
ControlLevel.SEARCH_COMMAND_EXECUTION);
+
+ Map<Future<ResultList<? extends ResultItem>>,SearchCommand>
results = null;
- final Map<Future<ResultList<? extends
ResultItem>>,Callable<ResultList<? extends ResultItem>>> results =
- SearchCommandExecutorFactory
- .getController(context.getSearchMode().getExecutor())
- .invokeAll(commands, TIMEOUT);
+ try{
+
+ final SearchCommandExecutor executor =
SearchCommandExecutorFactory
+ .getController(context.getSearchMode().getExecutor());
+
+ results = executor.waitForAll(executor.invokeAll(commands),
TIMEOUT);
- // Give the commands a chance to finish its work
- // Note the current time and subtract any elapsed time from the
timeout value
- // (as the timeout value is intended overall and not for each).
- final long invokedAt = System.currentTimeMillis();
- for (Future<ResultList<? extends ResultItem>> task :
results.keySet()) {
- try{
- task.get(TIMEOUT - (System.currentTimeMillis() -
invokedAt), TimeUnit.MILLISECONDS);
-
- }catch(TimeoutException te){
- LOG.error(ERR_COMMAND_TIMEOUT + task);
- }
+ }catch(TimeoutException te){
+ LOG.error(ERR_COMMAND_TIMEOUT + te.getMessage());
}
- // Ensure any cancellations are properly handled
- for(Callable<ResultList<? extends ResultItem>> command : commands){
- allCancelled &= ((SearchCommand)command).handleCancellation();
+ // Check that we have at least one valid execution
+ for(SearchCommand command : commands){
+ allCancelled &= command.isCancelled();
}
// DataModel's ControlLevel will be SEARCH_COMMAND_CONSTRUCTION
@@ -399,8 +393,7 @@
if (searchResult != null) {
// Information we need about and for the
enrichment
- final SearchConfiguration config
- =
((SearchCommand)results.get(task)).getSearchConfiguration();
+ final SearchConfiguration config =
results.get(task).getSearchConfiguration();
final String name = config.getName();
final SearchTab.EnrichmentHint eHint
Modified:
trunk/core-api/src/test/java/no/sesat/search/mode/command/AllSearchCommandsTest.java
===================================================================
---
trunk/core-api/src/test/java/no/sesat/search/mode/command/AllSearchCommandsTest.java
2007-08-20 15:55:06 UTC (rev 5633)
+++
trunk/core-api/src/test/java/no/sesat/search/mode/command/AllSearchCommandsTest.java
2007-08-20 17:42:59 UTC (rev 5634)
@@ -15,16 +15,11 @@
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
import no.sesat.search.mode.SearchCommandFactory;
import no.sesat.search.mode.config.SearchConfiguration;
import no.sesat.search.mode.executor.SearchCommandExecutorFactory;
-import no.sesat.search.result.ResultItem;
import no.sesat.search.run.RunningQuery;
-import no.sesat.search.result.ResultList;
import no.sesat.search.site.SiteKeyedFactoryInstantiationException;
import no.sesat.search.site.Site;
import no.sesat.search.site.SiteContext;
@@ -126,8 +121,8 @@
final RunningTestQuery rq = new RunningTestQuery(rqCxt, query);
rqCxt.getDataModel().getJunkYard().getValues().put("query", rq);
- final Collection<Callable<ResultList<? extends ResultItem>>> commands
- = new ArrayList<Callable<ResultList<? extends ResultItem>>>();
+ final Collection<SearchCommand> commands
+ = new ArrayList<SearchCommand>();
final SearchCommandFactory.Context commandFactoryContext = new
SearchCommandFactory.Context() {
public Site getSite() {
@@ -154,8 +149,7 @@
}
try{
-
SearchCommandExecutorFactory.getController(rqCxt.getSearchMode().getExecutor())
- .invokeAll(commands, Integer.MAX_VALUE);
+
SearchCommandExecutorFactory.getController(rqCxt.getSearchMode().getExecutor()).invokeAll(commands);
} catch (InterruptedException ex) {
throw new AssertionError(ex);
Modified:
trunk/core-api/src/test/java/no/sesat/search/result/test/MockupSearchCommand.java
===================================================================
---
trunk/core-api/src/test/java/no/sesat/search/result/test/MockupSearchCommand.java
2007-08-20 15:55:06 UTC (rev 5633)
+++
trunk/core-api/src/test/java/no/sesat/search/result/test/MockupSearchCommand.java
2007-08-20 17:42:59 UTC (rev 5634)
@@ -97,4 +97,8 @@
public boolean handleCancellation() {
return false;
}
+
+ public boolean isCancelled(){
+ return false;
+ }
}
Modified:
trunk/search-command-config-spi/src/main/java/no/sesat/search/mode/SearchMode.java
===================================================================
---
trunk/search-command-config-spi/src/main/java/no/sesat/search/mode/SearchMode.java
2007-08-20 15:55:06 UTC (rev 5633)
+++
trunk/search-command-config-spi/src/main/java/no/sesat/search/mode/SearchMode.java
2007-08-20 17:42:59 UTC (rev 5634)
@@ -41,9 +41,8 @@
/**
*
*/
- @Controller("ParallelSearchCommandExecutor")
+ @Controller("ThrottledSearchCommandExecutor")
PARALLEL;
-
/**
*
*/
Modified:
trunk/search-command-config-spi/src/main/java/no/sesat/search/mode/config/CommandConfig.java
===================================================================
---
trunk/search-command-config-spi/src/main/java/no/sesat/search/mode/config/CommandConfig.java
2007-08-20 15:55:06 UTC (rev 5633)
+++
trunk/search-command-config-spi/src/main/java/no/sesat/search/mode/config/CommandConfig.java
2007-08-20 17:42:59 UTC (rev 5634)
@@ -208,6 +208,7 @@
}
/** {@inheritDoc} **/
+ @Override
public String toString(){
return getClass().getSimpleName() + " [" + name + "]";
}
Modified:
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/command/AbstractSearchCommand.java
===================================================================
---
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/command/AbstractSearchCommand.java
2007-08-20 15:55:06 UTC (rev 5633)
+++
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/command/AbstractSearchCommand.java
2007-08-20 17:42:59 UTC (rev 5634)
@@ -250,7 +250,7 @@
if (!completed) {
LOG.error(ERR_HANDLING_CANCELLATION
+ getSearchConfiguration().getName()
- + " [" + getClass().getSimpleName() + "]");
+ + " [" + getClass().getSimpleName() + ']');
if (null != thread) {
thread.interrupt();
@@ -260,6 +260,10 @@
}
return !completed;
}
+
+ public synchronized boolean isCancelled(){
+ return null == thread && !completed;
+ }
// AbstractReflectionVisitor overrides
----------------------------------------------
Modified:
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/command/SearchCommand.java
===================================================================
---
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/command/SearchCommand.java
2007-08-20 15:55:06 UTC (rev 5633)
+++
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/command/SearchCommand.java
2007-08-20 17:42:59 UTC (rev 5634)
@@ -43,4 +43,7 @@
* @return if cleaning was actually performed
**/
boolean handleCancellation();
+
+ boolean isCancelled();
+
}
Modified:
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/AbstractSearchCommandExecutor.java
===================================================================
---
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/AbstractSearchCommandExecutor.java
2007-08-20 15:55:06 UTC (rev 5633)
+++
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/AbstractSearchCommandExecutor.java
2007-08-20 17:42:59 UTC (rev 5634)
@@ -10,9 +10,12 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import no.sesat.search.mode.command.SearchCommand;
import no.sesat.search.result.ResultItem;
import no.sesat.search.result.ResultList;
import org.apache.log4j.Logger;
@@ -46,30 +49,54 @@
// Public --------------------------------------------------------
- public Map<Future<ResultList<? extends ResultItem>>,Callable<ResultList<?
extends ResultItem>>> invokeAll(
- Collection<Callable<ResultList<? extends ResultItem>>> callables,
- int timeoutInMillis) throws InterruptedException {
+ public Map<Future<ResultList<? extends ResultItem>>,SearchCommand>
invokeAll(
+ final Collection<SearchCommand> callables) throws
InterruptedException {
LOG.debug(DEBUG_INVOKEALL + getClass().getSimpleName());
- final Map<Future<ResultList<? extends
ResultItem>>,Callable<ResultList<? extends ResultItem>>> results
- = new
HashMap<Future<ResultList<?>>,Callable<ResultList<?>>>();
+ final Map<Future<ResultList<? extends ResultItem>>,SearchCommand>
results
+ = new HashMap<Future<ResultList<?>>,SearchCommand>();
- final ExecutorService es = getExecutorService();
-
- for (Callable<ResultList<?>> c : callables) {
+ for (SearchCommand c : callables) {
+
+ final ExecutorService es = getExecutorService(c);
results.put(es.submit(c), c);
}
+
+ return results;
+ }
+
+ public Map<Future<ResultList<? extends ResultItem>>,SearchCommand>
waitForAll(
+ final Map<Future<ResultList<? extends ResultItem>>,SearchCommand>
results,
+ final int timeoutInMillis) throws InterruptedException,
TimeoutException, ExecutionException{
+ // Give the commands a chance to finish their work
+ // Note the current time and subtract any elapsed time from the
timeout value
+ // (as the timeout value is intended overall and not for each).
+ final long invokedAt = System.currentTimeMillis();
+ for (Future<ResultList<? extends ResultItem>> task : results.keySet())
{
+
+ task.get(timeoutInMillis - (System.currentTimeMillis() -
invokedAt), TimeUnit.MILLISECONDS);
+ }
+
+ // Ensure any cancellations are properly handled
+ for(SearchCommand command : results.values()){
+ command.handleCancellation();
+ }
+
return results;
}
public void stop() {
LOG.warn("Shutting down thread pool");
- getExecutorService().shutdownNow();
+ for(ExecutorService service : getExecutorServices()){
+ service.shutdownNow();
+ }
}
- protected abstract ExecutorService getExecutorService();
+ protected abstract ExecutorService getExecutorService(SearchCommand
searchCommand);
+
+ protected abstract Collection<ExecutorService> getExecutorServices();
}
Modified:
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/ParallelSearchCommandExecutor.java
===================================================================
---
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/ParallelSearchCommandExecutor.java
2007-08-20 15:55:06 UTC (rev 5633)
+++
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/ParallelSearchCommandExecutor.java
2007-08-20 17:42:59 UTC (rev 5634)
@@ -8,10 +8,12 @@
package no.sesat.search.mode.executor;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -19,6 +21,7 @@
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import no.sesat.search.mode.command.SearchCommand;
import no.sesat.search.result.ResultItem;
import no.sesat.search.result.ResultList;
@@ -28,7 +31,7 @@
* @author <a href="mailto:[EMAIL PROTECTED]">Magnus Eklund</a>
* @version <tt>$Id$</tt>
*/
-public final class ParallelSearchCommandExecutor extends
AbstractSearchCommandExecutor {
+final class ParallelSearchCommandExecutor extends
AbstractSearchCommandExecutor {
private static final ExecutorService EXECUTOR =
Executors.newCachedThreadPool();
// Alternative to find memory leakages
@@ -45,39 +48,42 @@
}
@Override
- public Map<Future<ResultList<? extends ResultItem>>,Callable<ResultList<?
extends ResultItem>>> invokeAll(
- Collection<Callable<ResultList<? extends ResultItem>>> callables,
- int timeoutInMillis) throws InterruptedException {
+ public Map<Future<ResultList<? extends ResultItem>>,SearchCommand>
invokeAll(
+ Collection<SearchCommand> callables) throws InterruptedException {
- if(LOG.isDebugEnabled() && getExecutorService() instanceof
ThreadPoolExecutor){
-
- final ThreadPoolExecutor tpe =
(ThreadPoolExecutor)getExecutorService();
- LOG.debug(DEBUG_POOL_COUNT + tpe.getActiveCount() + '/' +
tpe.getPoolSize());
-
- if(tpe instanceof
ParallelSearchCommandExecutor.DebugThreadPoolExecutor){
-
- final ParallelSearchCommandExecutor.DebugThreadPoolExecutor
dtpe
- =
(ParallelSearchCommandExecutor.DebugThreadPoolExecutor)tpe;
-
- LOG.debug("Still executing...");
-
- synchronized( dtpe.EXECUTING ){
- for(Runnable r : dtpe.EXECUTING){
- try {
- LOG.debug(" " + ((FutureTask)r).get());
-
- } catch (InterruptedException ex) {
- LOG.debug(ex);
- } catch (ExecutionException ex) {
- LOG.debug(ex);
+ if(LOG.isDebugEnabled()){
+ for(SearchCommand command : callables){
+ if( getExecutorService(command) instanceof ThreadPoolExecutor){
+
+ final ThreadPoolExecutor tpe =
(ThreadPoolExecutor)getExecutorService(command);
+ LOG.debug(DEBUG_POOL_COUNT + tpe.getActiveCount() + '/' +
tpe.getPoolSize());
+
+ if(tpe instanceof
ParallelSearchCommandExecutor.DebugThreadPoolExecutor){
+
+ final
ParallelSearchCommandExecutor.DebugThreadPoolExecutor dtpe
+ =
(ParallelSearchCommandExecutor.DebugThreadPoolExecutor)tpe;
+
+ LOG.debug("Still executing...");
+
+ synchronized( dtpe.EXECUTING ){
+ for(Runnable r : dtpe.EXECUTING){
+ try {
+ LOG.debug(" " + ((FutureTask)r).get());
+
+ } catch (InterruptedException ex) {
+ LOG.debug(ex);
+ } catch (ExecutionException ex) {
+ LOG.debug(ex);
+ }
+ }
}
}
}
}
}
- return super.invokeAll(callables, timeoutInMillis);
+ return super.invokeAll(callables);
}
/**
@@ -85,13 +91,19 @@
* @return
*/
@Override
- protected ExecutorService getExecutorService(){
+ protected ExecutorService getExecutorService(final SearchCommand command){
return EXECUTOR;
}
+ @Override
+ protected Collection<ExecutorService> getExecutorServices(){
+
+ return null;
+ }
+
static class DebugThreadPoolExecutor extends ThreadPoolExecutor{
- final Vector<Runnable> EXECUTING = new Vector<Runnable>();
+ final Collection<Runnable> EXECUTING = new
ConcurrentSkipListSet<Runnable>();
DebugThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
@@ -102,12 +114,14 @@
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue);
}
+ @Override
protected void beforeExecute(final Thread t, final Runnable r) {
super.beforeExecute(t, r);
EXECUTING.add(r);
}
+ @Override
protected void afterExecute(final Runnable r, final Throwable t) {
super.afterExecute(r, t);
Modified:
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/SearchCommandExecutor.java
===================================================================
---
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/SearchCommandExecutor.java
2007-08-20 15:55:06 UTC (rev 5633)
+++
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/SearchCommandExecutor.java
2007-08-20 17:42:59 UTC (rev 5634)
@@ -9,8 +9,10 @@
import java.util.Collection;
import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import no.sesat.search.mode.command.SearchCommand;
import no.sesat.search.result.ResultItem;
import no.sesat.search.result.ResultList;
@@ -36,8 +38,11 @@
* @return the list of Futures holding the results.
* @throws InterruptedException
*/
- Map<Future<ResultList<? extends ResultItem>>,Callable<ResultList<? extends
ResultItem>>> invokeAll(
- Collection<Callable<ResultList<? extends ResultItem>>> callables,
- int timeoutInMillis) throws InterruptedException;
+ Map<Future<ResultList<? extends ResultItem>>,SearchCommand> invokeAll(
+ Collection<SearchCommand> callables) throws InterruptedException;
+
+ Map<Future<ResultList<? extends ResultItem>>,SearchCommand> waitForAll(
+ final Map<Future<ResultList<? extends ResultItem>>,SearchCommand>
results,
+ final int timeoutInMillis) throws InterruptedException,
TimeoutException, ExecutionException;
}
Modified:
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/SequentialSearchCommandExecutor.java
===================================================================
---
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/SequentialSearchCommandExecutor.java
2007-08-20 15:55:06 UTC (rev 5633)
+++
trunk/search-command-control-spi/src/main/java/no/sesat/search/mode/executor/SequentialSearchCommandExecutor.java
2007-08-20 17:42:59 UTC (rev 5634)
@@ -8,8 +8,10 @@
*/
package no.sesat.search.mode.executor;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import no.sesat.search.mode.command.SearchCommand;
/**
* A simple SearchCommandExecutor that executes the tasks sequentially
@@ -17,11 +19,17 @@
* @author <a href="mailto:[EMAIL PROTECTED]">Magnus Eklund</a>
* @version <tt>$Revision$</tt>
*/
-public final class SequentialSearchCommandExecutor extends
AbstractSearchCommandExecutor {
+final class SequentialSearchCommandExecutor extends
AbstractSearchCommandExecutor {
private static final ExecutorService EXECUTOR =
Executors.newSingleThreadExecutor();
+
+ public SequentialSearchCommandExecutor(){}
- protected ExecutorService getExecutorService() {
+ protected ExecutorService getExecutorService(final SearchCommand command) {
return EXECUTOR;
}
+
+ protected List<ExecutorService> getExecutorServices() {
+ return null;
+ }
}
_______________________________________________
Kernel-commits mailing list
[email protected]
http://sesat.no/mailman/listinfo/kernel-commits