Author: yonik
Date: Fri Oct 9 18:36:28 2009
New Revision: 823654
URL: http://svn.apache.org/viewvc?rev=823654&view=rev
Log:
SOLR-1458: use saved commit points when replicating optimized indexes
Modified:
lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java
Modified:
lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
URL:
http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java?rev=823654&r1=823653&r2=823654&view=diff
==============================================================================
---
lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
(original)
+++
lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
Fri Oct 9 18:36:28 2009
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* A wrapper for an IndexDeletionPolicy instance.
@@ -39,6 +40,7 @@
private volatile Map<Long, IndexCommit> solrVersionVsCommits = new
ConcurrentHashMap<Long, IndexCommit>();
private final Map<Long, Long> reserves = new ConcurrentHashMap<Long,Long>();
private volatile IndexCommit latestCommit;
+ // Per-commit-version save counters: looked up by saveCommitPoint/releaseCommmitPoint
+ // and consulted by IndexCommitWrapper.delete() before a commit is deleted.
+ private final ConcurrentHashMap<Long, AtomicInteger> savedCommits = new
ConcurrentHashMap<Long, AtomicInteger>();
public IndexDeletionPolicyWrapper(IndexDeletionPolicy deletionPolicy) {
this.deletionPolicy = deletionPolicy;
@@ -98,6 +100,25 @@
return result;
}
+ /**
+ * Prevent the commit point with the given version from being deleted
+ * until it is released again with releaseCommmitPoint.
+ * A counter is used to allow a commit point to be correctly saved and
+ * released multiple times; each save should be paired with one release.
+ *
+ * @param indexCommitVersion version of the commit point to protect
+ */
+ public synchronized void saveCommitPoint(Long indexCommitVersion) {
+ AtomicInteger reserveCount = savedCommits.get(indexCommitVersion);
+ if (reserveCount == null) {
+ // First save of this version: the counter must be registered in the map,
+ // otherwise later lookups in savedCommits would never find it.
+ reserveCount = new AtomicInteger();
+ savedCommits.put(indexCommitVersion, reserveCount);
+ }
+ reserveCount.incrementAndGet();
+ }
+
+ /**
+ * Release one hold on a previously saved commit point. The entry is dropped
+ * from savedCommits once its counter falls to zero or below; a version with
+ * no counter is ignored.
+ */
+ public synchronized void releaseCommmitPoint(Long indexCommitVersion) {
+ AtomicInteger remaining = savedCommits.get(indexCommitVersion);
+ // a missing counter should not happen; it is treated as a no-op
+ if (remaining != null && remaining.decrementAndGet() <= 0) {
+ savedCommits.remove(indexCommitVersion);
+ }
+ }
+
+
/**
* Internal use for Lucene... do not explicitly call.
*/
@@ -121,55 +142,74 @@
private class IndexCommitWrapper extends IndexCommit {
IndexCommit delegate;
+
IndexCommitWrapper(IndexCommit delegate) {
this.delegate = delegate;
}
+ @Override
public String getSegmentsFileName() {
return delegate.getSegmentsFileName();
}
+ @Override
public Collection getFileNames() throws IOException {
return delegate.getFileNames();
}
+ @Override
public Directory getDirectory() {
return delegate.getDirectory();
}
+ @Override
public void delete() {
- Long reserve = reserves.get(delegate.getVersion());
+ Long version = delegate.getVersion();
+ Long reserve = reserves.get(version);
if (reserve != null && System.currentTimeMillis() < reserve) return;
+ if(savedCommits.contains(version)) return;
delegate.delete();
}
+ @Override
public boolean isOptimized() {
return delegate.isOptimized();
}
+ @Override
public boolean equals(Object o) {
return delegate.equals(o);
}
+ @Override
public int hashCode() {
return delegate.hashCode();
}
+ @Override
public long getVersion() {
return delegate.getVersion();
}
+ @Override
public long getGeneration() {
return delegate.getGeneration();
}
+ @Override
public boolean isDeleted() {
return delegate.isDeleted();
}
+ @Override
public long getTimestamp() throws IOException {
return delegate.getTimestamp();
}
+
+ @Override
+ public Map getUserData() throws IOException {
+ return delegate.getUserData();
+ }
}
/**
Modified:
lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java
URL:
http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=823654&r1=823653&r2=823654&view=diff
==============================================================================
--- lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java
(original)
+++ lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java
Fri Oct 9 18:36:28 2009
@@ -18,6 +18,7 @@
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.IndexReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@@ -815,15 +816,32 @@
replicateOnStart = true;
RefCounted<SolrIndexSearcher> s = core.getNewestSearcher(false);
try {
+ IndexReader reader = s==null ? null : s.get().getReader();
+ if (reader!=null && reader.getIndexCommit() != null &&
reader.getIndexCommit().getGeneration() != 1L) {
+ try {
+ if(!replicateOnCommit && replicateOnOptimize){
+ Collection<IndexCommit> commits =
IndexReader.listCommits(reader.directory());
+ for (IndexCommit ic : commits) {
+ if(ic.isOptimized()){
+ if(indexCommitPoint == null ||
indexCommitPoint.getVersion() < ic.getVersion()) indexCommitPoint = ic;
+ }
+ }
+ } else{
+ indexCommitPoint = reader.getIndexCommit();
+ }
+ } finally {
+ if(indexCommitPoint != null){
+
core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getVersion());
+ }
+ }
+ }
if (core.getUpdateHandler() instanceof DirectUpdateHandler2) {
((DirectUpdateHandler2) core.getUpdateHandler()).forceOpenWriter();
} else {
LOG.warn("The update handler being used is not an instance or
sub-class of DirectUpdateHandler2. " +
"Replicate on Startup cannot work.");
- }
- if (s.get().getReader().getIndexCommit() != null)
- if (s.get().getReader().getIndexCommit().getGeneration() != 1L)
- indexCommitPoint = s.get().getReader().getIndexCommit();
+ }
+
} catch (IOException e) {
LOG.warn("Unable to get IndexCommit on startup", e);
} finally {
@@ -892,13 +910,18 @@
* This refreshes the latest replicateable index commit and optionally
can create Snapshots as well
*/
public void postCommit() {
- if (getCommit || snapshoot) {
+ if (getCommit) {
+ IndexCommit oldCommitPoint = indexCommitPoint;
indexCommitPoint = core.getDeletionPolicy().getLatestCommit();
+
core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getVersion());
+ if(oldCommitPoint != null){
+
core.getDeletionPolicy().releaseCommmitPoint(oldCommitPoint.getVersion());
+ }
}
if (snapshoot) {
try {
SnapShooter snapShooter = new SnapShooter(core, null);
- snapShooter.createSnapAsync(indexCommitPoint.getFileNames(),
ReplicationHandler.this);
+
snapShooter.createSnapAsync(core.getDeletionPolicy().getLatestCommit().getFileNames(),
ReplicationHandler.this);
} catch (Exception e) {
LOG.error("Exception while snapshooting", e);
}