[ 
https://issues.apache.org/jira/browse/SOLR-1143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12749310#action_12749310
 ] 

Martijn van Groningen commented on SOLR-1143:
---------------------------------------------

I think we need to take a bird's-eye view of the partial results solution, so 
that we can hook the partial results behaviour in at the right place. This is 
quite a long comment: first I will describe how I think distributed search 
works, and then I will propose a solution. I think this solution is better than 
the current one in the patch.

From my understanding, distributed search in trunk currently works as follows:
1) When it has been determined that a search request is a multi-shard request, 
an instance of HttpCommComponent is created and the outgoing and finished lists 
are initialised. The nextStage is also set to zero.
2) The ResponseBuilder's stage is set to nextStage, and nextStage is reset to 
the "done" stage (ResponseBuilder.STAGE_DONE). The distributedProcess(...) 
method is then invoked on each search component. Each search component can add 
ShardRequests to the ResponseBuilder's outgoing list. Besides adding 
ShardRequests, a search component also returns a stage; the lowest stage 
returned by all search components becomes the next stage.
{code:java}
// call all components
for( SearchComponent c : components ) {
        // the next stage is the minimum of what all components report
        nextStage = Math.min(nextStage, c.distributedProcess(rb));
}
{code}
3a) The next step is to send all ShardRequests in the ResponseBuilder's 
outgoing list to the shards. First a ShardRequest is removed from the head of 
the outgoing list, then the actual shards are determined for that ShardRequest.
{code:java}ShardRequest sreq = rb.outgoing.remove(0);{code}
If the ShardRequest names specific shards, those are used. If it is set to 
ShardRequest.ALL_SHARDS instead, the shards of the overall search request 
(rb.shards) become the actual shards.
{code:java}
sreq.actualShards = sreq.shards;
if (sreq.actualShards==ShardRequest.ALL_SHARDS) {
        sreq.actualShards = rb.shards;
}
{code}
3b) Now that the actual shards are known, a request can be sent to each 
individual shard. The actual sending is done by the HttpCommComponent.submit(...) 
method. Before a request is sent, a new SolrParams is constructed from the 
overall search request parameters, with some parameters removed and some added. 
This SolrParams is passed to HttpCommComponent.submit(...) as an argument and is 
used to create a QueryRequest. Inside HttpCommComponent.submit(...) a Callable is 
instantiated that sends the request to a shard and receives the response 
asynchronously.

In the task's call() method the actual request (QueryRequest) that will be sent 
to a shard is created. The response is also received in this method, and if an 
exception occurs, it is set on the shard response. The Callable is then passed 
to the completionService's submit method, which returns a Future that is added 
to a set of futures named pending. From my understanding, this pending set is 
only used to keep track of how many requests were sent and to cancel requests 
when an exception occurs.
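A minimal sketch of the submit side of step 3b, written in plain Java rather than against the Solr API. SubmitSketch, SimpleShardResponse, send(...) and takeOne() are hypothetical names, and the fake transport simply refuses connections to any shard whose name starts with "down". It illustrates the pattern described above: one Callable per shard, the exception recorded on the response instead of being thrown, and every Future kept in a pending set.

{code:java}
import java.net.ConnectException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a shard response: which shard, what came back, any exception.
class SimpleShardResponse {
    final String shard;
    String body;
    Throwable exception;

    SimpleShardResponse(String shard) { this.shard = shard; }
}

// Hypothetical model of the submit side of step 3b (not Solr's HttpCommComponent).
class SubmitSketch {
    // Daemon threads so the JVM can exit without an explicit shutdown.
    final ExecutorService executor = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });
    final CompletionService<SimpleShardResponse> completionService =
            new ExecutorCompletionService<>(executor);
    // Mirrors the "pending" set: one Future per request still in flight.
    final Set<Future<SimpleShardResponse>> pending = new HashSet<>();

    // Fake transport (assumption): shards whose name starts with "down" refuse connections.
    static String send(String shard, String query) throws ConnectException {
        if (shard.startsWith("down")) {
            throw new ConnectException("Connection refused: " + shard);
        }
        return shard + " answered " + query;
    }

    void submit(String shard, String query) {
        Callable<SimpleShardResponse> task = () -> {
            SimpleShardResponse rsp = new SimpleShardResponse(shard);
            try {
                rsp.body = send(shard, query);
            } catch (Exception e) {
                rsp.exception = e; // recorded on the response instead of escaping call()
            }
            return rsp;
        };
        pending.add(completionService.submit(task));
    }

    // Blocks until any submitted task completes; wraps checked exceptions for brevity.
    SimpleShardResponse takeOne() {
        try {
            Future<SimpleShardResponse> future = completionService.take();
            pending.remove(future);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
{code}

Because call() never lets the exception escape, a refused connection shows up as an ordinary completed task whose response carries a ConnectException, which leaves the decision to abort or continue to the caller.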

4) When the requests for a stage have been sent, the next step is to receive the 
response for each shard request. comm.takeCompletedOrError() returns a shard 
response. The handler first checks whether an exception was set on the 
response; if so, the search is aborted and the exception is re-thrown. If all 
went well, the shard response's ShardRequest is added to a list of successful 
requests named finished. After that, each SearchComponent's handleResponses(...) 
method is invoked, which allows the search components to inspect the shard 
response and act on it. This is repeated until comm.takeCompletedOrError() 
returns null, which means that all responses for the current stage have been 
retrieved.

comm.takeCompletedOrError() handles each response from the shards 
individually (one per sub ShardRequest). It uses the completionService's take() 
method to get a completed Future and removes that same instance from the 
pending set. Then get() is invoked on the Future to obtain the response. If the 
response contains an exception, it is returned immediately. Otherwise it is 
added to the responses of its ShardRequest. When the number of responses in the 
ShardRequest equals the number of actual shards, the last response obtained 
from Future.get() is returned (it references the ShardRequest, which now holds 
all the responses).
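The take loop of step 4 can be sketched the same way. Req and Rsp are hypothetical, simplified stand-ins for ShardRequest and ShardResponse, TakeSketch is a made-up name, and the simulated submit(...) fails any shard whose name starts with "down"; this illustrates the behaviour described above and is not the trunk code.

{code:java}
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, simplified stand-ins for ShardRequest and ShardResponse.
class Req {
    final String[] actualShards;
    final List<Rsp> responses = new ArrayList<>();
    Req(String... actualShards) { this.actualShards = actualShards; }
}

class Rsp {
    final Req req;        // the ShardRequest this response belongs to
    final String shard;
    Throwable exception;
    Rsp(Req req, String shard) { this.req = req; this.shard = shard; }
}

class TakeSketch {
    final ExecutorService executor = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true); // let the JVM exit without an explicit shutdown
        return t;
    });
    final CompletionService<Rsp> completionService = new ExecutorCompletionService<>(executor);
    final Set<Future<Rsp>> pending = new HashSet<>();

    // Simulated submit (assumption): shards whose name starts with "down" fail.
    void submit(Req req, String shard) {
        pending.add(completionService.submit(() -> {
            Rsp rsp = new Rsp(req, shard);
            if (shard.startsWith("down")) {
                rsp.exception = new ConnectException("Connection refused: " + shard);
            }
            return rsp;
        }));
    }

    // Returns an error response immediately; otherwise returns a response only once its
    // ShardRequest has heard from every actual shard. Returns null when nothing is pending.
    Rsp takeCompletedOrError() {
        while (!pending.isEmpty()) {
            try {
                Future<Rsp> future = completionService.take();
                pending.remove(future);
                Rsp rsp = future.get();
                if (rsp.exception != null) {
                    return rsp; // the caller aborts the search
                }
                rsp.req.responses.add(rsp);
                if (rsp.req.responses.size() == rsp.req.actualShards.length) {
                    return rsp; // every shard of this ShardRequest has answered
                }
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        }
        return null;
    }
}
{code}

Responses are added to their request after take(), in the calling thread, so each ShardRequest is handed back exactly once: when its last shard answers.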

5) When all requests have been sent and all responses received, the 
finishStage(...) method is executed on each search component. This allows 
components to run custom logic that is only possible once all shard responses 
have been collected. Then it checks whether the next stage equals the done 
stage. If not, steps 2 to 5 are repeated until it does; that indicates that the 
distributed search is finished and the response can be written to the client.
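The control flow of steps 2 to 5 reduces to a loop that runs until every component reports the done stage. The sketch below is a single-threaded model under assumed names (StageLoopSketch, Component, and a String standing in for a ShardRequest); sending and receiving are collapsed into a synchronous fake so that only the stage bookkeeping remains.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, single-threaded model of the stage loop in steps 2 to 5.
// Sending (3a/3b) and receiving (4) are collapsed into a synchronous fake.
class StageLoopSketch {
    static final int STAGE_DONE = Integer.MAX_VALUE; // plays the role of the "done" stage

    interface Component {
        // Step 2: may queue shard requests; returns the stage this component needs next.
        int distributedProcess(int stage, Deque<String> outgoing);
        // Step 4: called for every shard response.
        default void handleResponses(String response) {}
        // Step 5: called once all responses of the stage are in.
        default void finishStage(int stage) {}
    }

    // Runs stages until every component reports STAGE_DONE; returns the stages executed.
    static List<Integer> run(List<Component> components) {
        List<Integer> executed = new ArrayList<>();
        int nextStage = 0; // step 1
        do {
            int stage = nextStage;
            nextStage = STAGE_DONE;
            Deque<String> outgoing = new ArrayDeque<>();
            for (Component c : components) {
                nextStage = Math.min(nextStage, c.distributedProcess(stage, outgoing));
            }
            while (!outgoing.isEmpty()) {
                String response = "response to " + outgoing.poll(); // fake round trip
                for (Component c : components) {
                    c.handleResponses(response);
                }
            }
            for (Component c : components) {
                c.finishStage(stage);
            }
            executed.add(stage);
        } while (nextStage != STAGE_DONE);
        return executed;
    }
}
{code}

For example, a component that queues one request at stage 0 and returns 10, queues another at stage 10 and returns 20, and returns STAGE_DONE at stage 20 makes run(...) execute the stages [0, 10, 20].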

In my opinion, the best way to handle shard failures is to stop sending 
requests to a shard that has failed. I think the best way to implement that is 
as follows:
1) Currently ShardRequest has a property actualShards that is a string array of 
shard host names. Let's say we create a Shard data type with a string hostname 
and a boolean failed as properties. The actualShards property would then be 
changed to an array of this Shard data type.
2) In phase 4, when we discover that a ShardRequest failed, we need to mark the 
shard as failed. Therefore take() or takeCompletedOrError() needs to store the 
shard hostname together with the exception. In handleRequestBody we then check 
whether one or more exceptions / hostnames were set, and if so we mark those 
hostnames in the ShardRequest as failed.
3) In phase 3b we only invoke HttpCommComponent.submit(...) on the shards that 
are not marked as failed. 
Something like this:
{code:java}
// Shard is the proposed data type from step 1
for (Shard shard : sreq.actualShards) {
        if (shard.hasFailed()) {
                continue;       // never contact a shard that already failed
        }
        ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
        params.remove(ShardParams.SHARDS);      // not a top-level request
        params.remove("indent");
        params.remove(CommonParams.HEADER_ECHO_PARAMS);
        params.set(ShardParams.IS_SHARD, true);  // a sub (shard) request
        String shardHandler = req.getParams().get(ShardParams.SHARDS_QT);
        if (shardHandler == null) {
                params.remove(CommonParams.QT);
        } else {
                params.set(CommonParams.QT, shardHandler);
        }
        comm.submit(sreq, shard.getHostname(), params);
}
{code}
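The Shard data type and the marking step of the proposal could look roughly like this. Shard, PartialResultsSketch, markFailed(...) and hostnamesToQuery(...) are hypothetical names for this illustration. One assumption worth stating: since each stage builds new ShardRequests, the failed flag only carries over to later stages if the same Shard instances are reused, for example when actualShards is taken from the overall request's shard list as in step 3a.

{code:java}
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical data type from step 1: replaces the String entries of actualShards.
class Shard {
    private final String hostname;
    private boolean failed;

    Shard(String hostname) { this.hostname = hostname; }
    String getHostname() { return hostname; }
    boolean hasFailed() { return failed; }
    void markFailed() { failed = true; }
}

class PartialResultsSketch {
    // Step 2: flag the shards whose hostnames were recorded with an exception in phase 4.
    static void markFailed(Shard[] actualShards, Collection<String> failedHostnames) {
        for (Shard shard : actualShards) {
            if (failedHostnames.contains(shard.getHostname())) {
                shard.markFailed();
            }
        }
    }

    // Step 3: only shards that have not failed receive a request in phase 3b.
    static List<String> hostnamesToQuery(Shard[] actualShards) {
        List<String> targets = new ArrayList<>();
        for (Shard shard : actualShards) {
            if (shard.hasFailed()) {
                continue;
            }
            targets.add(shard.getHostname());
        }
        return targets;
    }
}
{code}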

I think this approach is much more efficient than the current one, because no 
request is sent to the failed shard, so HttpClient does not try to connect to a 
shard that would not respond properly anyway. I think implementing this 
solution is not much work. What are your thoughts about this approach?


> Return partial results when a connection to a shard is refused
> --------------------------------------------------------------
>
>                 Key: SOLR-1143
>                 URL: https://issues.apache.org/jira/browse/SOLR-1143
>             Project: Solr
>          Issue Type: Improvement
>          Components: search
>            Reporter: Nicolas Dessaigne
>            Assignee: Grant Ingersoll
>             Fix For: 1.4
>
>         Attachments: SOLR-1143-2.patch, SOLR-1143-3.patch, SOLR-1143.patch
>
>
> If any shard is down in a distributed search, a ConnectException is thrown.
> Here's a little patch that changes this behaviour: if we can't connect to a 
> shard (ConnectException), we get partial results from the active shards. As 
> with the TimeOut parameter (https://issues.apache.org/jira/browse/SOLR-502), we 
> set the parameter "partialResults" to true.
> This patch also addresses a problem raised on the mailing list about a year 
> ago 
> (http://www.nabble.com/partialResults,-distributed-search---SOLR-502-td19002610.html)
> We have a use case that needs this behaviour and we would like to know your 
> thoughts about it. Should it be the default behaviour for distributed search?

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
