Tim Starling has submitted this change and it was merged.

Change subject: Bug: 45266 Use sequence numbers instead of timestamps
......................................................................
Bug: 45266 Use sequence numbers instead of timestamps Fix spacing issues. Turn off DBG flag. Testing complete, ready for review. IncrementalUpdater.java -- fix bug in recognizing new style server OAIHarvester.java -- Fix indentation; add parameter to getRecords() to control addition new URL parameter; add overloaded getRecords(); fix bugs in getMoreRecords(), hasMore() and getSequence(). Testing continues. This is a revised version of the earlier fix and should be compatible with old clients and servers. Needs parallel changes to OAI extension to remedy the issue of losing updates. Bug: 45266 Change-Id: Ia8d74f82ecf7d4c5c1b612a39fbcef99bcd10334 --- M src/org/wikimedia/lsearch/oai/IncrementalUpdater.java M src/org/wikimedia/lsearch/oai/OAIHarvester.java M src/org/wikimedia/lsearch/oai/OAIParser.java 3 files changed, 250 insertions(+), 66 deletions(-) Approvals: Tim Starling: Verified; Looks good to me, approved jenkins-bot: Verified diff --git a/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java b/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java index 6028fa2..cdda32d 100755 --- a/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java +++ b/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java @@ -45,10 +45,12 @@ * */ public class IncrementalUpdater { + public enum ServerType { TIMESTAMP, SEQUENCE }; + static Logger log = Logger.getLogger(IncrementalUpdater.class); protected static int maxQueueSize = 500; protected static int bufferDocs = 50; - + static public class OAIAuthenticator extends Authenticator { protected String username,password; @@ -85,6 +87,13 @@ * @param args */ public static void main(String[] args){ + // only for debugging: allows use of a different log file from the main process + for (int i =0; i < args.length; i++) { + if (args[i].equals("-configfile")) { + Configuration.setConfigFile(args[++i]); + } + } + // config Configuration config = Configuration.open(); GlobalConfiguration global = GlobalConfiguration.getInstance(); @@ -105,7 
+114,15 @@ boolean requestSnapshot = false; String noOptimizationDBlistFile = null; HashSet<String> noOptimizationDBs = new HashSet<String>(); - + + + // old servers do timestamp-based transfers but that is unreliable and + // may cause lost updates; new ones use sequence numbers which are more + // reliable + // + String seq_first = null; // first record to fetch + ServerType server_type = null; + // args for(int i=0; i<args.length; i++){ if(args[i].equals("-d")) @@ -130,14 +147,21 @@ requestSnapshot = true; else if(args[i].equals("-nof")) noOptimizationDBlistFile = args[++i]; + else if(args[i].equals("-q")) + seq_first = args[++i]; else if(args[i].equals("--help")) break; + else if(args[i].equals("-configfile")) + ++i; // skip argument else if(args[i].startsWith("-")){ System.out.println("Unrecognized switch "+args[i]); return; } else dbnames.add(args[i]); } + if ( null != timestamp && null != seq_first ) { + System.out.println( "Cannot specify both timestamp and sequence number" ); return; + } if(useLocal) dbnames.addAll(global.getMyIndexDBnames()); dbnames.addAll(readDBList(dblist)); @@ -159,6 +183,7 @@ System.out.println(" -ef - exclude db names listed in dblist file"); System.out.println(" -sn - immediately make unoptimized snapshot as updates finish "); System.out.println(" -nof - use with -sn to specify a file with databases not to be optimized"); + System.out.println(" -q - sequence number to start from"); return; } // preload @@ -171,37 +196,85 @@ maxQueueSize = config.getInt("OAI","maxqueue",500); bufferDocs = config.getInt("OAI","bufferdocs",50); + log.trace( String.format( "maxQueueSize = %d, bufferDocs = %d\n", maxQueueSize, bufferDocs ) ); firstPass.addAll(dbnames); // update do{ - main_loop: for(String dbname : dbnames){ + main_loop: + for(String dbname : dbnames){ try{ - if(excludeList.contains(dbname)) + if(excludeList.contains(dbname)) { + log.trace( String.format( "%s in excludeList, skipped\n", dbname ) ); continue; + } IndexId iid = 
IndexId.get(dbname); OAIHarvester harvester = new OAIHarvester(iid,iid.getOAIRepository(),auth); OAIHarvester harvesterSingle = new OAIHarvester(iid,iid.getOAIRepository(),auth); - Properties status = new Properties(); + Properties status = new Properties(); // read timestamp from status file File statf = new File(iid.getStatusPath()); - try { - if(statf.exists()){ + try { + if(statf.exists()){ FileInputStream fileis = new FileInputStream(iid.getStatusPath()); status.load(fileis); fileis.close(); } } catch (IOException e) { log.warn("I/O error reading status file for "+iid+" at "+iid.getStatusPath()+" : "+e.getMessage(),e); - } - String from; - if(firstPass.contains(dbname) && timestamp!=null) - from = timestamp; - else - from = status.getProperty("timestamp",defaultTimestamp); - log.info("Resuming update of "+iid+" from "+from); - ArrayList<IndexUpdateRecord> records = harvester.getRecords(from,bufferDocs); - if(records.size() == 0) + } + + // fetch next batch of records based on sequence number (new scheme) or + // timestamp (old scheme) + // + String from = null, seq_next = null; + ArrayList<IndexUpdateRecord> records = null; + + if ( null == server_type ) { // server type not yet known + if ( firstPass.contains( dbname ) ) { + if ( null != seq_first ) { + server_type = ServerType.SEQUENCE; + seq_next = seq_first; + } else if ( null != timestamp ) { + server_type = ServerType.TIMESTAMP; + from = timestamp; + } + } + if ( null == seq_next && null == from ) { + seq_next = status.getProperty( "sequence" ); + if ( null != seq_next ) { + server_type = ServerType.SEQUENCE; + } else { + // The first successful transfer will tell us the server type + //server_type = ServerType.TIMESTAMP; + from = status.getProperty( "timestamp", defaultTimestamp ); + } + } + } else if ( ServerType.TIMESTAMP == server_type ) { + from = (firstPass.contains( dbname ) && null != timestamp) + ? 
timestamp : status.getProperty( "timestamp", defaultTimestamp ); + } else { // sequence numbers + seq_next = (firstPass.contains( dbname ) && null != seq_first) + ? seq_first : status.getProperty( "sequence" ); + } + if ( ServerType.SEQUENCE == server_type ) { // working with sequence numbers + log.info( "Resuming update of "+ iid + ", seq = " + seq_next ); + records = harvester.getRecordsSeq( seq_next, bufferDocs ); + } else if ( null == server_type ) { // test if server can do sequence numbers + log.info( "Resuming update of " + iid + ", from = " + from + ", next = -1" ); + records = harvester.getRecords( from, bufferDocs, true ); + } else { // working with timestamps + log.info( "Resuming update of " + iid + ", from = " + from ); + records = harvester.getRecords( from, bufferDocs ); + } + + if ( records.size() == 0 ) { + log.trace( String.format( "No records\n" ) ); continue; + } + if ( null == server_type ) { // first successful transfer + server_type = harvester.hasNext() ? ServerType.SEQUENCE : ServerType.TIMESTAMP; + log.trace( String.format( "Server type = %s\n", server_type ) ); + } boolean hasMore = false; do{ // send to indexer @@ -209,7 +282,7 @@ try { // send main printRecords(records); - ensureNotOverladed(messenger,iid); + ensureNotOverloaded(messenger,iid); log.info(iid+": Sending "+records.size()+" records to indexer"); HashSet<String> fetch = messenger.enqueueFrontend(records.toArray(new IndexUpdateRecord[] {}),iid.getIndexHost()); if(fetch.size()>0){ @@ -220,7 +293,7 @@ } // send additional printRecords(additional); - ensureNotOverladed(messenger,iid); + ensureNotOverloaded(messenger,iid); log.info(iid+": Sending additional "+additional.size()+" records to indexer"); messenger.enqueueFrontend(additional.toArray(new IndexUpdateRecord[] {}),iid.getIndexHost()); } @@ -238,10 +311,12 @@ } while(hasMore); // see if we need to wait for notification + log.trace( String.format( "notification = %s\n", notification ) ); if(notification){ RMIMessengerClient 
messenger = new RMIMessengerClient(true); String host = iid.getIndexHost(); boolean req = messenger.requestFlushAndNotify(dbname,host); + log.trace( String.format( "req = %s\n", req ) ); if(req){ log.info("Waiting for flush notification for "+dbname); Boolean succ = null; @@ -278,14 +353,20 @@ continue main_loop; } - // write updated timestamp - status.setProperty("timestamp",harvester.getResponseDate()); + // write timestamp and, possibly, updated sequence number; timestamp not needed + // in the new scheme but we save it since it may be useful for debugging + // + status.setProperty( "timestamp", harvester.getResponseDate() ); + if ( ServerType.SEQUENCE == server_type ) { + status.setProperty( "sequence", harvester.getSequence() ); + } try { if(!statf.exists()) statf.getParentFile().mkdirs(); FileOutputStream fileos = new FileOutputStream(statf,false); status.store(fileos,"Last incremental update timestamp"); fileos.close(); + log.trace( String.format( "stored timestamp/sequence to status file\n" ) ); } catch (IOException e) { log.warn("I/O error writing status file for "+iid+" at "+iid.getStatusPath()+" : "+e.getMessage(),e); } @@ -336,7 +417,7 @@ } } - private static void ensureNotOverladed(RMIMessengerClient messenger, IndexId iid) throws InterruptedException{ + private static void ensureNotOverloaded(RMIMessengerClient messenger, IndexId iid) throws InterruptedException{ // check if indexer is overloaded int queueSize = 0; do{ @@ -349,4 +430,4 @@ } -} \ No newline at end of file +} diff --git a/src/org/wikimedia/lsearch/oai/OAIHarvester.java b/src/org/wikimedia/lsearch/oai/OAIHarvester.java index 63ac868..96834ba 100755 --- a/src/org/wikimedia/lsearch/oai/OAIHarvester.java +++ b/src/org/wikimedia/lsearch/oai/OAIHarvester.java @@ -1,8 +1,13 @@ package org.wikimedia.lsearch.oai; import java.io.BufferedInputStream; -import java.io.IOException; +import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; +import java.io.File; +import 
java.io.FileOutputStream; import java.io.InputStream; +import java.io.IOException; + import java.net.Authenticator; import java.net.MalformedURLException; import java.net.URL; @@ -27,11 +32,35 @@ protected OAIParser parser; protected IndexUpdatesCollector collector; protected IndexId iid; - protected String resumptionToken, responseDate; + protected String resumptionToken, responseDate, sequence; protected String host; /** number of retries before giving up, useful when there are broken servers in the cluster */ protected int retries = 5; + // for debugging + // save contents of input stream to memory stream and dump to file + private static final boolean DBG = false; + private static int fnum = 1; + public static InputStream toMem( InputStream is ) throws IOException { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + // open new dump file + File dir = new File( "/var/tmp" ); + String + pfx = String.format( "oai_%d_", fnum++ ), + sfx = ".xml"; + File dumpfile = File.createTempFile( pfx, sfx, dir ); + FileOutputStream fos = new FileOutputStream( dumpfile ); + + // read bytes + int c; + while ( -1 != (c = is.read()) ) { + os.write( c ); fos.write( c ); + } + fos.close(); + ByteArrayInputStream bis = new ByteArrayInputStream( os.toByteArray() ); + return bis; + } // toMem + public OAIHarvester(IndexId iid, String url, Authenticator auth) throws MalformedURLException{ this.urlbase = url; this.iid = iid; @@ -41,26 +70,37 @@ Authenticator.setDefault(auth); } - /** Invoke ListRecords from a certain timestamp, fetching at least records.. */ - public ArrayList<IndexUpdateRecord> getRecords(String from, int atLeast) throws IOException { + /** Invoke ListRecords from a certain timestamp, fetching atLeast records. + * if 'test' is true, append "next=-1" parameter to test if the server supports sequence + * numbers; if it does, we'll see a "<next>...</next>" element in the response. 
+ */ + public ArrayList<IndexUpdateRecord> getRecords( String from, int atLeast, boolean test ) throws IOException { ArrayList<IndexUpdateRecord> ret = new ArrayList<IndexUpdateRecord>(); - read(new URL(urlbase+"&verb=ListRecords&metadataPrefix=mediawiki&from="+from)); - ret.addAll(collector.getRecords()); - if(ret.size() < atLeast && hasMore()) - ret.addAll( getMoreRecords(atLeast - ret.size()) ); - + String s = urlbase + "&verb=ListRecords&metadataPrefix=mediawiki&from=" + from; + if ( test ) { + s += "&next=-1"; + } + read( new URL( s ) ); + ret.addAll( collector.getRecords() ); + if ( ret.size() < atLeast && hasMore() ) { + ret.addAll( getMoreRecords( atLeast - ret.size() ) ); + } return ret; - } - + } // getRecords + + public ArrayList<IndexUpdateRecord> getRecords( String from, int atLeast ) throws IOException { + return getRecords( from, atLeast, false ); + } // getRecords + /** Get single record */ public ArrayList<IndexUpdateRecord> getRecord(String key) throws IOException { // sample key: oai:localhost:wikilucene:25139 String id = "oai:"+host+":"+iid.getDBname()+":"+key; read(new URL(urlbase+"&verb=GetRecord&metadataPrefix=mediawiki&identifier="+id)); return collector.getRecords(); - } - - protected void read(URL url) throws IOException { + } // getRecord + + protected void read( URL url ) throws IOException { log.info("Reading records from "+url); // try reading from url a number of times before giving up for(int tryNum = 1; tryNum <= this.retries; tryNum++){ @@ -70,12 +110,21 @@ // set some timeouts urlConn.setReadTimeout(60 * 1000); // 60 seconds urlConn.setConnectTimeout(60 * 1000); // 60 seconds - InputStream in = new BufferedInputStream(urlConn.getInputStream()); + InputStream in; + if ( ! 
DBG ) { + in = new BufferedInputStream(urlConn.getInputStream()); + } else { + in = toMem( urlConn.getInputStream() ); + } + parser = new OAIParser(in,collector); parser.parse(); resumptionToken = parser.getResumptionToken(); - responseDate = parser.getResponseDate(); + responseDate = parser.getResponseDate(); + sequence = parser.getSequence(); in.close(); + log.trace( String.format( "resumptionToken = %s, responseDate = %s, sequence = %s", + resumptionToken, responseDate, sequence ) ); break; } catch(IOException e){ if(tryNum == this.retries) @@ -84,27 +133,61 @@ log.warn("Error reading from url (will retry): "+url); } } - } + } // read /** Invoke ListRecords using the last resumption token, get atLeast num of records */ - public ArrayList<IndexUpdateRecord> getMoreRecords(int atLeast){ + public ArrayList<IndexUpdateRecord> getMoreRecords( int atLeast ) { ArrayList<IndexUpdateRecord> ret = new ArrayList<IndexUpdateRecord>(); try{ - do{ - read(new URL(urlbase+"&verb=ListRecords&metadataPrefix=mediawiki&resumptionToken="+resumptionToken)); - ret.addAll(collector.getRecords()); - } while(hasMore() && ret.size() < atLeast); - } catch(IOException e){ - log.warn("I/O exception listing records: "+e.getMessage(),e); + do { + String s = urlbase + "&verb=ListRecords&metadataPrefix=mediawiki&"; + s += hasNext() ? 
"next=" + getSequence() : "resumptionToken=" + resumptionToken; + read( new URL( s ) ); + ret.addAll( collector.getRecords() ); + } while ( hasMore() && ret.size() < atLeast ); + } catch ( IOException e ) { + log.warn( "I/O exception listing records: " + e.getMessage(), e ); return null; } return ret; - } + } // getMoreRecords - public boolean hasMore(){ - return !resumptionToken.equals(""); + /** Invoke ListRecords using the next sequence number, get atLeast num of records */ + public ArrayList<IndexUpdateRecord> getRecordsSeq( String seq, int atLeast ) { + ArrayList<IndexUpdateRecord> ret = new ArrayList<IndexUpdateRecord>(); + try{ + do { + read( new URL( urlbase + "&verb=ListRecords&metadataPrefix=mediawiki&next=" + seq ) ); + ret.addAll( collector.getRecords() ); + } while( hasMore() && ret.size() < atLeast ); + } catch( IOException e ){ + log.warn( "I/O exception listing records: " + e.getMessage(), e ); + return null; + } + return ret; + } // getRecordsSeq + + public boolean hasMore() { + if ( hasNext() ) { + return sequence.endsWith( "+" ); + } + + // we didn't get a sequence number so server must be timestamp-based + // so we expect resumptionToken to be non-null here + // + return ! "".equals( resumptionToken ); } - + + public boolean hasNext() { + return null != sequence; + } + + public String getSequence(){ // should come from parsing response + return sequence.endsWith( "+" ) + ? 
sequence.substring( 0, sequence.length() - 1 ) + : sequence; + } + public String getResponseDate(){ return responseDate; } diff --git a/src/org/wikimedia/lsearch/oai/OAIParser.java b/src/org/wikimedia/lsearch/oai/OAIParser.java index b228ef8..ed8954d 100755 --- a/src/org/wikimedia/lsearch/oai/OAIParser.java +++ b/src/org/wikimedia/lsearch/oai/OAIParser.java @@ -35,27 +35,39 @@ protected IndexUpdatesCollector collector; /** parsing state */ protected boolean inRecord, inHeader, inMetadata, inResponseDate; - protected boolean inDump, inIdentifier, inResumptionToken; - protected String oaiId,pageId,resumptionToken,responseDate; - protected boolean beginMW; // beginning of mediawiki stream - protected String mwUri, mwLocalName, mwQName; + protected boolean inDump, inIdentifier, inResumptionToken, inSequence; + protected String oaiId,pageId, resumptionToken, responseDate, sequence; + protected boolean beginMW; // beginning of mediawiki stream + protected String mwUri, mwLocalName, mwQName; protected boolean isDeleted, inReferences, inRedirect, inRedirectTitle, inRedirectRef; - protected String references, redirectTitle, redirectRef; + protected String references, redirectTitle, redirectRef; public OAIParser(InputStream in, IndexUpdatesCollector collector){ dumpReader = new XmlDumpReader(null,collector); this.in = in; this.collector = collector; - inDump = false; inIdentifier = false; inResumptionToken = false; - inRecord = false; inHeader = false; inMetadata = false; - inResponseDate = false; inReferences = false; - oaiId = ""; resumptionToken = ""; responseDate = ""; - beginMW = true; references = ""; - inRedirect = false; inRedirectTitle= false; inRedirectRef = false; - redirectTitle = ""; redirectRef = ""; + inDump = false; + inIdentifier = false; + inResumptionToken = false; + inSequence = false; + inRecord = false; + inHeader = false; + inMetadata = false; + inResponseDate = false; + inReferences = false; + inRedirect = false; + inRedirectTitle = false; + 
inRedirectRef = false; + oaiId = ""; + resumptionToken = ""; + responseDate = ""; + beginMW = true; + references = ""; + redirectTitle = ""; + redirectRef = ""; } - + public void parse() throws IOException{ try { SAXParserFactory factory = SAXParserFactory.newInstance(); @@ -111,6 +123,9 @@ } else if(qName.equals("resumptionToken")){ resumptionToken = ""; inResumptionToken = true; + } else if(qName.equals("next")){ + sequence = ""; + inSequence = true; } else if(qName.equals("responseDate")){ responseDate = ""; inResponseDate = true; @@ -161,6 +176,8 @@ inIdentifier = false; } else if(qName.equals("resumptionToken")) inResumptionToken = false; + else if(qName.equals("next")) + inSequence = false; else if(qName.equals("responseDate")) inResponseDate = false; } @@ -174,6 +191,8 @@ dumpReader.characters(ch,start,length); } else if(inResumptionToken){ resumptionToken += new String(ch,start,length); + } else if(inSequence){ + sequence += new String(ch,start,length); } else if(inResponseDate){ responseDate += new String(ch,start,length); } else if(inReferences){ @@ -196,6 +215,10 @@ return resumptionToken; } + public String getSequence() { + return sequence; + } + public IndexUpdatesCollector getCollector() { return collector; } @@ -203,7 +226,4 @@ public String getResponseDate() { return responseDate; } - - - } -- To view, visit https://gerrit.wikimedia.org/r/53299 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ia8d74f82ecf7d4c5c1b612a39fbcef99bcd10334 Gerrit-PatchSet: 5 Gerrit-Project: operations/debs/lucene-search-2 Gerrit-Branch: master Gerrit-Owner: Ram <r...@wikimedia.org> Gerrit-Reviewer: Demon <ch...@wikimedia.org> Gerrit-Reviewer: Ram <r...@wikimedia.org> Gerrit-Reviewer: Tim Starling <tstarl...@wikimedia.org> Gerrit-Reviewer: jenkins-bot _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org 
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits