Tim Starling has submitted this change and it was merged.
Change subject: Bug: 45266 Use sequence numbers instead of timestamps
......................................................................
Bug: 45266 Use sequence numbers instead of timestamps
Fix spacing issues.
Turn off DBG flag. Testing complete, ready for review.
IncrementalUpdater.java -- fix bug in recognizing new style server
OAIHarvester.java -- Fix indentation; add parameter to getRecords() to
control addition of a new URL parameter; add overloaded getRecords(); fix
bugs in getMoreRecords(), hasMore() and getSequence(). Testing continues.
This is a revised version of the earlier fix and should be compatible
with old clients and servers. Needs parallel changes to OAI extension to
remedy the issue of losing updates.
Bug: 45266
Change-Id: Ia8d74f82ecf7d4c5c1b612a39fbcef99bcd10334
---
M src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
M src/org/wikimedia/lsearch/oai/OAIHarvester.java
M src/org/wikimedia/lsearch/oai/OAIParser.java
3 files changed, 250 insertions(+), 66 deletions(-)
Approvals:
Tim Starling: Verified; Looks good to me, approved
jenkins-bot: Verified
diff --git a/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
b/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
index 6028fa2..cdda32d 100755
--- a/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
+++ b/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
@@ -45,10 +45,12 @@
*
*/
public class IncrementalUpdater {
+ public enum ServerType { TIMESTAMP, SEQUENCE };
+
static Logger log = Logger.getLogger(IncrementalUpdater.class);
protected static int maxQueueSize = 500;
protected static int bufferDocs = 50;
-
+
static public class OAIAuthenticator extends Authenticator {
protected String username,password;
@@ -85,6 +87,13 @@
* @param args
*/
public static void main(String[] args){
+ // only for debugging: allows use of a different log file from
the main process
+ for (int i =0; i < args.length; i++) {
+ if (args[i].equals("-configfile")) {
+ Configuration.setConfigFile(args[++i]);
+ }
+ }
+
// config
Configuration config = Configuration.open();
GlobalConfiguration global = GlobalConfiguration.getInstance();
@@ -105,7 +114,15 @@
boolean requestSnapshot = false;
String noOptimizationDBlistFile = null;
HashSet<String> noOptimizationDBs = new HashSet<String>();
-
+
+
+ // old servers do timestamp-based transfers but that is
unreliable and
+ // may cause lost updates; new ones use sequence numbers which
are more
+ // reliable
+ //
+ String seq_first = null; // first record to fetch
+ ServerType server_type = null;
+
// args
for(int i=0; i<args.length; i++){
if(args[i].equals("-d"))
@@ -130,14 +147,21 @@
requestSnapshot = true;
else if(args[i].equals("-nof"))
noOptimizationDBlistFile = args[++i];
+ else if(args[i].equals("-q"))
+ seq_first = args[++i];
else if(args[i].equals("--help"))
break;
+ else if(args[i].equals("-configfile"))
+ ++i; // skip argument
else if(args[i].startsWith("-")){
System.out.println("Unrecognized switch
"+args[i]);
return;
} else
dbnames.add(args[i]);
}
+ if ( null != timestamp && null != seq_first ) {
+ System.out.println( "Cannot specify both timestamp and
sequence number" ); return;
+ }
if(useLocal)
dbnames.addAll(global.getMyIndexDBnames());
dbnames.addAll(readDBList(dblist));
@@ -159,6 +183,7 @@
System.out.println(" -ef - exclude db names listed in
dblist file");
System.out.println(" -sn - immediately make
unoptimized snapshot as updates finish ");
System.out.println(" -nof - use with -sn to specify a
file with databases not to be optimized");
+ System.out.println(" -q - sequence number to start
from");
return;
}
// preload
@@ -171,37 +196,85 @@
maxQueueSize = config.getInt("OAI","maxqueue",500);
bufferDocs = config.getInt("OAI","bufferdocs",50);
+ log.trace( String.format( "maxQueueSize = %d, bufferDocs =
%d\n", maxQueueSize, bufferDocs ) );
firstPass.addAll(dbnames);
// update
do{
- main_loop: for(String dbname : dbnames){
+ main_loop:
+ for(String dbname : dbnames){
try{
- if(excludeList.contains(dbname))
+ if(excludeList.contains(dbname)) {
+ log.trace( String.format( "%s
in excludeList, skipped\n", dbname ) );
continue;
+ }
IndexId iid = IndexId.get(dbname);
OAIHarvester harvester = new
OAIHarvester(iid,iid.getOAIRepository(),auth);
OAIHarvester harvesterSingle = new
OAIHarvester(iid,iid.getOAIRepository(),auth);
- Properties status = new Properties();
+ Properties status = new Properties();
// read timestamp from status file
File statf = new
File(iid.getStatusPath());
- try {
- if(statf.exists()){
+ try {
+ if(statf.exists()){
FileInputStream fileis
= new FileInputStream(iid.getStatusPath());
status.load(fileis);
fileis.close();
}
} catch (IOException e) {
log.warn("I/O error reading
status file for "+iid+" at "+iid.getStatusPath()+" : "+e.getMessage(),e);
- }
- String from;
- if(firstPass.contains(dbname) &&
timestamp!=null)
- from = timestamp;
- else
- from =
status.getProperty("timestamp",defaultTimestamp);
- log.info("Resuming update of "+iid+"
from "+from);
- ArrayList<IndexUpdateRecord> records =
harvester.getRecords(from,bufferDocs);
- if(records.size() == 0)
+ }
+
+ // fetch next batch of records based on
sequence number (new scheme) or
+ // timestamp (old scheme)
+ //
+ String from = null, seq_next = null;
+ ArrayList<IndexUpdateRecord> records =
null;
+
+ if ( null == server_type ) { //
server type not yet known
+ if ( firstPass.contains( dbname
) ) {
+ if ( null != seq_first
) {
+ server_type =
ServerType.SEQUENCE;
+ seq_next =
seq_first;
+ } else if ( null !=
timestamp ) {
+ server_type =
ServerType.TIMESTAMP;
+ from =
timestamp;
+ }
+ }
+ if ( null == seq_next && null
== from ) {
+ seq_next =
status.getProperty( "sequence" );
+ if ( null != seq_next )
{
+ server_type =
ServerType.SEQUENCE;
+ } else {
+ // The first
successful transfer will tell us the server type
+ //server_type =
ServerType.TIMESTAMP;
+ from =
status.getProperty( "timestamp", defaultTimestamp );
+ }
+ }
+ } else if ( ServerType.TIMESTAMP ==
server_type ) {
+ from = (firstPass.contains(
dbname ) && null != timestamp)
+ ? timestamp :
status.getProperty( "timestamp", defaultTimestamp );
+ } else { // sequence numbers
+ seq_next = (firstPass.contains(
dbname ) && null != seq_first)
+ ? seq_first :
status.getProperty( "sequence" );
+ }
+ if ( ServerType.SEQUENCE == server_type
) { // working with sequence numbers
+ log.info( "Resuming update of
"+ iid + ", seq = " + seq_next );
+ records =
harvester.getRecordsSeq( seq_next, bufferDocs );
+ } else if ( null == server_type ) { //
test if server can do sequence numbers
+ log.info( "Resuming update of "
+ iid + ", from = " + from + ", next = -1" );
+ records = harvester.getRecords(
from, bufferDocs, true );
+ } else { // working with
timestamps
+ log.info( "Resuming update of "
+ iid + ", from = " + from );
+ records = harvester.getRecords(
from, bufferDocs );
+ }
+
+ if ( records.size() == 0 ) {
+ log.trace( String.format( "No
records\n" ) );
continue;
+ }
+ if ( null == server_type ) { //
first successful transfer
+ server_type =
harvester.hasNext() ? ServerType.SEQUENCE : ServerType.TIMESTAMP;
+ log.trace( String.format(
"Server type = %s\n", server_type ) );
+ }
boolean hasMore = false;
do{
// send to indexer
@@ -209,7 +282,7 @@
try {
// send main
printRecords(records);
-
ensureNotOverladed(messenger,iid);
+
ensureNotOverloaded(messenger,iid);
log.info(iid+": Sending
"+records.size()+" records to indexer");
HashSet<String> fetch =
messenger.enqueueFrontend(records.toArray(new IndexUpdateRecord[]
{}),iid.getIndexHost());
if(fetch.size()>0){
@@ -220,7 +293,7 @@
}
// send
additional
printRecords(additional);
-
ensureNotOverladed(messenger,iid);
+
ensureNotOverloaded(messenger,iid);
log.info(iid+":
Sending additional "+additional.size()+" records to indexer");
messenger.enqueueFrontend(additional.toArray(new IndexUpdateRecord[]
{}),iid.getIndexHost());
}
@@ -238,10 +311,12 @@
} while(hasMore);
// see if we need to wait for
notification
+ log.trace( String.format( "notification
= %s\n", notification ) );
if(notification){
RMIMessengerClient messenger =
new RMIMessengerClient(true);
String host =
iid.getIndexHost();
boolean req =
messenger.requestFlushAndNotify(dbname,host);
+ log.trace( String.format( "req
= %s\n", req ) );
if(req){
log.info("Waiting for
flush notification for "+dbname);
Boolean succ = null;
@@ -278,14 +353,20 @@
continue main_loop;
}
- // write updated timestamp
-
status.setProperty("timestamp",harvester.getResponseDate());
+ // write timestamp and, possibly,
updated sequence number; timestamp not needed
+ // in the new scheme but we save it
since it may be useful for debugging
+ //
+ status.setProperty( "timestamp",
harvester.getResponseDate() );
+ if ( ServerType.SEQUENCE == server_type
) {
+ status.setProperty( "sequence",
harvester.getSequence() );
+ }
try {
if(!statf.exists())
statf.getParentFile().mkdirs();
FileOutputStream fileos = new
FileOutputStream(statf,false);
status.store(fileos,"Last
incremental update timestamp");
fileos.close();
+ log.trace( String.format(
"stored timestamp/sequence to status file\n" ) );
} catch (IOException e) {
log.warn("I/O error writing
status file for "+iid+" at "+iid.getStatusPath()+" : "+e.getMessage(),e);
}
@@ -336,7 +417,7 @@
}
}
- private static void ensureNotOverladed(RMIMessengerClient messenger,
IndexId iid) throws InterruptedException{
+ private static void ensureNotOverloaded(RMIMessengerClient messenger,
IndexId iid) throws InterruptedException{
// check if indexer is overloaded
int queueSize = 0;
do{
@@ -349,4 +430,4 @@
}
-}
\ No newline at end of file
+}
diff --git a/src/org/wikimedia/lsearch/oai/OAIHarvester.java
b/src/org/wikimedia/lsearch/oai/OAIHarvester.java
index 63ac868..96834ba 100755
--- a/src/org/wikimedia/lsearch/oai/OAIHarvester.java
+++ b/src/org/wikimedia/lsearch/oai/OAIHarvester.java
@@ -1,8 +1,13 @@
package org.wikimedia.lsearch.oai;
import java.io.BufferedInputStream;
-import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.InputStream;
+import java.io.IOException;
+
import java.net.Authenticator;
import java.net.MalformedURLException;
import java.net.URL;
@@ -27,11 +32,35 @@
protected OAIParser parser;
protected IndexUpdatesCollector collector;
protected IndexId iid;
- protected String resumptionToken, responseDate;
+ protected String resumptionToken, responseDate, sequence;
protected String host;
/** number of retries before giving up, useful when there are broken
servers in the cluster */
protected int retries = 5;
+ // for debugging
+ // save contents of input stream to memory stream and dump to file
+ private static final boolean DBG = false;
+ private static int fnum = 1;
+ public static InputStream toMem( InputStream is ) throws IOException {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ // open new dump file
+ File dir = new File( "/var/tmp" );
+ String
+ pfx = String.format( "oai_%d_", fnum++ ),
+ sfx = ".xml";
+ File dumpfile = File.createTempFile( pfx, sfx, dir );
+ FileOutputStream fos = new FileOutputStream( dumpfile );
+
+ // read bytes
+ int c;
+ while ( -1 != (c = is.read()) ) {
+ os.write( c ); fos.write( c );
+ }
+ fos.close();
+ ByteArrayInputStream bis = new ByteArrayInputStream(
os.toByteArray() );
+ return bis;
+ } // toMem
+
public OAIHarvester(IndexId iid, String url, Authenticator auth) throws
MalformedURLException{
this.urlbase = url;
this.iid = iid;
@@ -41,26 +70,37 @@
Authenticator.setDefault(auth);
}
- /** Invoke ListRecords from a certain timestamp, fetching at least
records.. */
- public ArrayList<IndexUpdateRecord> getRecords(String from, int
atLeast) throws IOException {
+ /** Invoke ListRecords from a certain timestamp, fetching atLeast
records.
+ * if 'test' is true, append "next=-1" parameter to test if the server
supports sequence
+ * numbers; if it does, we'll see a "<next>...</next>" element in the
response.
+ */
+ public ArrayList<IndexUpdateRecord> getRecords( String from, int
atLeast, boolean test ) throws IOException {
ArrayList<IndexUpdateRecord> ret = new
ArrayList<IndexUpdateRecord>();
- read(new
URL(urlbase+"&verb=ListRecords&metadataPrefix=mediawiki&from="+from));
- ret.addAll(collector.getRecords());
- if(ret.size() < atLeast && hasMore())
- ret.addAll( getMoreRecords(atLeast - ret.size()) );
-
+ String s = urlbase +
"&verb=ListRecords&metadataPrefix=mediawiki&from=" + from;
+ if ( test ) {
+ s += "&next=-1";
+ }
+ read( new URL( s ) );
+ ret.addAll( collector.getRecords() );
+ if ( ret.size() < atLeast && hasMore() ) {
+ ret.addAll( getMoreRecords( atLeast - ret.size() ) );
+ }
return ret;
- }
-
+ } // getRecords
+
+ public ArrayList<IndexUpdateRecord> getRecords( String from, int
atLeast ) throws IOException {
+ return getRecords( from, atLeast, false );
+ } // getRecords
+
/** Get single record */
public ArrayList<IndexUpdateRecord> getRecord(String key) throws
IOException {
// sample key: oai:localhost:wikilucene:25139
String id = "oai:"+host+":"+iid.getDBname()+":"+key;
read(new
URL(urlbase+"&verb=GetRecord&metadataPrefix=mediawiki&identifier="+id));
return collector.getRecords();
- }
-
- protected void read(URL url) throws IOException {
+ } // getRecord
+
+ protected void read( URL url ) throws IOException {
log.info("Reading records from "+url);
// try reading from url a number of times before giving up
for(int tryNum = 1; tryNum <= this.retries; tryNum++){
@@ -70,12 +110,21 @@
// set some timeouts
urlConn.setReadTimeout(60 * 1000); // 60 seconds
urlConn.setConnectTimeout(60 * 1000); // 60
seconds
- InputStream in = new
BufferedInputStream(urlConn.getInputStream());
+ InputStream in;
+ if ( ! DBG ) {
+ in = new
BufferedInputStream(urlConn.getInputStream());
+ } else {
+ in = toMem( urlConn.getInputStream() );
+ }
+
parser = new OAIParser(in,collector);
parser.parse();
resumptionToken = parser.getResumptionToken();
- responseDate = parser.getResponseDate();
+ responseDate = parser.getResponseDate();
+ sequence = parser.getSequence();
in.close();
+ log.trace( String.format( "resumptionToken =
%s, responseDate = %s, sequence = %s",
+ resumptionToken, responseDate, sequence
) );
break;
} catch(IOException e){
if(tryNum == this.retries)
@@ -84,27 +133,61 @@
log.warn("Error reading from url (will
retry): "+url);
}
}
- }
+ } // read
/** Invoke ListRecords using the last resumption token, get atLeast num
of records */
- public ArrayList<IndexUpdateRecord> getMoreRecords(int atLeast){
+ public ArrayList<IndexUpdateRecord> getMoreRecords( int atLeast ) {
ArrayList<IndexUpdateRecord> ret = new
ArrayList<IndexUpdateRecord>();
try{
- do{
- read(new
URL(urlbase+"&verb=ListRecords&metadataPrefix=mediawiki&resumptionToken="+resumptionToken));
- ret.addAll(collector.getRecords());
- } while(hasMore() && ret.size() < atLeast);
- } catch(IOException e){
- log.warn("I/O exception listing records:
"+e.getMessage(),e);
+ do {
+ String s = urlbase +
"&verb=ListRecords&metadataPrefix=mediawiki&";
+ s += hasNext() ? "next=" + getSequence() :
"resumptionToken=" + resumptionToken;
+ read( new URL( s ) );
+ ret.addAll( collector.getRecords() );
+ } while ( hasMore() && ret.size() < atLeast );
+ } catch ( IOException e ) {
+ log.warn( "I/O exception listing records: " +
e.getMessage(), e );
return null;
}
return ret;
- }
+ } // getMoreRecords
- public boolean hasMore(){
- return !resumptionToken.equals("");
+ /** Invoke ListRecords using the next sequence number, get atLeast num
of records */
+ public ArrayList<IndexUpdateRecord> getRecordsSeq( String seq, int
atLeast ) {
+ ArrayList<IndexUpdateRecord> ret = new
ArrayList<IndexUpdateRecord>();
+ try{
+ do {
+ read( new URL( urlbase +
"&verb=ListRecords&metadataPrefix=mediawiki&next=" + seq ) );
+ ret.addAll( collector.getRecords() );
+ } while( hasMore() && ret.size() < atLeast );
+ } catch( IOException e ){
+ log.warn( "I/O exception listing records: " +
e.getMessage(), e );
+ return null;
+ }
+ return ret;
+ } // getRecordsSeq
+
+ public boolean hasMore() {
+ if ( hasNext() ) {
+ return sequence.endsWith( "+" );
+ }
+
+ // we didn't get a sequence number so server must be
timestamp-based
+ // so we expect resumptionToken to be non-null here
+ //
+ return ! "".equals( resumptionToken );
}
-
+
+ public boolean hasNext() {
+ return null != sequence;
+ }
+
+ public String getSequence(){ // should come from parsing response
+ return sequence.endsWith( "+" )
+ ? sequence.substring( 0, sequence.length() - 1 )
+ : sequence;
+ }
+
public String getResponseDate(){
return responseDate;
}
diff --git a/src/org/wikimedia/lsearch/oai/OAIParser.java
b/src/org/wikimedia/lsearch/oai/OAIParser.java
index b228ef8..ed8954d 100755
--- a/src/org/wikimedia/lsearch/oai/OAIParser.java
+++ b/src/org/wikimedia/lsearch/oai/OAIParser.java
@@ -35,27 +35,39 @@
protected IndexUpdatesCollector collector;
/** parsing state */
protected boolean inRecord, inHeader, inMetadata, inResponseDate;
- protected boolean inDump, inIdentifier, inResumptionToken;
- protected String oaiId,pageId,resumptionToken,responseDate;
- protected boolean beginMW; // beginning of mediawiki stream
- protected String mwUri, mwLocalName, mwQName;
+ protected boolean inDump, inIdentifier, inResumptionToken, inSequence;
+ protected String oaiId,pageId, resumptionToken, responseDate, sequence;
+ protected boolean beginMW; // beginning of mediawiki stream
+ protected String mwUri, mwLocalName, mwQName;
protected boolean isDeleted, inReferences, inRedirect, inRedirectTitle,
inRedirectRef;
- protected String references, redirectTitle, redirectRef;
+ protected String references, redirectTitle, redirectRef;
public OAIParser(InputStream in, IndexUpdatesCollector collector){
dumpReader = new XmlDumpReader(null,collector);
this.in = in;
this.collector = collector;
- inDump = false; inIdentifier = false; inResumptionToken = false;
- inRecord = false; inHeader = false; inMetadata = false;
- inResponseDate = false; inReferences = false;
- oaiId = ""; resumptionToken = ""; responseDate = "";
- beginMW = true; references = "";
- inRedirect = false; inRedirectTitle= false; inRedirectRef =
false;
- redirectTitle = ""; redirectRef = "";
+ inDump = false;
+ inIdentifier = false;
+ inResumptionToken = false;
+ inSequence = false;
+ inRecord = false;
+ inHeader = false;
+ inMetadata = false;
+ inResponseDate = false;
+ inReferences = false;
+ inRedirect = false;
+ inRedirectTitle = false;
+ inRedirectRef = false;
+ oaiId = "";
+ resumptionToken = "";
+ responseDate = "";
+ beginMW = true;
+ references = "";
+ redirectTitle = "";
+ redirectRef = "";
}
-
+
public void parse() throws IOException{
try {
SAXParserFactory factory =
SAXParserFactory.newInstance();
@@ -111,6 +123,9 @@
} else if(qName.equals("resumptionToken")){
resumptionToken = "";
inResumptionToken = true;
+ } else if(qName.equals("next")){
+ sequence = "";
+ inSequence = true;
} else if(qName.equals("responseDate")){
responseDate = "";
inResponseDate = true;
@@ -161,6 +176,8 @@
inIdentifier = false;
} else if(qName.equals("resumptionToken"))
inResumptionToken = false;
+ else if(qName.equals("next"))
+ inSequence = false;
else if(qName.equals("responseDate"))
inResponseDate = false;
}
@@ -174,6 +191,8 @@
dumpReader.characters(ch,start,length);
} else if(inResumptionToken){
resumptionToken += new String(ch,start,length);
+ } else if(inSequence){
+ sequence += new String(ch,start,length);
} else if(inResponseDate){
responseDate += new String(ch,start,length);
} else if(inReferences){
@@ -196,6 +215,10 @@
return resumptionToken;
}
+ public String getSequence() {
+ return sequence;
+ }
+
public IndexUpdatesCollector getCollector() {
return collector;
}
@@ -203,7 +226,4 @@
public String getResponseDate() {
return responseDate;
}
-
-
-
}
--
To view, visit https://gerrit.wikimedia.org/r/53299
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Ia8d74f82ecf7d4c5c1b612a39fbcef99bcd10334
Gerrit-PatchSet: 5
Gerrit-Project: operations/debs/lucene-search-2
Gerrit-Branch: master
Gerrit-Owner: Ram <[email protected]>
Gerrit-Reviewer: Demon <[email protected]>
Gerrit-Reviewer: Ram <[email protected]>
Gerrit-Reviewer: Tim Starling <[email protected]>
Gerrit-Reviewer: jenkins-bot
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits