Author: yonik
Date: Fri Oct 9 21:52:42 2009
New Revision: 823711
URL: http://svn.apache.org/viewvc?rev=823711&view=rev
Log:
SOLR-1475: Java-based replication doesn't properly reserve its commit point
during backups
Modified:
lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java
lucene/solr/trunk/src/java/org/apache/solr/handler/SnapShooter.java
lucene/solr/trunk/src/test/org/apache/solr/handler/TestReplicationHandler.java
Modified:
lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
URL:
http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java?rev=823711&r1=823710&r2=823711&view=diff
==============================================================================
--- lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java (original)
+++ lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java Fri Oct 9 21:52:42 2009
@@ -110,7 +110,7 @@
}
/** Release a previously saved commit point */
- public synchronized void releaseCommmitPoint(Long indexCommitVersion) {
+ public synchronized void releaseCommitPoint(Long indexCommitVersion) {
AtomicInteger reserveCount = savedCommits.get(indexCommitVersion);
if (reserveCount == null) return;// this should not happen
if (reserveCount.decrementAndGet() <= 0) {
Modified:
lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java
URL:
http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=823711&r1=823710&r2=823711&view=diff
==============================================================================
--- lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java Fri Oct 9 21:52:42 2009
@@ -65,7 +65,7 @@
*/
public class ReplicationHandler extends RequestHandlerBase implements
SolrCoreAware {
private static final Logger LOG =
LoggerFactory.getLogger(ReplicationHandler.class.getName());
- private SolrCore core;
+ SolrCore core;
private SnapPuller snapPuller;
@@ -276,12 +276,15 @@
private void doSnapShoot(SolrParams params, SolrQueryResponse rsp,
SolrQueryRequest req) {
try {
- IndexCommit indexCommit = core.getDeletionPolicy().getLatestCommit();
+ IndexDeletionPolicyWrapper delPolicy = core.getDeletionPolicy();
+ IndexCommit indexCommit = delPolicy.getLatestCommit();
+ // race?
+ delPolicy.setReserveDuration(indexCommit.getVersion(),
reserveCommitDuration);
if(indexCommit == null) {
indexCommit = req.getSearcher().getReader().getIndexCommit();
}
if (indexCommit != null) {
- new SnapShooter(core,
params.get("location")).createSnapAsync(indexCommit.getFileNames(), this);
+ new SnapShooter(core,
params.get("location")).createSnapAsync(indexCommit, this);
}
} catch (Exception e) {
LOG.warn("Exception during creating a snapshot", e);
@@ -687,10 +690,13 @@
details.add("master", master);
if (isSlave && showSlaveDetails)
details.add("slave", slave);
- NamedList snapshotStats = snapShootDetails;
- if (snapshotStats != null)
- details.add(CMD_BACKUP, snapshotStats);
+
}
+
+ NamedList snapshotStats = snapShootDetails;
+ if (snapshotStats != null)
+ details.add(CMD_BACKUP, snapshotStats);
+
return details;
}
@@ -915,13 +921,13 @@
indexCommitPoint = core.getDeletionPolicy().getLatestCommit();
core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getVersion());
if(oldCommitPoint != null){
-
core.getDeletionPolicy().releaseCommmitPoint(oldCommitPoint.getVersion());
+
core.getDeletionPolicy().releaseCommitPoint(oldCommitPoint.getVersion());
}
}
if (snapshoot) {
try {
SnapShooter snapShooter = new SnapShooter(core, null);
-
snapShooter.createSnapAsync(core.getDeletionPolicy().getLatestCommit().getFileNames(),
ReplicationHandler.this);
+
snapShooter.createSnapAsync(core.getDeletionPolicy().getLatestCommit(),
ReplicationHandler.this);
} catch (Exception e) {
LOG.error("Exception while snapshooting", e);
}
Modified: lucene/solr/trunk/src/java/org/apache/solr/handler/SnapShooter.java
URL:
http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/handler/SnapShooter.java?rev=823711&r1=823710&r2=823711&view=diff
==============================================================================
--- lucene/solr/trunk/src/java/org/apache/solr/handler/SnapShooter.java (original)
+++ lucene/solr/trunk/src/java/org/apache/solr/handler/SnapShooter.java Fri Oct 9 21:52:42 2009
@@ -16,20 +16,24 @@
*/
package org.apache.solr.handler;
-import org.apache.commons.io.FileUtils;
-import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.SimpleFSLockFactory;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.common.util.NamedList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
-import java.util.ArrayList;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.SimpleFSLockFactory;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.IndexDeletionPolicyWrapper;
+import org.apache.solr.core.SolrCore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* <p/> Provides functionality equivalent to the snapshooter script </p>
@@ -42,7 +46,7 @@
private String snapDir = null;
private SolrCore solrCore;
private SimpleFSLockFactory lockFactory;
-
+
public SnapShooter(SolrCore core, String location) throws IOException {
solrCore = core;
if (location == null) snapDir = core.getDataDir();
@@ -55,15 +59,17 @@
lockFactory = new SimpleFSLockFactory(snapDir);
}
- void createSnapAsync(final Collection<String> files, final
ReplicationHandler replicationHandler) {
+ void createSnapAsync(final IndexCommit indexCommit, final ReplicationHandler
replicationHandler) {
+
replicationHandler.core.getDeletionPolicy().saveCommitPoint(indexCommit.getVersion());
+
new Thread() {
public void run() {
- createSnapshot(files, replicationHandler);
+ createSnapshot(indexCommit, replicationHandler);
}
}.start();
}
- void createSnapshot(Collection<String> files, ReplicationHandler
replicationHandler) {
+ void createSnapshot(final IndexCommit indexCommit, ReplicationHandler
replicationHandler) {
NamedList details = new NamedList();
details.add("startTime", new Date().toString());
File snapShotDir = null;
@@ -79,9 +85,10 @@
LOG.warn("Unable to create snapshot directory: " +
snapShotDir.getAbsolutePath());
return;
}
- for (String indexFile : files) {
- FileUtils.copyFileToDirectory(new File(solrCore.getIndexDir(),
indexFile), snapShotDir, true);
- }
+ Collection<String> files = indexCommit.getFileNames();
+ FileCopier fileCopier = new FileCopier(solrCore.getDeletionPolicy(),
indexCommit);
+ fileCopier.copyFiles(files, snapShotDir);
+
details.add("fileCount", files.size());
details.add("status", "success");
details.add("snapshotCompletedAt", new Date().toString());
@@ -90,7 +97,8 @@
LOG.error("Exception while creating snapshot", e);
details.add("snapShootException", e.getMessage());
} finally {
- replicationHandler.snapShootDetails = details;
+
replicationHandler.core.getDeletionPolicy().releaseCommitPoint(indexCommit.getVersion());
+ replicationHandler.snapShootDetails = details;
if (lock != null) {
try {
lock.release();
@@ -103,4 +111,89 @@
public static final String SNAP_DIR = "snapDir";
public static final String DATE_FMT = "yyyyMMddhhmmss";
+
+
+ private class FileCopier {
+ private static final int DEFAULT_BUFFER_SIZE = 32768;
+ private byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+ private IndexCommit indexCommit;
+ private IndexDeletionPolicyWrapper delPolicy;
+ private int reserveTime;
+
+ public FileCopier(IndexDeletionPolicyWrapper delPolicy, IndexCommit
commit) {
+ this.delPolicy = delPolicy;
+ this.indexCommit = commit;
+ this.reserveTime = reserveTime;
+ }
+
+ public void copyFiles(Collection<String> files, File destDir) throws
IOException {
+ for (String indexFile : files) {
+ File source = new File(solrCore.getIndexDir(), indexFile);
+ copyFile(source, new File(destDir, source.getName()), true);
+ }
+ }
+
+ public void copyFile(File source, File destination, boolean
preserveFileDate)
+ throws IOException {
+ // check source exists
+ if (!source.exists()) {
+ String message = "File " + source + " does not exist";
+ throw new FileNotFoundException(message);
+ }
+
+ // does destinations directory exist ?
+ if (destination.getParentFile() != null
+ && !destination.getParentFile().exists()) {
+ destination.getParentFile().mkdirs();
+ }
+
+ // make sure we can write to destination
+ if (destination.exists() && !destination.canWrite()) {
+ String message = "Unable to open file " + destination + " for
writing.";
+ throw new IOException(message);
+ }
+
+ FileInputStream input = null;
+ FileOutputStream output = null;
+ try {
+ input = new FileInputStream(source);
+ output = new FileOutputStream(destination);
+
+ int count = 0;
+ int n = 0;
+ int rcnt = 0;
+ while (-1 != (n = input.read(buffer))) {
+ output.write(buffer, 0, n);
+ count += n;
+ rcnt++;
+ /***
+ // reserve every 4.6875 MB
+ if (rcnt == 150) {
+ rcnt = 0;
+ delPolicy.setReserveDuration(indexCommit.getVersion(),
reserveTime);
+ }
+ ***/
+ }
+ } finally {
+ try {
+ IOUtils.closeQuietly(input);
+ } finally {
+ IOUtils.closeQuietly(output);
+ }
+ }
+
+ if (source.length() != destination.length()) {
+ String message = "Failed to copy full contents from " + source + " to "
+ + destination;
+ throw new IOException(message);
+ }
+
+ if (preserveFileDate) {
+ // file copy should preserve file date
+ destination.setLastModified(source.lastModified());
+ }
+ }
+ }
+
+
}
Modified:
lucene/solr/trunk/src/test/org/apache/solr/handler/TestReplicationHandler.java
URL:
http://svn.apache.org/viewvc/lucene/solr/trunk/src/test/org/apache/solr/handler/TestReplicationHandler.java?rev=823711&r1=823710&r2=823711&view=diff
==============================================================================
--- lucene/solr/trunk/src/test/org/apache/solr/handler/TestReplicationHandler.java (original)
+++ lucene/solr/trunk/src/test/org/apache/solr/handler/TestReplicationHandler.java Fri Oct 9 21:52:42 2009
@@ -16,7 +16,11 @@
*/
package org.apache.solr.handler;
-import junit.framework.TestCase;
+import org.apache.commons.io.IOUtils;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.solr.TestDistributedSearch;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
@@ -451,7 +455,106 @@
slaveQueryResult = (SolrDocumentList) slaveQueryRsp.get("response");
assertEquals(0, slaveQueryResult.getNumFound());
}
+
+ public void testBackup() throws Exception {
+ masterJetty.stop();
+ copyFile(new File(CONF_DIR + "solrconfig-master1.xml"), new
File(master.getConfDir(), "solrconfig.xml"));
+
+ masterJetty = createJetty(master);
+ masterClient = createNewSolrServer(masterJetty.getLocalPort());
+
+
+ //add 500 docs to master
+ for (int i = 0; i < 500; i++)
+ index(masterClient, "id", i, "name", "name = " + i);
+
+ masterClient.commit();
+
+ class BackupThread extends Thread {
+ volatile String fail = null;
+ public void run() {
+ String masterUrl = "http://localhost:" + masterJetty.getLocalPort() +
"/solr/replication?command=" + ReplicationHandler.CMD_BACKUP;
+ URL url;
+ InputStream stream = null;
+ try {
+ url = new URL(masterUrl);
+ stream = url.openStream();
+ stream.close();
+ } catch (Exception e) {
+ fail = e.getMessage();
+ } finally {
+ IOUtils.closeQuietly(stream);
+ }
+
+ };
+ };
+ BackupThread backupThread = new BackupThread();
+ backupThread.start();
+
+ File dataDir = new File(master.getDataDir());
+ class CheckStatus extends Thread {
+ volatile String fail = null;
+ volatile String response = null;
+ volatile boolean success = false;
+ public void run() {
+ String masterUrl = "http://localhost:" + masterJetty.getLocalPort() +
"/solr/replication?command=" + ReplicationHandler.CMD_DETAILS;
+ URL url;
+ InputStream stream = null;
+ try {
+ url = new URL(masterUrl);
+ stream = url.openStream();
+ response = IOUtils.toString(stream);
+ if(response.contains("<str name=\"status\">success</str>")) {
+ success = true;
+ }
+ stream.close();
+ } catch (Exception e) {
+ fail = e.getMessage();
+ } finally {
+ IOUtils.closeQuietly(stream);
+ }
+
+ };
+ };
+ int waitCnt = 0;
+ CheckStatus checkStatus = new CheckStatus();
+ while(true) {
+ checkStatus.run();
+ if(checkStatus.fail != null) {
+ fail(checkStatus.fail);
+ }
+ if(checkStatus.success) {
+ break;
+ }
+ Thread.sleep(200);
+ if(waitCnt == 10) {
+ fail("Backup success not detected:" + checkStatus.response);
+ }
+ waitCnt++;
+ }
+
+ if(backupThread.fail != null) {
+ fail(backupThread.fail);
+ }
+
+ File[] files = dataDir.listFiles(new FilenameFilter() {
+
+ public boolean accept(File dir, String name) {
+ if(name.startsWith("snapshot")) {
+ return true;
+ }
+ return false;
+ }
+ });
+ assertEquals(1, files.length);
+ File snapDir = files[0];
+
+ IndexSearcher searcher = new IndexSearcher(new
SimpleFSDirectory(snapDir.getAbsoluteFile(), null), true);
+ TopDocs hits = searcher.search(new MatchAllDocsQuery(), 1);
+
+ assertEquals(500, hits.totalHits);
+ }
/* character copy of file using UTF-8 */
void copyFile(File src, File dst) throws IOException {