Author: shalin
Date: Tue Nov 25 06:19:14 2008
New Revision: 720502
URL: http://svn.apache.org/viewvc?rev=720502&view=rev
Log:
SOLR-829 -- Allow slaves to request compressed files from master during
replication
Modified:
lucene/solr/trunk/CHANGES.txt
lucene/solr/trunk/example/solr/conf/solrconfig.xml
lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java
lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java
lucene/solr/trunk/src/webapp/web/admin/replication/index.jsp
Modified: lucene/solr/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/lucene/solr/trunk/CHANGES.txt?rev=720502&r1=720501&r2=720502&view=diff
==============================================================================
--- lucene/solr/trunk/CHANGES.txt (original)
+++ lucene/solr/trunk/CHANGES.txt Tue Nov 25 06:19:14 2008
@@ -86,7 +86,10 @@
before Tokenizer/TokenFilters. (koji)
16. SOLR-868: Adding solrjs as a contrib package: contrib/javascript.
- (Matthias Epheser via ryan)
+ (Matthias Epheser via ryan)
+
+17. SOLR-829: Allow slaves to request compressed files from master during replication
+ (Simon Collins, Noble Paul, Akshay Ukey via shalin)
Optimizations
Modified: lucene/solr/trunk/example/solr/conf/solrconfig.xml
URL:
http://svn.apache.org/viewvc/lucene/solr/trunk/example/solr/conf/solrconfig.xml?rev=720502&r1=720501&r2=720502&view=diff
==============================================================================
--- lucene/solr/trunk/example/solr/conf/solrconfig.xml (original)
+++ lucene/solr/trunk/example/solr/conf/solrconfig.xml Tue Nov 25 06:19:14 2008
@@ -403,6 +403,7 @@
</lst>
</requestHandler>
+<!-- Please refer to http://wiki.apache.org/solr/SolrReplication for details on configuring replication -->
<!--Master config-->
<!--
<requestHandler name="/replication" class="solr.ReplicationHandler" >
@@ -414,9 +415,9 @@
-->
<!-- Slave config-->
<!--
-<requestHandler name="/replication" class="solr.ReplicationHandler" >
+<requestHandler name="/replication" class="solr.ReplicationHandler">
<lst name="slave">
- <str name="masterUrl">http://localhost:8983/solr/replication</str>
+ <str name="masterUrl">http://localhost:8983/solr/replication</str>
<str name="pollInterval">00:00:60</str>
</lst>
</requestHandler>
Modified: lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java
URL:
http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=720502&r1=720501&r2=720502&view=diff
==============================================================================
--- lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java Tue Nov 25 06:19:14 2008
@@ -45,6 +45,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
+import java.util.zip.DeflaterOutputStream;
/**
* <p> A Handler which provides a REST API for replication and serves
replication requests from Slaves.
@@ -126,7 +127,12 @@
} else if (command.equals(CMD_SNAP_SHOOT)) {
doSnapShoot(rsp);
} else if (command.equals(CMD_SNAP_PULL)) {
- doSnapPull();
+ new Thread() {
+ public void run() {
+ doSnapPull();
+ }
+ }.start();
+ rsp.add("status", "OK");
} else if (command.equals(CMD_DISABLE_POLL)) {
if (snapPuller != null)
snapPuller.disablePoll();
@@ -501,16 +507,11 @@
closeNoExp(inFile);
}
- HttpClient client = null;
try {
- client = new HttpClient();
- NamedList nl = snapPuller.getCommandResponse(client, CMD_DETAILS);
+ NamedList nl = snapPuller.getCommandResponse(CMD_DETAILS);
details.add("masterDetails", nl.get(CMD_DETAILS));
} catch (IOException e) {
LOG.warn("Exception while invoking a 'details' method on master ", e);
- } finally {
- if (client != null)
- client.getHttpConnectionManager().closeIdleConnections(0);
}
details.add(MASTER_URL, snapPuller.getMasterUrl());
if (snapPuller.getPollInterval() != null) {
@@ -776,15 +777,20 @@
delPolicy = core.getDeletionPolicy();
}
- public void write(OutputStream out) {
- fos = new FastOutputStream(out);
+ public void write(OutputStream out) throws IOException {
String fileName = params.get(FILE);
String cfileName = params.get(CONF_FILE_SHORT);
String sOffset = params.get(OFFSET);
String sLen = params.get(LEN);
+ String compress = params.get(COMPRESSION);
String sChecksum = params.get(CHECKSUM);
String sindexVersion = params.get(CMD_INDEX_VERSION);
if (sindexVersion != null) indexVersion = Long.parseLong(sindexVersion);
+ if (Boolean.parseBoolean(compress)) {
+ fos = new FastOutputStream(new DeflaterOutputStream(out));
+ } else {
+ fos = new FastOutputStream(out);
+ }
FileInputStream inputStream = null;
int packetsWritten = 0;
try {
@@ -918,4 +924,10 @@
public static final String RESERVE = "commitReserveDuration";
+ public static final String COMPRESSION = "compression";
+
+ public static final String EXTERNAL = "external";
+
+ public static final String INTERNAL = "internal";
+
}
Modified: lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java
URL:
http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java?rev=720502&r1=720501&r2=720502&view=diff
==============================================================================
--- lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java Tue Nov 25 06:19:14 2008
@@ -16,9 +16,7 @@
*/
package org.apache.solr.handler;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpStatus;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.*;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.lucene.index.IndexCommit;
import org.apache.solr.common.SolrException;
@@ -46,6 +44,8 @@
import java.util.regex.Pattern;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.InflaterInputStream;
/**
* <p/> Provides functionality equivalent to the snappull script as well as a
@@ -88,10 +88,15 @@
private volatile boolean stop = false;
+ private boolean useInternal = false;
+
+ private boolean useExternal = false;
+
/**
* Disable the timer task for polling
*/
private AtomicBoolean pollDisabled = new AtomicBoolean(false);
+ private final HttpClient client = new HttpClient(new
MultiThreadedHttpConnectionManager());
public SnapPuller(NamedList initArgs, ReplicationHandler handler, SolrCore
sc) {
solrCore = sc;
@@ -102,6 +107,9 @@
this.replicationHandler = handler;
pollIntervalStr = (String) initArgs.get(POLL_INTERVAL);
pollInterval = readInterval(pollIntervalStr);
+ String compress = (String) initArgs.get(COMPRESSION);
+ useInternal = INTERNAL.equals(compress);
+ useExternal = EXTERNAL.equals(compress);
if (pollInterval != null && pollInterval > 0) {
startExecutorService();
} else {
@@ -133,21 +141,21 @@
* Gets the latest commit version and generation from the master
*/
@SuppressWarnings("unchecked")
- NamedList getLatestVersion(HttpClient client) throws IOException {
+ NamedList getLatestVersion() throws IOException {
PostMethod post = new PostMethod(masterUrl);
post.addParameter(COMMAND, CMD_INDEX_VERSION);
post.addParameter("wt", "javabin");
- return getNamedListResponse(client, post);
+ return getNamedListResponse(post);
}
- NamedList getCommandResponse(HttpClient client, String cmd) throws
IOException {
+ NamedList getCommandResponse(String cmd) throws IOException {
PostMethod post = new PostMethod(masterUrl);
post.addParameter(COMMAND, cmd);
post.addParameter("wt", "javabin");
- return getNamedListResponse(client, post);
+ return getNamedListResponse(post);
}
- private NamedList getNamedListResponse(HttpClient client, PostMethod method)
throws IOException {
+ private NamedList getNamedListResponse(PostMethod method) throws IOException
{
try {
int status = client.executeMethod(method);
if (status != HttpStatus.SC_OK) {
@@ -166,12 +174,12 @@
/**
* Fetches the list of files in a given index commit point
*/
- void fetchFileList(long version, HttpClient client) throws IOException {
+ void fetchFileList(long version) throws IOException {
PostMethod post = new PostMethod(masterUrl);
post.addParameter(COMMAND, CMD_GET_FILE_LIST);
post.addParameter(CMD_INDEX_VERSION, String.valueOf(version));
post.addParameter("wt", "javabin");
- NamedList nl = getNamedListResponse(client, post);
+ NamedList nl = getNamedListResponse(post);
List<Map<String, Object>> f = (List<Map<String, Object>>)
nl.get(CMD_GET_FILE_LIST);
if (f != null)
filesToDownload = Collections.synchronizedList(f);
@@ -191,15 +199,10 @@
*/
@SuppressWarnings("unchecked")
boolean fetchLatestIndex(SolrCore core) throws IOException {
- HttpClient client = null;
replicationStartTime = System.currentTimeMillis();
try {
- client = new HttpClient();
- // The closing is done in a different thread. So use multiThreaded conn
manager
- // else it prints out a warning
- client.setHttpConnectionManager(new
MultiThreadedHttpConnectionManager());
//get the current 'replicateable' index version in the master
- NamedList response = getLatestVersion(client);
+ NamedList response = getLatestVersion();
long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
long latestGeneration = (Long) response.get(GENERATION);
if (latestVersion == 0L) {
@@ -224,7 +227,7 @@
LOG.info("Slave's version: " + commit.getVersion() + ", generation: " +
commit.getGeneration());
LOG.info("Starting replication process");
// get the list of files first
- fetchFileList(latestVersion, client);
+ fetchFileList(latestVersion);
LOG.info("Number of files in latest snapshot in master: " +
filesToDownload.size());
// use a synchronized list because the list is read by other threads (to
show details)
@@ -238,11 +241,11 @@
boolean successfulInstall = false;
try {
File indexDir = new File(core.getIndexDir());
- downloadIndexFiles(isSnapNeeded, tmpIndexDir, client, latestVersion);
+ downloadIndexFiles(isSnapNeeded, tmpIndexDir, latestVersion);
LOG.info("Total time taken for download : " +
((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs");
Collection<Map<String, Object>> modifiedConfFiles =
getModifiedConfFiles(confFilesToDownload);
if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty()) {
- downloadConfFiles(client, confFilesToDownload, latestVersion);
+ downloadConfFiles(confFilesToDownload, latestVersion);
if (isSnapNeeded) {
modifyIndexProps(tmpIndexDir.getName());
} else {
@@ -283,7 +286,6 @@
replicationStartTime = 0;
fileFetcher = null;
stop = false;
- client.getHttpConnectionManager().closeIdleConnections(0);
}
}
@@ -380,7 +382,7 @@
}.start();
}
- private void downloadConfFiles(HttpClient client, List<Map<String, Object>>
confFilesToDownload, long latestVersion) throws Exception {
+ private void downloadConfFiles(List<Map<String, Object>>
confFilesToDownload, long latestVersion) throws Exception {
LOG.info("Starting download of configuration files from master: " +
confFilesToDownload);
confFilesDownloaded = Collections.synchronizedList(new
ArrayList<Map<String, Object>>());
File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(),
"conf." + getDateAsStr(new Date()));
@@ -390,7 +392,7 @@
"Failed to create temporary config folder: " +
tmpconfDir.getName());
}
for (Map<String, Object> file : confFilesToDownload) {
- fileFetcher = new FileFetcher(tmpconfDir, file, (String) file.get(NAME),
client, true, latestVersion);
+ fileFetcher = new FileFetcher(tmpconfDir, file, (String) file.get(NAME),
true, latestVersion);
currentFile = file;
fileFetcher.fetchFile();
confFilesDownloaded.add(new HashMap<String, Object>(file));
@@ -401,18 +403,15 @@
/**
* Download the index files. If a new index is needed, download all the
files.
*
- * @param downloadCompleteIndex is it a fresh index copy
- * @param snapDir the directory to which files need to be downloadeed
to
- * @param client the httpclient instance
- * @param latestVersion the version number
+ * @param downloadCompleteIndex is it a fresh index copy
+ * @param snapDir the directory to which files need to be
downloadeed to
+ * @param latestVersion the version number
*/
- private void downloadIndexFiles(boolean downloadCompleteIndex, File snapDir,
- HttpClient client, long latestVersion)
throws Exception {
+ private void downloadIndexFiles(boolean downloadCompleteIndex, File snapDir,
long latestVersion) throws Exception {
for (Map<String, Object> file : filesToDownload) {
File localIndexFile = new File(solrCore.getIndexDir(), (String)
file.get(NAME));
if (!localIndexFile.exists() || downloadCompleteIndex) {
- fileFetcher = new FileFetcher(snapDir, file, (String) file.get(NAME),
- client, false, latestVersion);
+ fileFetcher = new FileFetcher(snapDir, file, (String) file.get(NAME),
false, latestVersion);
currentFile = file;
fileFetcher.fetchFile();
filesDownloaded.add(new HashMap<String, Object>(file));
@@ -445,7 +444,7 @@
/**
* Copy a file by the File#renameTo() method. If it fails, it is considered
* a failure
- *
+ * <p/>
* Todo may be we should try a simple copy if it fails
*/
private boolean copyAFile(File snapDir, File indexDir, String fname,
List<String> copiedfiles) {
@@ -688,6 +687,7 @@
/**
* The class acts as a client for ReplicationHandler.FileStream.
* It understands the protocol of wt=filestream
+ *
* @see org.apache.solr.handler.ReplicationHandler.FileStream
*/
private class FileFetcher {
@@ -703,8 +703,6 @@
long bytesDownloaded = 0;
- HttpClient client;
-
FileChannel fileChannel;
byte[] buf = new byte[1024 * 1024];
@@ -724,11 +722,10 @@
private Long indexVersion;
FileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
- HttpClient client, boolean isConf, long latestVersion) throws
FileNotFoundException {
+ boolean isConf, long latestVersion) throws
FileNotFoundException {
this.snapDir = dir;
this.fileName = (String) fileDetails.get(NAME);
this.size = (Long) fileDetails.get(SIZE);
- this.client = client;
this.isConf = isConf;
this.saveAs = saveAs;
indexVersion = latestVersion;
@@ -759,18 +756,7 @@
}
//if there is an error continue. But continue from the point where
it got broken
} finally {
- // closing Inputstream and HTTP connection takes a long time,
- // so replication status shows as 'replicating' even though it is
aborted.
- new Thread() {
- public void run() {
- closeNoExp(is);
- try {
- if (post != null)
- post.releaseConnection();
- } catch (Exception e) {
- }
- }
- }.start();
+ closeNoExp(is);
}
}
} finally {
@@ -870,6 +856,9 @@
fileChannel.close();
} catch (Exception e) {/* noop */
}
+ try {
+ post.releaseConnection();
+ } catch (Exception e) {}
if (bytesDownloaded != size) {
//if the download is not complete then
//delete the file being downloaded
@@ -901,6 +890,12 @@
} else {
post.addParameter(FILE, fileName);
}
+ if (useInternal) {
+ post.addParameter(COMPRESSION, "true");
+ }
+ if (useExternal) {
+ post.setRequestHeader(new Header("Accept-Encoding", "gzip,deflate"));
+ }
//use checksum
if (this.includeChecksum)
post.addParameter(CHECKSUM, "true");
@@ -914,10 +909,42 @@
client.executeMethod(post);
InputStream is = post.getResponseBodyAsStream();
//wrap it using FastInputStream
+ if (useInternal) {
+ is = new InflaterInputStream(is);
+ } else if (useExternal) {
+ is = checkCompressed(post, is);
+ }
return new FastInputStream(is);
}
}
+ /*
+ * This is copied from CommonsHttpSolrServer
+ */
+ private InputStream checkCompressed(HttpMethod method, InputStream respBody)
throws IOException {
+ Header contentEncodingHeader =
method.getResponseHeader("Content-Encoding");
+ if (contentEncodingHeader != null) {
+ String contentEncoding = contentEncodingHeader.getValue();
+ if (contentEncoding.contains("gzip")) {
+ respBody = new GZIPInputStream(respBody);
+ } else if (contentEncoding.contains("deflate")) {
+ respBody = new InflaterInputStream(respBody);
+ }
+ } else {
+ Header contentTypeHeader = method.getResponseHeader("Content-Type");
+ if (contentTypeHeader != null) {
+ String contentType = contentTypeHeader.getValue();
+ if (contentType != null) {
+ if (contentType.startsWith("application/x-gzip-compressed")) {
+ respBody = new GZIPInputStream(respBody);
+ } else if (contentType.startsWith("application/x-deflate")) {
+ respBody = new InflaterInputStream(respBody);
+ }
+ }
+ }
+ }
+ return respBody;
+ }
static Integer readInterval(String interval) {
Pattern pattern = Pattern.compile(INTERVAL_PATTERN);
@@ -954,6 +981,7 @@
public void destroy() {
if (executorService != null) executorService.shutdown();
+ client.getHttpConnectionManager().closeIdleConnections(0);
}
String getMasterUrl() {
Modified: lucene/solr/trunk/src/webapp/web/admin/replication/index.jsp
URL:
http://svn.apache.org/viewvc/lucene/solr/trunk/src/webapp/web/admin/replication/index.jsp?rev=720502&r1=720501&r2=720502&view=diff
==============================================================================
--- lucene/solr/trunk/src/webapp/web/admin/replication/index.jsp (original)
+++ lucene/solr/trunk/src/webapp/web/admin/replication/index.jsp Tue Nov 25 06:19:14 2008
@@ -311,21 +311,12 @@
String abortParam = request.getParameter("abort");
if (replicateParam != null)
if (replicateParam.equals("now")) {
- new Thread() {
- public void run() {
- executeCommand("snappull", solrcore, rh);
- }
- }.start();
+ executeCommand("snappull", solrcore, rh);
}
if (abortParam != null)
if (abortParam.equals("stop")) {
- new Thread() {
- public void run() {
- executeCommand("abortsnappull", solrcore, rh);
- }
- }.start();
+ executeCommand("abortsnappull", solrcore, rh);
}
-
%>
</td>