Ram has uploaded a new change for review.
https://gerrit.wikimedia.org/r/53299
Change subject: Bug: 45266 Use sequence numbers instead of timestamps
......................................................................
Bug: 45266 Use sequence numbers instead of timestamps
This is a revised version of the earlier fix and should be compatible
with old clients and servers. Needs parallel changes to the OAI extension
to remedy the issue of losing updates.
Bug: 45266
Change-Id: Ia8d74f82ecf7d4c5c1b612a39fbcef99bcd10334
---
M src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
M src/org/wikimedia/lsearch/oai/OAIHarvester.java
M src/org/wikimedia/lsearch/oai/OAIParser.java
3 files changed, 206 insertions(+), 46 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/operations/debs/lucene-search-2
refs/changes/99/53299/1
diff --git a/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
b/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
index 6028fa2..4fcb352 100755
--- a/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
+++ b/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
@@ -45,10 +45,12 @@
*
*/
public class IncrementalUpdater {
+ public enum ServerType { TIMESTAMP, SEQUENCE };
+
static Logger log = Logger.getLogger(IncrementalUpdater.class);
protected static int maxQueueSize = 500;
protected static int bufferDocs = 50;
-
+
static public class OAIAuthenticator extends Authenticator {
protected String username,password;
@@ -85,6 +87,13 @@
* @param args
*/
public static void main(String[] args){
+ // only for debugging: allows use of a different log file from
the main process
+ for (int i =0; i < args.length; i++) {
+ if (args[i].equals("-configfile")) {
+ Configuration.setConfigFile(args[++i]);
+ }
+ }
+
// config
Configuration config = Configuration.open();
GlobalConfiguration global = GlobalConfiguration.getInstance();
@@ -105,7 +114,15 @@
boolean requestSnapshot = false;
String noOptimizationDBlistFile = null;
HashSet<String> noOptimizationDBs = new HashSet<String>();
-
+
+
+ // old servers do timestamp-based transfers but that is
unreliable and
+ // may cause lost updates; new ones use sequence numbers which
are more
+ // reliable
+ //
+ String seq_first = null; // first record to fetch
+ ServerType server_type = null;
+
// args
for(int i=0; i<args.length; i++){
if(args[i].equals("-d"))
@@ -130,14 +147,21 @@
requestSnapshot = true;
else if(args[i].equals("-nof"))
noOptimizationDBlistFile = args[++i];
+ else if(args[i].equals("-q"))
+ seq_first = args[++i];
else if(args[i].equals("--help"))
break;
+ else if(args[i].equals("-configfile"))
+ ++i; // skip argument
else if(args[i].startsWith("-")){
System.out.println("Unrecognized switch
"+args[i]);
return;
} else
dbnames.add(args[i]);
}
+ if ( null != timestamp && null != seq_first ) {
+ System.out.println( "Cannot specify both timestamp and
sequence number" ); return;
+ }
if(useLocal)
dbnames.addAll(global.getMyIndexDBnames());
dbnames.addAll(readDBList(dblist));
@@ -159,6 +183,7 @@
System.out.println(" -ef - exclude db names listed in
dblist file");
System.out.println(" -sn - immediately make
unoptimized snapshot as updates finish ");
System.out.println(" -nof - use with -sn to specify a
file with databases not to be optimized");
+ System.out.println(" -q - sequence number to start
from");
return;
}
// preload
@@ -171,13 +196,17 @@
maxQueueSize = config.getInt("OAI","maxqueue",500);
bufferDocs = config.getInt("OAI","bufferdocs",50);
+ log.trace( String.format( "maxQueueSize = %d, bufferDocs =
%d\n", maxQueueSize, bufferDocs ) );
firstPass.addAll(dbnames);
// update
do{
- main_loop: for(String dbname : dbnames){
+ main_loop:
+ for(String dbname : dbnames){
try{
- if(excludeList.contains(dbname))
+ if(excludeList.contains(dbname)) {
+ log.trace( String.format( "%s
in excludeList, skipped\n", dbname ) );
continue;
+ }
IndexId iid = IndexId.get(dbname);
OAIHarvester harvester = new
OAIHarvester(iid,iid.getOAIRepository(),auth);
OAIHarvester harvesterSingle = new
OAIHarvester(iid,iid.getOAIRepository(),auth);
@@ -193,15 +222,55 @@
} catch (IOException e) {
log.warn("I/O error reading
status file for "+iid+" at "+iid.getStatusPath()+" : "+e.getMessage(),e);
}
- String from;
- if(firstPass.contains(dbname) &&
timestamp!=null)
- from = timestamp;
- else
- from =
status.getProperty("timestamp",defaultTimestamp);
- log.info("Resuming update of "+iid+"
from "+from);
- ArrayList<IndexUpdateRecord> records =
harvester.getRecords(from,bufferDocs);
- if(records.size() == 0)
+
+ // fetch next batch of records based
on sequence number (new scheme) or
+ // timestamp (old scheme)
+ //
+ String from = null, seq_next = null;
+ ArrayList<IndexUpdateRecord> records =
null;
+
+ if ( null == server_type ) { //
server type not yet known
+ if ( firstPass.contains( dbname
) ) {
+ if ( null != seq_first
) {
+ server_type =
ServerType.SEQUENCE;
+ seq_next =
seq_first;
+ } else if ( null !=
timestamp ) {
+ server_type =
ServerType.TIMESTAMP;
+ from =
timestamp;
+ }
+ }
+ if ( null == seq_next && null
== from ) {
+ seq_next =
status.getProperty( "sequence" );
+ if ( null != seq_next )
{
+ server_type =
ServerType.SEQUENCE;
+ } else {
+ // The first
successful transfer will tell us the server type
+ //server_type =
ServerType.TIMESTAMP;
+ from =
status.getProperty( "timestamp", defaultTimestamp );
+ }
+ }
+ } else if ( ServerType.TIMESTAMP ==
server_type ) {
+ from = (firstPass.contains(
dbname ) && null != timestamp)
+ ? timestamp :
status.getProperty( "timestamp", defaultTimestamp );
+ } else { // sequence numbers
+ seq_next = (firstPass.contains(
dbname ) && null != seq_first)
+ ? seq_first :
status.getProperty( "sequence" );
+ }
+ if ( ServerType.SEQUENCE == server_type
) { // working with sequence numbers
+ log.info( "Resuming update of
"+ iid + ", seq = " + seq_next );
+ records =
harvester.getRecordsSeq( seq_next, bufferDocs );
+ } else { // working with
timestamps
+ log.info( "Resuming update of "
+ iid + ", from = " + from );
+ records = harvester.getRecords(
from, bufferDocs );
+ }
+
+ if ( records.size() == 0 ) {
+ log.trace( String.format( "No
records\n" ) );
continue;
+ }
+ if ( null == server_type ) { //
first successful transfer
+ server_type =
harvester.hasNext() ? ServerType.SEQUENCE : ServerType.TIMESTAMP;
+ }
boolean hasMore = false;
do{
// send to indexer
@@ -209,7 +278,7 @@
try {
// send main
printRecords(records);
-
ensureNotOverladed(messenger,iid);
+
ensureNotOverloaded(messenger,iid);
log.info(iid+": Sending
"+records.size()+" records to indexer");
HashSet<String> fetch =
messenger.enqueueFrontend(records.toArray(new IndexUpdateRecord[]
{}),iid.getIndexHost());
if(fetch.size()>0){
@@ -220,7 +289,7 @@
}
// send
additional
printRecords(additional);
-
ensureNotOverladed(messenger,iid);
+
ensureNotOverloaded(messenger,iid);
log.info(iid+":
Sending additional "+additional.size()+" records to indexer");
messenger.enqueueFrontend(additional.toArray(new IndexUpdateRecord[]
{}),iid.getIndexHost());
}
@@ -238,10 +307,12 @@
} while(hasMore);
// see if we need to wait for
notification
+ log.trace( String.format(
"notification = %s\n", notification ) );
if(notification){
RMIMessengerClient messenger =
new RMIMessengerClient(true);
String host =
iid.getIndexHost();
boolean req =
messenger.requestFlushAndNotify(dbname,host);
+ log.trace( String.format( "req
= %s\n", req ) );
if(req){
log.info("Waiting for
flush notification for "+dbname);
Boolean succ = null;
@@ -278,14 +349,20 @@
continue main_loop;
}
- // write updated timestamp
-
status.setProperty("timestamp",harvester.getResponseDate());
+ // write timestamp and, possibly,
updated sequence number; timestamp not needed
+ // in the new scheme but we save it
since it may be useful for debugging
+ //
+ status.setProperty( "timestamp",
harvester.getResponseDate() );
+ if ( ServerType.SEQUENCE ==
server_type ) {
+ status.setProperty( "sequence",
harvester.getSequence() );
+ }
try {
if(!statf.exists())
statf.getParentFile().mkdirs();
FileOutputStream fileos = new
FileOutputStream(statf,false);
status.store(fileos,"Last
incremental update timestamp");
fileos.close();
+ log.trace( String.format(
"stored timestamp/sequence to status file\n" ) );
} catch (IOException e) {
log.warn("I/O error writing
status file for "+iid+" at "+iid.getStatusPath()+" : "+e.getMessage(),e);
}
@@ -336,7 +413,7 @@
}
}
- private static void ensureNotOverladed(RMIMessengerClient messenger,
IndexId iid) throws InterruptedException{
+ private static void ensureNotOverloaded(RMIMessengerClient messenger,
IndexId iid) throws InterruptedException{
// check if indexer is overloaded
int queueSize = 0;
do{
@@ -349,4 +426,4 @@
}
-}
\ No newline at end of file
+}
diff --git a/src/org/wikimedia/lsearch/oai/OAIHarvester.java
b/src/org/wikimedia/lsearch/oai/OAIHarvester.java
index 63ac868..f687143 100755
--- a/src/org/wikimedia/lsearch/oai/OAIHarvester.java
+++ b/src/org/wikimedia/lsearch/oai/OAIHarvester.java
@@ -1,8 +1,13 @@
package org.wikimedia.lsearch.oai;
import java.io.BufferedInputStream;
-import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.InputStream;
+import java.io.IOException;
+
import java.net.Authenticator;
import java.net.MalformedURLException;
import java.net.URL;
@@ -27,11 +32,35 @@
protected OAIParser parser;
protected IndexUpdatesCollector collector;
protected IndexId iid;
- protected String resumptionToken, responseDate;
+ protected String resumptionToken, responseDate, sequence;
protected String host;
/** number of retries before giving up, useful when there are broken
servers in the cluster */
protected int retries = 5;
+ // for debugging
+ // save contents of input stream to memory stream and dump to file
+ private static final boolean DBG = true;
+ private static int fnum = 1;
+ public static InputStream toMem( InputStream is ) throws IOException {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ // open new dump file
+ File dir = new File( "/var/tmp" );
+ String
+ pfx = String.format( "oai_%d_", fnum++ ),
+ sfx = ".xml";
+ File dumpfile = File.createTempFile( pfx, sfx, dir );
+ FileOutputStream fos = new FileOutputStream( dumpfile );
+
+ // read bytes
+ int c;
+ while ( -1 != (c = is.read()) ) {
+ os.write( c ); fos.write( c );
+ }
+ fos.close();
+ ByteArrayInputStream bis = new ByteArrayInputStream(
os.toByteArray() );
+ return bis;
+ } // toMem
+
public OAIHarvester(IndexId iid, String url, Authenticator auth) throws
MalformedURLException{
this.urlbase = url;
this.iid = iid;
@@ -70,12 +99,21 @@
// set some timeouts
urlConn.setReadTimeout(60 * 1000); // 60 seconds
urlConn.setConnectTimeout(60 * 1000); // 60
seconds
- InputStream in = new
BufferedInputStream(urlConn.getInputStream());
+ InputStream in;
+ if ( ! DBG ) {
+ in = new
BufferedInputStream(urlConn.getInputStream());
+ } else {
+ in = toMem( urlConn.getInputStream() );
+ }
+
parser = new OAIParser(in,collector);
parser.parse();
resumptionToken = parser.getResumptionToken();
- responseDate = parser.getResponseDate();
+ responseDate = parser.getResponseDate();
+ sequence = parser.getSequence();
in.close();
+ log.trace( String.format( "resumptionToken =
%s, responseDate = %s, sequence = %s",
+ resumptionToken,
responseDate, sequence ) );
break;
} catch(IOException e){
if(tryNum == this.retries)
@@ -87,22 +125,47 @@
}
/** Invoke ListRecords using the last resumption token, get atLeast num
of records */
- public ArrayList<IndexUpdateRecord> getMoreRecords(int atLeast){
+ public ArrayList<IndexUpdateRecord> getMoreRecords( int atLeast ) {
ArrayList<IndexUpdateRecord> ret = new
ArrayList<IndexUpdateRecord>();
try{
- do{
- read(new
URL(urlbase+"&verb=ListRecords&metadataPrefix=mediawiki&resumptionToken="+resumptionToken));
- ret.addAll(collector.getRecords());
- } while(hasMore() && ret.size() < atLeast);
- } catch(IOException e){
- log.warn("I/O exception listing records:
"+e.getMessage(),e);
+ do {
+ String s = urlbase +
"&verb=ListRecords&metadataPrefix=mediawiki&";
+ s = s + (hasNext() ? "next=" + sequence :
"resumptionToken=" + resumptionToken);
+ read( new URL( s ) );
+ ret.addAll( collector.getRecords() );
+ } while ( hasMore() && ret.size() < atLeast );
+ } catch ( IOException e ) {
+ log.warn( "I/O exception listing records: " +
e.getMessage(), e );
return null;
}
return ret;
}
- public boolean hasMore(){
- return !resumptionToken.equals("");
+ /** Invoke ListRecords using the next sequence number, get atLeast num
of records */
+ public ArrayList<IndexUpdateRecord> getRecordsSeq( String seq, int
atLeast ) {
+ ArrayList<IndexUpdateRecord> ret = new
ArrayList<IndexUpdateRecord>();
+ try{
+ do {
+ read( new URL( urlbase +
"&verb=ListRecords&metadataPrefix=mediawiki&next=" + seq ) );
+ ret.addAll( collector.getRecords() );
+ } while( hasMore() && ret.size() < atLeast );
+ } catch( IOException e ){
+ log.warn( "I/O exception listing records: " +
e.getMessage(), e );
+ return null;
+ }
+ return ret;
+ }
+
+ public boolean hasMore() {
+ return hasNext() || !resumptionToken.equals("");
+ }
+
+ public boolean hasNext() {
+ return null != sequence;
+ }
+
+ public String getSequence(){ // should come from parsing response
+ return sequence;
}
public String getResponseDate(){
diff --git a/src/org/wikimedia/lsearch/oai/OAIParser.java
b/src/org/wikimedia/lsearch/oai/OAIParser.java
index b228ef8..1b8bb52 100755
--- a/src/org/wikimedia/lsearch/oai/OAIParser.java
+++ b/src/org/wikimedia/lsearch/oai/OAIParser.java
@@ -35,25 +35,37 @@
protected IndexUpdatesCollector collector;
/** parsing state */
protected boolean inRecord, inHeader, inMetadata, inResponseDate;
- protected boolean inDump, inIdentifier, inResumptionToken;
- protected String oaiId,pageId,resumptionToken,responseDate;
- protected boolean beginMW; // beginning of mediawiki stream
- protected String mwUri, mwLocalName, mwQName;
+ protected boolean inDump, inIdentifier, inResumptionToken, inSequence;
+ protected String oaiId,pageId, resumptionToken, responseDate, sequence;
+ protected boolean beginMW; // beginning of mediawiki stream
+ protected String mwUri, mwLocalName, mwQName;
protected boolean isDeleted, inReferences, inRedirect, inRedirectTitle,
inRedirectRef;
- protected String references, redirectTitle, redirectRef;
+ protected String references, redirectTitle, redirectRef;
public OAIParser(InputStream in, IndexUpdatesCollector collector){
dumpReader = new XmlDumpReader(null,collector);
this.in = in;
this.collector = collector;
- inDump = false; inIdentifier = false; inResumptionToken = false;
- inRecord = false; inHeader = false; inMetadata = false;
- inResponseDate = false; inReferences = false;
- oaiId = ""; resumptionToken = ""; responseDate = "";
- beginMW = true; references = "";
- inRedirect = false; inRedirectTitle= false; inRedirectRef =
false;
- redirectTitle = ""; redirectRef = "";
+ inDump = false;
+ inIdentifier = false;
+ inResumptionToken = false;
+ inSequence = false;
+ inRecord = false;
+ inHeader = false;
+ inMetadata = false;
+ inResponseDate = false;
+ inReferences = false;
+ inRedirect = false;
+ inRedirectTitle = false;
+ inRedirectRef = false;
+ oaiId = "";
+ resumptionToken = "";
+ responseDate = "";
+ beginMW = true;
+ references = "";
+ redirectTitle = "";
+ redirectRef = "";
}
public void parse() throws IOException{
@@ -111,6 +123,9 @@
} else if(qName.equals("resumptionToken")){
resumptionToken = "";
inResumptionToken = true;
+ } else if(qName.equals("next")){
+ sequence = "";
+ inSequence = true;
} else if(qName.equals("responseDate")){
responseDate = "";
inResponseDate = true;
@@ -161,6 +176,8 @@
inIdentifier = false;
} else if(qName.equals("resumptionToken"))
inResumptionToken = false;
+ else if(qName.equals("next"))
+ inSequence = false;
else if(qName.equals("responseDate"))
inResponseDate = false;
}
@@ -174,6 +191,8 @@
dumpReader.characters(ch,start,length);
} else if(inResumptionToken){
resumptionToken += new String(ch,start,length);
+ } else if(inSequence){
+ sequence += new String(ch,start,length);
} else if(inResponseDate){
responseDate += new String(ch,start,length);
} else if(inReferences){
@@ -196,6 +215,10 @@
return resumptionToken;
}
+ public String getSequence() {
+ return sequence;
+ }
+
public IndexUpdatesCollector getCollector() {
return collector;
}
@@ -203,7 +226,4 @@
public String getResponseDate() {
return responseDate;
}
-
-
-
}
--
To view, visit https://gerrit.wikimedia.org/r/53299
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia8d74f82ecf7d4c5c1b612a39fbcef99bcd10334
Gerrit-PatchSet: 1
Gerrit-Project: operations/debs/lucene-search-2
Gerrit-Branch: master
Gerrit-Owner: Ram <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits