Author: shalin
Date: Tue Nov 25 06:19:14 2008
New Revision: 720502

URL: http://svn.apache.org/viewvc?rev=720502&view=rev
Log:
SOLR-829 -- Allow slaves to request compressed files from master during replication

Modified:
    lucene/solr/trunk/CHANGES.txt
    lucene/solr/trunk/example/solr/conf/solrconfig.xml
    lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java
    lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java
    lucene/solr/trunk/src/webapp/web/admin/replication/index.jsp

Modified: lucene/solr/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/CHANGES.txt?rev=720502&r1=720501&r2=720502&view=diff
==============================================================================
--- lucene/solr/trunk/CHANGES.txt (original)
+++ lucene/solr/trunk/CHANGES.txt Tue Nov 25 06:19:14 2008
@@ -86,7 +86,10 @@
     before Tokenizer/TokenFilters. (koji)
     
 16. SOLR-868: Adding solrjs as a contrib package: contrib/javascript.
-    (Matthias Epheser via ryan)    
+    (Matthias Epheser via ryan)
+
+17. SOLR-829: Allow slaves to request compressed files from master during replication
+    (Simon Collins, Noble Paul, Akshay Ukey via shalin)
     
 
 Optimizations

Modified: lucene/solr/trunk/example/solr/conf/solrconfig.xml
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/example/solr/conf/solrconfig.xml?rev=720502&r1=720501&r2=720502&view=diff
==============================================================================
--- lucene/solr/trunk/example/solr/conf/solrconfig.xml (original)
+++ lucene/solr/trunk/example/solr/conf/solrconfig.xml Tue Nov 25 06:19:14 2008
@@ -403,6 +403,7 @@
      </lst>
   </requestHandler>
 
+<!-- Please refer to http://wiki.apache.org/solr/SolrReplication for details on configuring replication -->
 <!--Master config-->
 <!--
 <requestHandler name="/replication" class="solr.ReplicationHandler" >
@@ -414,9 +415,9 @@
 -->
 <!-- Slave config-->
 <!--
-<requestHandler name="/replication" class="solr.ReplicationHandler" >
+<requestHandler name="/replication" class="solr.ReplicationHandler">
     <lst name="slave">
-        <str name="masterUrl">http://localhost:8983/solr/replication</str>  
+        <str name="masterUrl">http://localhost:8983/solr/replication</str>
         <str name="pollInterval">00:00:60</str>  
      </lst>
 </requestHandler>

Modified: lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=720502&r1=720501&r2=720502&view=diff
==============================================================================
--- lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java Tue Nov 25 06:19:14 2008
@@ -45,6 +45,7 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
+import java.util.zip.DeflaterOutputStream;
 
 /**
  * <p> A Handler which provides a REST API for replication and serves replication requests from Slaves.
@@ -126,7 +127,12 @@
     } else if (command.equals(CMD_SNAP_SHOOT)) {
       doSnapShoot(rsp);
     } else if (command.equals(CMD_SNAP_PULL)) {
-      doSnapPull();
+      new Thread() {
+        public void run() {
+          doSnapPull();
+        }
+      }.start();
+      rsp.add("status", "OK");
     } else if (command.equals(CMD_DISABLE_POLL)) {
       if (snapPuller != null)
         snapPuller.disablePoll();
@@ -501,16 +507,11 @@
         closeNoExp(inFile);
       }
 
-      HttpClient client = null;
       try {
-        client = new HttpClient();
-        NamedList nl = snapPuller.getCommandResponse(client, CMD_DETAILS);
+        NamedList nl = snapPuller.getCommandResponse(CMD_DETAILS);
         details.add("masterDetails", nl.get(CMD_DETAILS));
       } catch (IOException e) {
         LOG.warn("Exception while invoking a 'details' method on master ", e);
-      } finally {
-        if (client != null)
-          client.getHttpConnectionManager().closeIdleConnections(0);
       }
       details.add(MASTER_URL, snapPuller.getMasterUrl());
       if (snapPuller.getPollInterval() != null) {
@@ -776,15 +777,20 @@
       delPolicy = core.getDeletionPolicy();
     }
 
-    public void write(OutputStream out) {
-      fos = new FastOutputStream(out);
+    public void write(OutputStream out) throws IOException {
       String fileName = params.get(FILE);
       String cfileName = params.get(CONF_FILE_SHORT);
       String sOffset = params.get(OFFSET);
       String sLen = params.get(LEN);
+      String compress = params.get(COMPRESSION);
       String sChecksum = params.get(CHECKSUM);
       String sindexVersion = params.get(CMD_INDEX_VERSION);
       if (sindexVersion != null) indexVersion = Long.parseLong(sindexVersion);
+      if (Boolean.parseBoolean(compress))  {
+        fos = new FastOutputStream(new DeflaterOutputStream(out));
+      } else  {
+        fos = new FastOutputStream(out);
+      }
       FileInputStream inputStream = null;
       int packetsWritten = 0;
       try {
@@ -918,4 +924,10 @@
 
   public static final String RESERVE = "commitReserveDuration";
 
+  public static final String COMPRESSION = "compression";
+
+  public static final String EXTERNAL = "external";
+
+  public static final String INTERNAL = "internal";
+
 }

Modified: lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java?rev=720502&r1=720501&r2=720502&view=diff
==============================================================================
--- lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java Tue Nov 25 06:19:14 2008
@@ -16,9 +16,7 @@
  */
 package org.apache.solr.handler;
 
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpStatus;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.*;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.solr.common.SolrException;
@@ -46,6 +44,8 @@
 import java.util.regex.Pattern;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.InflaterInputStream;
 
 /**
  * <p/> Provides functionality equivalent to the snappull script as well as a
@@ -88,10 +88,15 @@
 
   private volatile boolean stop = false;
 
+  private boolean useInternal = false;
+
+  private boolean useExternal = false;
+
   /**
    * Disable the timer task for polling
    */
   private AtomicBoolean pollDisabled = new AtomicBoolean(false);
+  private final HttpClient client = new HttpClient(new MultiThreadedHttpConnectionManager());
 
   public SnapPuller(NamedList initArgs, ReplicationHandler handler, SolrCore sc) {
     solrCore = sc;
@@ -102,6 +107,9 @@
     this.replicationHandler = handler;
     pollIntervalStr = (String) initArgs.get(POLL_INTERVAL);
     pollInterval = readInterval(pollIntervalStr);
+    String compress = (String) initArgs.get(COMPRESSION);
+    useInternal = INTERNAL.equals(compress);
+    useExternal = EXTERNAL.equals(compress);
     if (pollInterval != null && pollInterval > 0) {
       startExecutorService();
     } else {
@@ -133,21 +141,21 @@
    * Gets the latest commit version and generation from the master
    */
   @SuppressWarnings("unchecked")
-  NamedList getLatestVersion(HttpClient client) throws IOException {
+  NamedList getLatestVersion() throws IOException {
     PostMethod post = new PostMethod(masterUrl);
     post.addParameter(COMMAND, CMD_INDEX_VERSION);
     post.addParameter("wt", "javabin");
-    return getNamedListResponse(client, post);
+    return getNamedListResponse(post);
   }
 
-  NamedList getCommandResponse(HttpClient client, String cmd) throws IOException {
+  NamedList getCommandResponse(String cmd) throws IOException {
     PostMethod post = new PostMethod(masterUrl);
     post.addParameter(COMMAND, cmd);
     post.addParameter("wt", "javabin");
-    return getNamedListResponse(client, post);
+    return getNamedListResponse(post);
   }
 
-  private NamedList getNamedListResponse(HttpClient client, PostMethod method) throws IOException {
+  private NamedList getNamedListResponse(PostMethod method) throws IOException {
     try {
       int status = client.executeMethod(method);
       if (status != HttpStatus.SC_OK) {
@@ -166,12 +174,12 @@
   /**
    * Fetches the list of files in a given index commit point
    */
-  void fetchFileList(long version, HttpClient client) throws IOException {
+  void fetchFileList(long version) throws IOException {
     PostMethod post = new PostMethod(masterUrl);
     post.addParameter(COMMAND, CMD_GET_FILE_LIST);
     post.addParameter(CMD_INDEX_VERSION, String.valueOf(version));
     post.addParameter("wt", "javabin");
-    NamedList nl = getNamedListResponse(client, post);
+    NamedList nl = getNamedListResponse(post);
     List<Map<String, Object>> f = (List<Map<String, Object>>) nl.get(CMD_GET_FILE_LIST);
     if (f != null)
       filesToDownload = Collections.synchronizedList(f);
@@ -191,15 +199,10 @@
    */
   @SuppressWarnings("unchecked")
   boolean fetchLatestIndex(SolrCore core) throws IOException {
-    HttpClient client = null;
     replicationStartTime = System.currentTimeMillis();
     try {
-      client = new HttpClient();
-      // The closing is done in a different thread. So use multiThreaded conn manager
-      // else it prints out a warning
-      client.setHttpConnectionManager(new MultiThreadedHttpConnectionManager());
       //get the current 'replicateable' index version in the master
-      NamedList response = getLatestVersion(client);
+      NamedList response = getLatestVersion();
       long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
       long latestGeneration = (Long) response.get(GENERATION);
       if (latestVersion == 0L) {
@@ -224,7 +227,7 @@
       LOG.info("Slave's version: " + commit.getVersion() + ", generation: " + commit.getGeneration());
       LOG.info("Starting replication process");
       // get the list of files first
-      fetchFileList(latestVersion, client);
+      fetchFileList(latestVersion);
       LOG.info("Number of files in latest snapshot in master: " + filesToDownload.size());
 
       // use a synchronized list because the list is read by other threads (to show details)
@@ -238,11 +241,11 @@
       boolean successfulInstall = false;
       try {
         File indexDir = new File(core.getIndexDir());
-        downloadIndexFiles(isSnapNeeded, tmpIndexDir, client, latestVersion);
+        downloadIndexFiles(isSnapNeeded, tmpIndexDir, latestVersion);
         LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs");
         Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);
         if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty()) {
-          downloadConfFiles(client, confFilesToDownload, latestVersion);
+          downloadConfFiles(confFilesToDownload, latestVersion);
           if (isSnapNeeded) {
             modifyIndexProps(tmpIndexDir.getName());
           } else {
@@ -283,7 +286,6 @@
       replicationStartTime = 0;
       fileFetcher = null;
       stop = false;
-      client.getHttpConnectionManager().closeIdleConnections(0);
     }
   }
 
@@ -380,7 +382,7 @@
     }.start();
   }
 
-  private void downloadConfFiles(HttpClient client, List<Map<String, Object>> confFilesToDownload, long latestVersion) throws Exception {
+  private void downloadConfFiles(List<Map<String, Object>> confFilesToDownload, long latestVersion) throws Exception {
     LOG.info("Starting download of configuration files from master: " + confFilesToDownload);
     confFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
     File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(), "conf." + getDateAsStr(new Date()));
@@ -390,7 +392,7 @@
               "Failed to create temporary config folder: " + tmpconfDir.getName());
     }
     for (Map<String, Object> file : confFilesToDownload) {
-      fileFetcher = new FileFetcher(tmpconfDir, file, (String) file.get(NAME), client, true, latestVersion);
+      fileFetcher = new FileFetcher(tmpconfDir, file, (String) file.get(NAME), true, latestVersion);
       currentFile = file;
       fileFetcher.fetchFile();
       confFilesDownloaded.add(new HashMap<String, Object>(file));
@@ -401,18 +403,15 @@
   /**
    * Download the index files. If a new index is needed, download all the files.
    *
-   * @param downloadCompleteIndex    is it a fresh index copy
-   * @param snapDir       the directory to which files need to be downloadeed to
-   * @param client        the httpclient instance
-   * @param latestVersion the version number
+   * @param downloadCompleteIndex is it a fresh index copy
+   * @param snapDir               the directory to which files need to be downloadeed to
+   * @param latestVersion         the version number
    */
-  private void downloadIndexFiles(boolean downloadCompleteIndex, File snapDir,
-                                  HttpClient client, long latestVersion) throws Exception {
+  private void downloadIndexFiles(boolean downloadCompleteIndex, File snapDir, long latestVersion) throws Exception {
     for (Map<String, Object> file : filesToDownload) {
       File localIndexFile = new File(solrCore.getIndexDir(), (String) file.get(NAME));
       if (!localIndexFile.exists() || downloadCompleteIndex) {
-        fileFetcher = new FileFetcher(snapDir, file, (String) file.get(NAME),
-                client, false, latestVersion);
+        fileFetcher = new FileFetcher(snapDir, file, (String) file.get(NAME), false, latestVersion);
         currentFile = file;
         fileFetcher.fetchFile();
         filesDownloaded.add(new HashMap<String, Object>(file));
@@ -445,7 +444,7 @@
   /**
    * Copy a file by the File#renameTo() method. If it fails, it is considered
    * a failure
-   *
+   * <p/>
    * Todo may be we should try a simple copy if it fails
    */
   private boolean copyAFile(File snapDir, File indexDir, String fname, List<String> copiedfiles) {
@@ -688,6 +687,7 @@
   /**
    * The class acts as a client for ReplicationHandler.FileStream.
    * It understands the protocol of wt=filestream
+   *
    * @see org.apache.solr.handler.ReplicationHandler.FileStream
    */
   private class FileFetcher {
@@ -703,8 +703,6 @@
 
     long bytesDownloaded = 0;
 
-    HttpClient client;
-
     FileChannel fileChannel;
 
     byte[] buf = new byte[1024 * 1024];
@@ -724,11 +722,10 @@
     private Long indexVersion;
 
     FileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
-                HttpClient client, boolean isConf, long latestVersion) throws FileNotFoundException {
+                boolean isConf, long latestVersion) throws FileNotFoundException {
       this.snapDir = dir;
       this.fileName = (String) fileDetails.get(NAME);
       this.size = (Long) fileDetails.get(SIZE);
-      this.client = client;
       this.isConf = isConf;
       this.saveAs = saveAs;
       indexVersion = latestVersion;
@@ -759,18 +756,7 @@
             }
             //if there is an error continue. But continue from the point where it got broken
           } finally {
-            // closing Inputstream and HTTP connection takes a long time,
-            // so replication status shows as 'replicating' even though it is aborted.
-            new Thread() {
-              public void run() {
-                closeNoExp(is);
-                try {
-                  if (post != null)
-                    post.releaseConnection();
-                } catch (Exception e) {
-                }
-              }
-            }.start();
+            closeNoExp(is);
           }
         }
       } finally {
@@ -870,6 +856,9 @@
         fileChannel.close();
       } catch (Exception e) {/* noop */
       }
+      try {
+        post.releaseConnection();
+      } catch (Exception e) {}
       if (bytesDownloaded != size) {
         //if the download is not complete then
         //delete the file being downloaded
@@ -901,6 +890,12 @@
       } else {
         post.addParameter(FILE, fileName);
       }
+      if (useInternal) {
+        post.addParameter(COMPRESSION, "true");
+      }
+      if (useExternal) {
+        post.setRequestHeader(new Header("Accept-Encoding", "gzip,deflate"));
+      }
       //use checksum
       if (this.includeChecksum)
         post.addParameter(CHECKSUM, "true");
@@ -914,10 +909,42 @@
       client.executeMethod(post);
       InputStream is = post.getResponseBodyAsStream();
       //wrap it using FastInputStream
+      if (useInternal) {
+        is = new InflaterInputStream(is);
+      } else if (useExternal) {
+        is = checkCompressed(post, is);
+      }
       return new FastInputStream(is);
     }
   }
 
+  /*
+   * This is copied from CommonsHttpSolrServer
+   */
+  private InputStream checkCompressed(HttpMethod method, InputStream respBody) throws IOException {
+    Header contentEncodingHeader = method.getResponseHeader("Content-Encoding");
+    if (contentEncodingHeader != null) {
+      String contentEncoding = contentEncodingHeader.getValue();
+      if (contentEncoding.contains("gzip")) {
+        respBody = new GZIPInputStream(respBody);
+      } else if (contentEncoding.contains("deflate")) {
+        respBody = new InflaterInputStream(respBody);
+      }
+    } else {
+      Header contentTypeHeader = method.getResponseHeader("Content-Type");
+      if (contentTypeHeader != null) {
+        String contentType = contentTypeHeader.getValue();
+        if (contentType != null) {
+          if (contentType.startsWith("application/x-gzip-compressed")) {
+            respBody = new GZIPInputStream(respBody);
+          } else if (contentType.startsWith("application/x-deflate")) {
+            respBody = new InflaterInputStream(respBody);
+          }
+        }
+      }
+    }
+    return respBody;
+  }
 
   static Integer readInterval(String interval) {
     Pattern pattern = Pattern.compile(INTERVAL_PATTERN);
@@ -954,6 +981,7 @@
 
   public void destroy() {
     if (executorService != null) executorService.shutdown();
+    client.getHttpConnectionManager().closeIdleConnections(0);
   }
 
   String getMasterUrl() {

Modified: lucene/solr/trunk/src/webapp/web/admin/replication/index.jsp
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/src/webapp/web/admin/replication/index.jsp?rev=720502&r1=720501&r2=720502&view=diff
==============================================================================
--- lucene/solr/trunk/src/webapp/web/admin/replication/index.jsp (original)
+++ lucene/solr/trunk/src/webapp/web/admin/replication/index.jsp Tue Nov 25 06:19:14 2008
@@ -311,21 +311,12 @@
       String abortParam = request.getParameter("abort");
       if (replicateParam != null)
         if (replicateParam.equals("now")) {
-          new Thread() {
-            public void run() {
-              executeCommand("snappull", solrcore, rh);
-            }
-          }.start();
+          executeCommand("snappull", solrcore, rh);
         }
       if (abortParam != null)
         if (abortParam.equals("stop")) {
-          new Thread() {
-            public void run() {
-              executeCommand("abortsnappull", solrcore, rh);
-            }
-          }.start();
+          executeCommand("abortsnappull", solrcore, rh);
         }
-
     %>
   </td>
 


Reply via email to