Ram has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/53299


Change subject: Bug: 45266 Use sequence numbers instead of timestamps
......................................................................

Bug: 45266 Use sequence numbers instead of timestamps

This is a revised version of the earlier fix and should be compatible
with old clients and servers. It needs parallel changes to the OAI
extension to remedy the issue of losing updates.

Bug: 45266
Change-Id: Ia8d74f82ecf7d4c5c1b612a39fbcef99bcd10334
---
M src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
M src/org/wikimedia/lsearch/oai/OAIHarvester.java
M src/org/wikimedia/lsearch/oai/OAIParser.java
3 files changed, 206 insertions(+), 46 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/debs/lucene-search-2 
refs/changes/99/53299/1

diff --git a/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java 
b/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
index 6028fa2..4fcb352 100755
--- a/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
+++ b/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
@@ -45,10 +45,12 @@
  *
  */
 public class IncrementalUpdater {
+       public enum ServerType { TIMESTAMP, SEQUENCE };
+
        static Logger log = Logger.getLogger(IncrementalUpdater.class);
        protected static int maxQueueSize = 500;
        protected static int bufferDocs = 50;
-       
+
        static public class OAIAuthenticator extends Authenticator {
                protected String username,password;
                
@@ -85,6 +87,13 @@
         * @param args
         */
        public static void main(String[] args){
+                // only for debugging: allows use of a different log file from 
the main process
+               for (int i =0; i < args.length; i++) {
+                       if (args[i].equals("-configfile")) {
+                               Configuration.setConfigFile(args[++i]);
+                       }
+               }
+
                // config
                Configuration config = Configuration.open();
                GlobalConfiguration global = GlobalConfiguration.getInstance();
@@ -105,7 +114,15 @@
                boolean requestSnapshot = false;
                String noOptimizationDBlistFile = null;
                HashSet<String> noOptimizationDBs = new HashSet<String>();
-               
+
+
+                // old servers do timestamp-based transfers but that is 
unreliable and
+                // may cause lost updates; new ones use sequence numbers which 
are more
+                // reliable
+                //
+                String seq_first = null;    // first record to fetch
+                ServerType server_type = null;
+
                // args
                for(int i=0; i<args.length; i++){
                        if(args[i].equals("-d"))
@@ -130,14 +147,21 @@
                                requestSnapshot = true;
                        else if(args[i].equals("-nof"))
                                noOptimizationDBlistFile = args[++i];
+                       else if(args[i].equals("-q"))
+                               seq_first = args[++i];
                        else if(args[i].equals("--help"))
                                break;
+                       else if(args[i].equals("-configfile"))
+                            ++i;  // skip argument
                        else if(args[i].startsWith("-")){
                                System.out.println("Unrecognized switch 
"+args[i]);
                                return;
                        } else
                                dbnames.add(args[i]);
                }               
+                if ( null != timestamp && null != seq_first ) {
+                       System.out.println( "Cannot specify both timestamp and 
sequence number" ); return;
+                }
                if(useLocal)
                        dbnames.addAll(global.getMyIndexDBnames());
                dbnames.addAll(readDBList(dblist));
@@ -159,6 +183,7 @@
                        System.out.println("  -ef  - exclude db names listed in 
dblist file");
                        System.out.println("  -sn  - immediately make 
unoptimized snapshot as updates finish ");
                        System.out.println("  -nof - use with -sn to specify a 
file with databases not to be optimized");
+                       System.out.println("  -q   - sequence number to start 
from");
                        return;
                }
                // preload
@@ -171,13 +196,17 @@
                
                maxQueueSize = config.getInt("OAI","maxqueue",500);
                bufferDocs = config.getInt("OAI","bufferdocs",50);
+                log.trace( String.format( "maxQueueSize = %d, bufferDocs = 
%d\n", maxQueueSize, bufferDocs ) );
                firstPass.addAll(dbnames);
                // update
                do{
-                       main_loop: for(String dbname : dbnames){
+                       main_loop:
+                       for(String dbname : dbnames){
                                try{
-                                       if(excludeList.contains(dbname))
+                                       if(excludeList.contains(dbname)) {
+                                               log.trace( String.format( "%s 
in excludeList, skipped\n", dbname ) );
                                                continue;
+                                       }
                                        IndexId iid = IndexId.get(dbname);
                                        OAIHarvester harvester = new 
OAIHarvester(iid,iid.getOAIRepository(),auth);
                                        OAIHarvester harvesterSingle = new 
OAIHarvester(iid,iid.getOAIRepository(),auth);
@@ -193,15 +222,55 @@
                                        } catch (IOException e) {
                                                log.warn("I/O error reading 
status file for "+iid+" at "+iid.getStatusPath()+" : "+e.getMessage(),e);
                                        }                               
-                                       String from;
-                                       if(firstPass.contains(dbname) && 
timestamp!=null)
-                                               from = timestamp;
-                                       else
-                                               from = 
status.getProperty("timestamp",defaultTimestamp);
-                                       log.info("Resuming update of "+iid+" 
from "+from);
-                                       ArrayList<IndexUpdateRecord> records = 
harvester.getRecords(from,bufferDocs);
-                                       if(records.size() == 0)
+
+                                        // fetch next batch of records based 
on sequence number (new scheme) or
+                                        // timestamp (old scheme)
+                                        //
+                                        String from = null, seq_next = null;
+                                       ArrayList<IndexUpdateRecord> records = 
null;
+
+                                       if ( null == server_type ) {    // 
server type not yet known
+                                               if ( firstPass.contains( dbname 
) ) {
+                                                       if ( null != seq_first 
) {
+                                                               server_type = 
ServerType.SEQUENCE;
+                                                               seq_next = 
seq_first;
+                                                       } else if ( null != 
timestamp ) {
+                                                               server_type = 
ServerType.TIMESTAMP;
+                                                               from = 
timestamp;
+                                                       }
+                                               }
+                                               if ( null == seq_next && null 
== from ) {
+                                                       seq_next = 
status.getProperty( "sequence" );
+                                                       if ( null != seq_next ) 
{
+                                                               server_type = 
ServerType.SEQUENCE;
+                                                       } else {
+                                                               // The first 
successful transfer will tell us the server type
+                                                               //server_type = 
ServerType.TIMESTAMP;
+                                                               from = 
status.getProperty( "timestamp", defaultTimestamp );
+                                                       }
+                                               }
+                                       } else if ( ServerType.TIMESTAMP == 
server_type ) {
+                                               from = (firstPass.contains( 
dbname ) && null != timestamp)
+                                                      ? timestamp : 
status.getProperty( "timestamp", defaultTimestamp );
+                                       } else {    // sequence numbers
+                                               seq_next = (firstPass.contains( 
dbname ) && null != seq_first)
+                                                          ? seq_first : 
status.getProperty( "sequence" );
+                                       }
+                                       if ( ServerType.SEQUENCE == server_type 
) {  // working with sequence numbers
+                                               log.info( "Resuming update of 
"+ iid + ", seq = " + seq_next );
+                                               records = 
harvester.getRecordsSeq( seq_next, bufferDocs );
+                                       } else {              // working with 
timestamps
+                                               log.info( "Resuming update of " 
+ iid + ", from = " + from );
+                                               records = harvester.getRecords( 
from, bufferDocs );
+                                       }
+
+                                       if ( records.size() == 0 ) {
+                                               log.trace( String.format( "No 
records\n" ) );
                                                continue;
+                                       }
+                                       if ( null == server_type ) {    // 
first successful transfer
+                                               server_type = 
harvester.hasNext() ? ServerType.SEQUENCE : ServerType.TIMESTAMP;
+                                       }
                                        boolean hasMore = false;
                                        do{
                                                // send to indexer
@@ -209,7 +278,7 @@
                                                try {
                                                        // send main
                                                        printRecords(records);
-                                                       
ensureNotOverladed(messenger,iid);
+                                                       
ensureNotOverloaded(messenger,iid);
                                                        log.info(iid+": Sending 
"+records.size()+" records to indexer");
                                                        HashSet<String> fetch = 
messenger.enqueueFrontend(records.toArray(new IndexUpdateRecord[] 
{}),iid.getIndexHost());
                                                        if(fetch.size()>0){
@@ -220,7 +289,7 @@
                                                                }
                                                                // send 
additional
                                                                
printRecords(additional);
-                                                               
ensureNotOverladed(messenger,iid);
+                                                               
ensureNotOverloaded(messenger,iid);
                                                                log.info(iid+": 
Sending additional "+additional.size()+" records to indexer");
                                                                
messenger.enqueueFrontend(additional.toArray(new IndexUpdateRecord[] 
{}),iid.getIndexHost());
                                                        }
@@ -238,10 +307,12 @@
                                        } while(hasMore);
 
                                        // see if we need to wait for 
notification
+                                        log.trace( String.format( 
"notification = %s\n", notification ) );
                                        if(notification){
                                                RMIMessengerClient messenger = 
new RMIMessengerClient(true);
                                                String host = 
iid.getIndexHost();
                                                boolean req = 
messenger.requestFlushAndNotify(dbname,host);
+                                                log.trace( String.format( "req 
= %s\n", req ) );
                                                if(req){
                                                        log.info("Waiting for 
flush notification for "+dbname);
                                                        Boolean succ = null;
@@ -278,14 +349,20 @@
                                                        continue main_loop;
                                        }
                                        
-                                       // write updated timestamp
-                                       
status.setProperty("timestamp",harvester.getResponseDate());
+                                       // write timestamp and, possibly, 
updated sequence number; timestamp not needed
+                                        // in the new scheme but we save it 
since it may be useful for debugging
+                                        //
+                                        status.setProperty( "timestamp", 
harvester.getResponseDate() );
+                                        if ( ServerType.SEQUENCE == 
server_type ) {
+                                            status.setProperty( "sequence", 
harvester.getSequence() );
+                                        }
                                        try {
                                                if(!statf.exists())
                                                        
statf.getParentFile().mkdirs();
                                                FileOutputStream fileos = new 
FileOutputStream(statf,false);
                                                status.store(fileos,"Last 
incremental update timestamp");
                                                fileos.close();
+                                                log.trace( String.format( 
"stored timestamp/sequence to status file\n" ) );
                                        } catch (IOException e) {
                                                log.warn("I/O error writing 
status file for "+iid+" at "+iid.getStatusPath()+" : "+e.getMessage(),e);
                                        }
@@ -336,7 +413,7 @@
                }
        }
 
-       private static void ensureNotOverladed(RMIMessengerClient messenger, 
IndexId iid) throws InterruptedException{
+       private static void ensureNotOverloaded(RMIMessengerClient messenger, 
IndexId iid) throws InterruptedException{
                // check if indexer is overloaded
                int queueSize = 0;
                do{
@@ -349,4 +426,4 @@
 
        }
        
-}
\ No newline at end of file
+}
diff --git a/src/org/wikimedia/lsearch/oai/OAIHarvester.java 
b/src/org/wikimedia/lsearch/oai/OAIHarvester.java
index 63ac868..f687143 100755
--- a/src/org/wikimedia/lsearch/oai/OAIHarvester.java
+++ b/src/org/wikimedia/lsearch/oai/OAIHarvester.java
@@ -1,8 +1,13 @@
 package org.wikimedia.lsearch.oai;
 
 import java.io.BufferedInputStream;
-import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.InputStream;
+import java.io.IOException;
+
 import java.net.Authenticator;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -27,11 +32,35 @@
        protected OAIParser parser;
        protected IndexUpdatesCollector collector;
        protected IndexId iid;
-       protected String resumptionToken, responseDate;
+       protected String resumptionToken, responseDate, sequence;
        protected String host;
        /** number of retries before giving up, useful when there are broken 
servers in the cluster */
        protected int retries = 5;
        
+        // for debugging
+        // save contents of input stream to memory stream and dump to file
+        private static final boolean DBG = true;
+        private static int fnum = 1;
+        public static InputStream toMem( InputStream is ) throws IOException {
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            // open new dump file
+            File dir = new File( "/var/tmp" );
+            String
+                pfx = String.format( "oai_%d_", fnum++ ),
+                sfx = ".xml";
+            File dumpfile = File.createTempFile( pfx, sfx, dir );
+            FileOutputStream fos = new FileOutputStream( dumpfile );
+
+            // read bytes
+            int c;
+            while ( -1 != (c = is.read()) ) {
+                os.write( c ); fos.write( c );
+            }
+            fos.close();
+            ByteArrayInputStream bis = new ByteArrayInputStream( 
os.toByteArray() );
+            return bis;
+        }  // toMem
+
        public OAIHarvester(IndexId iid, String url, Authenticator auth) throws 
MalformedURLException{
                this.urlbase = url;
                this.iid = iid;
@@ -70,12 +99,21 @@
                                // set some timeouts
                                urlConn.setReadTimeout(60 * 1000); // 60 seconds
                                urlConn.setConnectTimeout(60 * 1000); // 60 
seconds
-                               InputStream in = new 
BufferedInputStream(urlConn.getInputStream());
+                                InputStream in;
+                                if ( ! DBG ) {
+                                    in = new 
BufferedInputStream(urlConn.getInputStream());
+                                } else {
+                                    in = toMem( urlConn.getInputStream() );
+                                }
+
                                parser = new OAIParser(in,collector);
                                parser.parse();
                                resumptionToken = parser.getResumptionToken();
-                               responseDate = parser.getResponseDate();
+                               responseDate    = parser.getResponseDate();
+                               sequence        = parser.getSequence();
                                in.close();
+                                log.trace( String.format( "resumptionToken = 
%s, responseDate = %s, sequence = %s",
+                                                         resumptionToken, 
responseDate, sequence ) );
                                break;
                        } catch(IOException e){                         
                                if(tryNum == this.retries)
@@ -87,22 +125,47 @@
        }
 
        /** Invoke ListRecords using the last resumption token, get atLeast num 
of records */
-       public ArrayList<IndexUpdateRecord> getMoreRecords(int atLeast){
+       public ArrayList<IndexUpdateRecord> getMoreRecords( int atLeast ) {
                ArrayList<IndexUpdateRecord> ret = new 
ArrayList<IndexUpdateRecord>();
                try{                    
-                       do{
-                               read(new 
URL(urlbase+"&verb=ListRecords&metadataPrefix=mediawiki&resumptionToken="+resumptionToken));
-                               ret.addAll(collector.getRecords());
-                       } while(hasMore() && ret.size() < atLeast);
-               } catch(IOException e){
-                       log.warn("I/O exception listing records: 
"+e.getMessage(),e);
+                       do {
+                               String s = urlbase + 
"&verb=ListRecords&metadataPrefix=mediawiki&";
+                               s = s + (hasNext() ? "next=" + sequence : 
"resumptionToken=" + resumptionToken);
+                               read( new URL( s ) );
+                               ret.addAll( collector.getRecords() );
+                       } while ( hasMore() && ret.size() < atLeast );
+               } catch ( IOException e ) {
+                       log.warn( "I/O exception listing records: " + 
e.getMessage(), e );
                        return null;
                }
                return ret;
        }
        
-       public boolean hasMore(){
-               return !resumptionToken.equals("");
+       /** Invoke ListRecords using the next sequence number, get atLeast num 
of records */
+        public ArrayList<IndexUpdateRecord> getRecordsSeq( String seq, int 
atLeast ) {
+               ArrayList<IndexUpdateRecord> ret = new 
ArrayList<IndexUpdateRecord>();
+               try{                    
+                       do {
+                               read( new URL( urlbase + 
"&verb=ListRecords&metadataPrefix=mediawiki&next=" + seq ) );
+                               ret.addAll( collector.getRecords() );
+                       } while( hasMore() && ret.size() < atLeast );
+               } catch( IOException e ){
+                       log.warn( "I/O exception listing records: " + 
e.getMessage(), e );
+                       return null;
+               }
+               return ret;
+       }
+       
+       public boolean hasMore() {
+               return hasNext() || !resumptionToken.equals("");
+       }
+       
+       public boolean hasNext() {
+               return null != sequence;
+       }
+       
+       public String getSequence(){  // should come from parsing response
+               return sequence;
        }
        
        public String getResponseDate(){
diff --git a/src/org/wikimedia/lsearch/oai/OAIParser.java 
b/src/org/wikimedia/lsearch/oai/OAIParser.java
index b228ef8..1b8bb52 100755
--- a/src/org/wikimedia/lsearch/oai/OAIParser.java
+++ b/src/org/wikimedia/lsearch/oai/OAIParser.java
@@ -35,25 +35,37 @@
        protected IndexUpdatesCollector collector;
        /** parsing state */
        protected boolean inRecord, inHeader, inMetadata, inResponseDate;
-       protected boolean inDump, inIdentifier, inResumptionToken;
-       protected String oaiId,pageId,resumptionToken,responseDate;
-       protected boolean beginMW; // beginning of mediawiki stream
-       protected String mwUri, mwLocalName, mwQName;
+       protected boolean inDump, inIdentifier, inResumptionToken, inSequence;
+       protected String  oaiId,pageId, resumptionToken, responseDate, sequence;
+       protected boolean beginMW;    // beginning of mediawiki stream
+       protected String  mwUri, mwLocalName, mwQName;
        protected boolean isDeleted, inReferences, inRedirect, inRedirectTitle, 
inRedirectRef;
-       protected String references, redirectTitle, redirectRef;
+       protected String  references, redirectTitle, redirectRef;
        
        
        public OAIParser(InputStream in, IndexUpdatesCollector collector){
                dumpReader = new XmlDumpReader(null,collector);
                this.in = in;
                this.collector = collector;
-               inDump = false; inIdentifier = false; inResumptionToken = false;
-               inRecord = false; inHeader = false; inMetadata = false;
-               inResponseDate = false; inReferences = false;
-               oaiId = ""; resumptionToken = ""; responseDate = "";
-               beginMW = true; references = "";
-               inRedirect = false; inRedirectTitle= false; inRedirectRef = 
false;
-               redirectTitle = ""; redirectRef = "";
+               inDump            = false;
+                inIdentifier      = false;
+                inResumptionToken = false;
+                inSequence        = false;
+               inRecord          = false;
+                inHeader          = false;
+                inMetadata        = false;
+               inResponseDate    = false;
+                inReferences      = false;
+               inRedirect        = false;
+                inRedirectTitle   = false;
+                inRedirectRef     = false;
+               oaiId             = "";
+                resumptionToken   = "";
+                responseDate      = "";
+               beginMW           = true;
+                references        = "";
+               redirectTitle     = "";
+                redirectRef       = "";
        }
        
        public void parse() throws IOException{
@@ -111,6 +123,9 @@
                } else if(qName.equals("resumptionToken")){
                        resumptionToken = "";
                        inResumptionToken = true;
+               } else if(qName.equals("next")){
+                       sequence = "";
+                       inSequence = true;
                } else if(qName.equals("responseDate")){
                        responseDate = "";
                        inResponseDate = true;
@@ -161,6 +176,8 @@
                        inIdentifier = false;
                } else if(qName.equals("resumptionToken"))
                        inResumptionToken = false;
+               else if(qName.equals("next"))
+                       inSequence = false;
                else if(qName.equals("responseDate"))
                        inResponseDate = false;
        }
@@ -174,6 +191,8 @@
                        dumpReader.characters(ch,start,length);
                } else if(inResumptionToken){
                        resumptionToken += new String(ch,start,length);
+               } else if(inSequence){
+                       sequence += new String(ch,start,length);
                } else if(inResponseDate){
                        responseDate += new String(ch,start,length);
                } else if(inReferences){
@@ -196,6 +215,10 @@
                return resumptionToken;
        }
 
+       public String getSequence() {
+               return sequence;
+       }
+
        public IndexUpdatesCollector getCollector() {
                return collector;
        }
@@ -203,7 +226,4 @@
        public String getResponseDate() {
                return responseDate;
        }
-       
-       
-       
 }

-- 
To view, visit https://gerrit.wikimedia.org/r/53299
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia8d74f82ecf7d4c5c1b612a39fbcef99bcd10334
Gerrit-PatchSet: 1
Gerrit-Project: operations/debs/lucene-search-2
Gerrit-Branch: master
Gerrit-Owner: Ram <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to