Author: ryan
Date: Tue Jan 6 12:00:33 2009
New Revision: 732087
URL: http://svn.apache.org/viewvc?rev=732087&view=rev
Log:
SOLR-906 -- adding streaming update SolrServer
Added:
lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java
(with props)
lucene/solr/trunk/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingTest.java
Modified:
lucene/solr/trunk/CHANGES.txt
lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/request/UpdateRequest.java
Modified: lucene/solr/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/lucene/solr/trunk/CHANGES.txt?rev=732087&r1=732086&r2=732087&view=diff
==============================================================================
--- lucene/solr/trunk/CHANGES.txt (original)
+++ lucene/solr/trunk/CHANGES.txt Tue Jan 6 12:00:33 2009
@@ -132,6 +132,12 @@
For example, hl.fl=*_text will highlight all fieldnames ending with
_text. (Lars Kotthoff via yonik)
+29. SOLR-906: Adding a StreamingUpdateSolrServer that writes update commands to
+    an open HTTP connection.  If you are using solrj for bulk update requests
+    you should consider switching to this implementation.  However, note that
+    errors are not reported immediately as they are with the standard SolrServer.
+    (ryan)
+
Optimizations
----------------------
Added:
lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java
URL:
http://svn.apache.org/viewvc/lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java?rev=732087&view=auto
==============================================================================
---
lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java
(added)
+++
lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java
Tue Jan 6 12:00:33 2009
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.net.MalformedURLException;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.RequestEntity;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.common.util.NamedList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * StreamingHttpSolrServer buffers all added documents and writes them
+ * into open http connections. This class is thread safe.
+ *
+ * Although any SolrServer request can be made with this implementation,
+ * it is only recommended to use the {...@link StreamingUpdateSolrServer} with
+ * /update requests. The query interface is better suited for
+ *
+ * @version $Id: CommonsHttpSolrServer.java 724175 2008-12-07 19:07:11Z ryan $
+ * @since solr 1.4
+ */
+public class StreamingUpdateSolrServer extends CommonsHttpSolrServer
+{
+ static final Logger log = LoggerFactory.getLogger(
StreamingUpdateSolrServer.class );
+
+ final BlockingQueue<UpdateRequest> queue;
+ final ExecutorService scheduler = Executors.newCachedThreadPool();
+ final String updateUrl = "/update";
+ final Queue<Runner> runners;
+ Lock lock = null; // used to block everything
+ int threadCount = 1;
+
+ public StreamingUpdateSolrServer(String solrServerUrl, int queueSize, int
threadCount ) throws MalformedURLException {
+ super( solrServerUrl );
+ queue = new LinkedBlockingQueue<UpdateRequest>( queueSize );
+ this.threadCount = threadCount;
+ runners = new LinkedList<Runner>();
+ }
+
+ /**
+ * Opens a connection and sends everything...
+ */
+ class Runner implements Runnable {
+ final Lock lock = new ReentrantLock();
+
+ public void run() {
+ lock.lock();
+
+ log.info( "starting runner: {}" , this );
+ PostMethod method = null;
+ try {
+ RequestEntity request = new RequestEntity() {
+ // we don't know the length
+ public long getContentLength() { return -1; }
+ public String getContentType() { return ClientUtils.TEXT_XML; }
+ public boolean isRepeatable() { return false; }
+
+ public void writeRequest(OutputStream out) throws IOException {
+ try {
+ OutputStreamWriter writer = new OutputStreamWriter( out );
+ writer.append( "<stream>" ); // can be anything...
+ UpdateRequest req = queue.poll( 250, TimeUnit.MILLISECONDS );
+ while( req != null ) {
+ log.info( "sending: {}" , req );
+ req.writeXML( writer );
+
+ // check for commit or optimize
+ SolrParams params = req.getParams();
+ if( params != null ) {
+ String fmt = null;
+ if( params.getBool( UpdateParams.OPTIMIZE, false ) ) {
+ fmt = "<optimize waitSearcher=\"%s\" waitFlush=\"%s\" />";
+ }
+ else if( params.getBool( UpdateParams.COMMIT, false ) ) {
+ fmt = "<commit waitSearcher=\"%s\" waitFlush=\"%s\" />";
+ }
+ if( fmt != null ) {
+ log.info( fmt );
+ writer.write( String.format( fmt,
+ params.getBool( UpdateParams.WAIT_SEARCHER, false )+"",
+ params.getBool( UpdateParams.WAIT_FLUSH, false )+"") );
+ }
+ }
+
+ writer.flush();
+ req = queue.poll( 250, TimeUnit.MILLISECONDS );
+ }
+ writer.append( "</stream>" );
+ writer.flush();
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ method = new PostMethod(_baseURL+updateUrl );
+ method.setRequestEntity( request );
+ method.setFollowRedirects( false );
+ method.addRequestHeader( "User-Agent", AGENT );
+
+ int statusCode = getHttpClient().executeMethod(method);
+ if (statusCode != HttpStatus.SC_OK) {
+ StringBuilder msg = new StringBuilder();
+ msg.append( method.getStatusLine().getReasonPhrase() );
+ msg.append( "\n\n" );
+ msg.append( method.getStatusText() );
+ msg.append( "\n\n" );
+ msg.append( "request: "+method.getURI() );
+ handleError( new Exception( msg.toString() ) );
+ }
+ }
+ catch (Throwable e) {
+ handleError( e );
+ }
+ finally {
+ try {
+ // make sure to release the connection
+ method.releaseConnection();
+ }
+ catch( Exception ex ){}
+ if( !queue.isEmpty() ) {
+ run(); // run again, just in case
+ }
+
+ // remove it from the list of running things...
+ synchronized (runners) {
+ runners.remove( this );
+ }
+ log.info( "finished: {}" , this );
+ lock.unlock();
+ }
+ }
+ }
+
+ @Override
+ public NamedList<Object> request( final SolrRequest request ) throws
SolrServerException, IOException
+ {
+ if( !(request instanceof UpdateRequest) ) {
+ return super.request( request );
+ }
+ UpdateRequest req = (UpdateRequest)request;
+
+ // this happens for commit...
+ if( req.getDocuments()==null || req.getDocuments().isEmpty() ) {
+ blockUntilFinished();
+ return super.request( request );
+ }
+
+ SolrParams params = req.getParams();
+ if( params != null ) {
+ // check if it is waiting for the searcher
+ if( params.getBool( UpdateParams.WAIT_SEARCHER, false ) ) {
+ log.info( "blocking for commit/optimize" );
+ blockUntilFinished(); // empty the queue
+ return super.request( request );
+ }
+ }
+
+
+ if( lock != null ) {
+ lock.lock(); // keep it from adding new commands while we block
+ }
+ try {
+ queue.put( req );
+
+ if( runners.isEmpty()
+ || (queue.remainingCapacity() < queue.size()
+ && runners.size() < threadCount) )
+ {
+ synchronized( runners ) {
+ Runner r = new Runner();
+ scheduler.execute( r );
+ runners.add( r );
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ log.error( "interuped", e );
+ throw new IOException( e.getLocalizedMessage() );
+ }
+ finally {
+ if( lock != null ) {
+ lock.unlock();
+ }
+ }
+
+ // RETURN A DUMMY result
+ NamedList<Object> dummy = new NamedList<Object>();
+ dummy.add( "NOTE", "the request is processed in a background stream" );
+ return dummy;
+ }
+
+ public synchronized void blockUntilFinished()
+ {
+ if( lock == null ) {
+ lock = new ReentrantLock();
+ }
+ lock.lock();
+
+ // Wait until no runners are running
+ Runner runner = runners.peek();
+ while( runner != null ) {
+ runner.lock.lock();
+ runner.lock.unlock();
+ runner = runners.peek();
+ }
+ lock.unlock();
+ lock = null;
+ }
+
+ public void handleError( Throwable ex )
+ {
+ log.error( "error", ex );
+ }
+}
Propchange:
lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java
------------------------------------------------------------------------------
svn:mergeinfo =
Modified:
lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/request/UpdateRequest.java
URL:
http://svn.apache.org/viewvc/lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/request/UpdateRequest.java?rev=732087&r1=732086&r2=732087&view=diff
==============================================================================
---
lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/request/UpdateRequest.java
(original)
+++
lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/request/UpdateRequest.java
Tue Jan 6 12:00:33 2009
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.io.StringWriter;
+import java.io.Writer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -171,9 +172,22 @@
public Collection<ContentStream> getContentStreams() throws IOException {
return ClientUtils.toContentStreams( getXML(), ClientUtils.TEXT_XML );
}
-
+
public String getXML() throws IOException {
StringWriter writer = new StringWriter();
+ writeXML( writer );
+ writer.flush();
+
+ // If action is COMMIT or OPTIMIZE, it is sent with params
+ String xml = writer.toString();
+ //System.out.println( "SEND:"+xml );
+ return (xml.length() > 0) ? xml : null;
+ }
+
+ /**
+ * @since solr 1.4
+ */
+ public void writeXML( Writer writer ) throws IOException {
if( documents != null && documents.size() > 0 ) {
if( commitWithin > 0 ) {
writer.write("<add commitWithin=\""+commitWithin+"\">");
@@ -210,11 +224,6 @@
}
writer.append( "</delete>" );
}
-
- // If action is COMMIT or OPTIMIZE, it is sent with params
- String xml = writer.toString();
- //System.out.println( "SEND:"+xml );
- return (xml.length() > 0) ? xml : null;
}
Added:
lucene/solr/trunk/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingTest.java
URL:
http://svn.apache.org/viewvc/lucene/solr/trunk/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingTest.java?rev=732087&view=auto
==============================================================================
---
lucene/solr/trunk/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingTest.java
(added)
+++
lucene/solr/trunk/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingTest.java
Tue Jan 6 12:00:33 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.embedded;
+
+import org.apache.solr.client.solrj.SolrExampleTests;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.impl.StreamingUpdateSolrServer;
+
+
+/**
+ * Runs the shared {@link SolrExampleTests} against a Jetty-hosted Solr
+ * instance, sending updates through a {@link StreamingUpdateSolrServer}.
+ *
+ * @version $Id$
+ * @since solr 1.4
+ */
+public class SolrExampleStreamingTest extends SolrExampleTests {
+
+  SolrServer server;
+  JettySolrRunner jetty;
+
+  int port = 0;
+  static final String context = "/example";
+
+  @Override public void setUp() throws Exception
+  {
+    super.setUp();
+
+    // presumably port 0 lets Jetty choose a free port; the actual one is read back below
+    jetty = new JettySolrRunner( context, 0 );
+    jetty.start();
+    port = jetty.getLocalPort();
+    System.out.println("Assigned Port#" + port);
+    server = this.createNewSolrServer();
+  }
+
+  @Override public void tearDown() throws Exception
+  {
+    try {
+      super.tearDown();
+    }
+    finally {
+      jetty.stop();  // stop the server even if the parent tearDown fails
+    }
+  }
+
+  @Override
+  protected SolrServer getSolrServer()
+  {
+    return server;
+  }
+
+  @Override
+  protected SolrServer createNewSolrServer()
+  {
+    try {
+      // setup the server...
+      String url = "http://localhost:"+port+context;
+      // smaller queue size hits locks more often
+      CommonsHttpSolrServer s = new StreamingUpdateSolrServer( url, 2, 5 ) {
+        @Override
+        public void handleError(Throwable ex) {
+          // TODO: background streaming errors are currently ignored by this test
+        }
+      };
+      s.setConnectionTimeout(100); // 1/10th sec
+      s.setDefaultMaxConnectionsPerHost(100);
+      s.setMaxTotalConnections(100);
+      return s;
+    }
+    catch( Exception ex ) {
+      throw new RuntimeException( ex );
+    }
+  }
+}