Demon has submitted this change and it was merged.
Change subject: Revert "Fix for bug 45266. Needs parallel changes to OAI."
......................................................................
Revert "Fix for bug 45266. Needs parallel changes to OAI."
This reverts commit 24eb3e33ebe867c2a763a44405571ac556426a36
Change-Id: Id2582771cef5520e887551fb3f58424d11e4524c
---
M src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
M src/org/wikimedia/lsearch/oai/OAIHarvester.java
2 files changed, 17 insertions(+), 124 deletions(-)
Approvals:
Demon: Looks good to me, approved
Ram: Looks good to me, but someone else must approve
jenkins-bot: Verified
diff --git a/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java b/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
index fc3c508..6028fa2 100755
--- a/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
+++ b/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
@@ -85,14 +85,6 @@
* @param args
*/
public static void main(String[] args){
- // only for debugging: allows use of a different log file from the main process
- for (int i =0; i < args.length; i++) {
- if (args[i].equals("-configfile")) {
- Configuration.setConfigFile(args[++i]);
- break;
- }
- }
-
// config
Configuration config = Configuration.open();
GlobalConfiguration global = GlobalConfiguration.getInstance();
@@ -113,8 +105,7 @@
boolean requestSnapshot = false;
String noOptimizationDBlistFile = null;
HashSet<String> noOptimizationDBs = new HashSet<String>();
- String seq_first = null; // first record to fetch
-
+
// args
for(int i=0; i<args.length; i++){
if(args[i].equals("-d"))
@@ -139,12 +130,8 @@
requestSnapshot = true;
else if(args[i].equals("-nof"))
noOptimizationDBlistFile = args[++i];
- else if(args[i].equals("-q"))
- seq_first = args[++i];
else if(args[i].equals("--help"))
break;
- else if(args[i].equals("-configfile"))
- ++i; // skip argument
else if(args[i].startsWith("-")){
System.out.println("Unrecognized switch "+args[i]);
return;
@@ -172,7 +159,6 @@
System.out.println(" -ef - exclude db names listed in
dblist file");
System.out.println(" -sn - immediately make
unoptimized snapshot as updates finish ");
System.out.println(" -nof - use with -sn to specify a
file with databases not to be optimized");
- System.out.println(" -q - sequence number to start
from");
return;
}
// preload
@@ -185,16 +171,13 @@
maxQueueSize = config.getInt("OAI","maxqueue",500);
bufferDocs = config.getInt("OAI","bufferdocs",50);
- log.trace( String.format( "maxQueueSize = %d, bufferDocs =
%d\n", maxQueueSize, bufferDocs ) );
firstPass.addAll(dbnames);
// update
do{
main_loop: for(String dbname : dbnames){
try{
- if(excludeList.contains(dbname)) {
- log.trace( String.format( "%s
in excludeList, skipped\n", dbname ) );
+ if(excludeList.contains(dbname))
continue;
- }
IndexId iid = IndexId.get(dbname);
OAIHarvester harvester = new
OAIHarvester(iid,iid.getOAIRepository(),auth);
OAIHarvester harvesterSingle = new
OAIHarvester(iid,iid.getOAIRepository(),auth);
@@ -210,38 +193,15 @@
} catch (IOException e) {
log.warn("I/O error reading
status file for "+iid+" at "+iid.getStatusPath()+" : "+e.getMessage(),e);
}
-
- // fetch next batch of records based on
sequence number (new scheme) or
- // timestamp (new scheme)
- //
- String from = null,
- seq_next = null;
- ArrayList<IndexUpdateRecord> records =
null;
- if ( firstPass.contains( dbname ) ) {
- if ( null != seq_first ) {
- seq_next = seq_first;
- } else if ( null != timestamp )
{
- from = timestamp;
- }
- }
- if ( null == seq_next && null == from )
{
- seq_next = status.getProperty(
"sequence" );
- if ( null == seq_next ) {
- from =
status.getProperty("timestamp",defaultTimestamp);
- }
- }
- if ( null != seq_next ) { // working
with sequence numbers
- log.info( "Resuming update of
"+ iid + ", seq = " + seq_next );
- records =
harvester.getRecordsSeq( seq_next, bufferDocs );
- } else { // working with
timestamps
- log.info( "Resuming update of "
+ iid + ", from = " + from );
- records = harvester.getRecords(
from, bufferDocs );
- }
-
- if(records.size() == 0) {
- log.trace( String.format( "No
records\n" ) );
+ String from;
+ if(firstPass.contains(dbname) &&
timestamp!=null)
+ from = timestamp;
+ else
+ from =
status.getProperty("timestamp",defaultTimestamp);
+ log.info("Resuming update of "+iid+"
from "+from);
+ ArrayList<IndexUpdateRecord> records =
harvester.getRecords(from,bufferDocs);
+ if(records.size() == 0)
continue;
- }
boolean hasMore = false;
do{
// send to indexer
@@ -274,21 +234,14 @@
if(hasMore){
log.info("Fetching more
records...");
records =
harvester.getMoreRecords(bufferDocs);
-
- if(records.size() ==
0) {
- log.trace(
String.format( "Unexpected: hasMore is true but no records\n" ) );
- break;
- }
}
} while(hasMore);
// see if we need to wait for
notification
- log.trace( String.format( "notification
= %s\n", notification ) );
if(notification){
RMIMessengerClient messenger =
new RMIMessengerClient(true);
String host =
iid.getIndexHost();
boolean req =
messenger.requestFlushAndNotify(dbname,host);
- log.trace( String.format( "req
= %s\n", req ) );
if(req){
log.info("Waiting for
flush notification for "+dbname);
Boolean succ = null;
@@ -325,16 +278,14 @@
continue main_loop;
}
- // write updated sequence number
(timestamp not needed)
- status.setProperty( "sequence",
harvester.getSequence() );
- status.setProperty( "timestamp",
harvester.getResponseDate() );
+ // write updated timestamp
+ status.setProperty("timestamp",harvester.getResponseDate());
try {
if(!statf.exists())
statf.getParentFile().mkdirs();
FileOutputStream fileos = new
FileOutputStream(statf,false);
status.store(fileos,"Last
incremental update timestamp");
fileos.close();
- log.trace( String.format(
"stored sequence to status file\n" ) );
} catch (IOException e) {
log.warn("I/O error writing
status file for "+iid+" at "+iid.getStatusPath()+" : "+e.getMessage(),e);
}
@@ -398,4 +349,4 @@
}
-}
+}
\ No newline at end of file
diff --git a/src/org/wikimedia/lsearch/oai/OAIHarvester.java b/src/org/wikimedia/lsearch/oai/OAIHarvester.java
index 1796adf..63ac868 100755
--- a/src/org/wikimedia/lsearch/oai/OAIHarvester.java
+++ b/src/org/wikimedia/lsearch/oai/OAIHarvester.java
@@ -1,13 +1,8 @@
package org.wikimedia.lsearch.oai;
import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.InputStream;
import java.io.IOException;
-
+import java.io.InputStream;
import java.net.Authenticator;
import java.net.MalformedURLException;
import java.net.URL;
@@ -37,30 +32,6 @@
/** number of retries before giving up, useful when there are broken servers in the cluster */
protected int retries = 5;
- // for debugging
- // save contents of input stream to memory stream and dump to file
- private static final boolean DBG = false;
- private static int fnum = 1;
- public static InputStream toMem( InputStream is ) throws IOException {
- ByteArrayOutputStream os = new ByteArrayOutputStream();
- // open new dump file
- File dir = new File( "/var/tmp" );
- String
- pfx = String.format( "oai_%d_", fnum++ ),
- sfx = ".xml";
- File dumpfile = File.createTempFile( pfx, sfx, dir );
- FileOutputStream fos = new FileOutputStream( dumpfile );
-
- // read bytes
- int c;
- while ( -1 != (c = is.read()) ) {
- os.write( c ); fos.write( c );
- }
- fos.close();
- ByteArrayInputStream bis = new ByteArrayInputStream(
os.toByteArray() );
- return bis;
- } // toMem
-
public OAIHarvester(IndexId iid, String url, Authenticator auth) throws
MalformedURLException{
this.urlbase = url;
this.iid = iid;
@@ -99,19 +70,12 @@
// set some timeouts
urlConn.setReadTimeout(60 * 1000); // 60 seconds
urlConn.setConnectTimeout(60 * 1000); // 60
seconds
- InputStream in;
- if ( ! DBG ) {
- in = new BufferedInputStream(
urlConn.getInputStream() );
- } else {
- in = toMem( urlConn.getInputStream() );
- }
-
+ InputStream in = new
BufferedInputStream(urlConn.getInputStream());
parser = new OAIParser(in,collector);
parser.parse();
resumptionToken = parser.getResumptionToken();
responseDate = parser.getResponseDate();
in.close();
- log.trace( String.format( "resumptionToken =
%s, responseDate = %s", resumptionToken, responseDate ) );
break;
} catch(IOException e){
if(tryNum == this.retries)
@@ -127,8 +91,7 @@
ArrayList<IndexUpdateRecord> ret = new
ArrayList<IndexUpdateRecord>();
try{
do{
- URL url = new URL( urlbase +
"&verb=ListRecords&metadataPrefix=mediawiki&resumptionToken=" + getSequence() );
- read( url );
+ read(new URL(urlbase+"&verb=ListRecords&metadataPrefix=mediawiki&resumptionToken="+resumptionToken));
ret.addAll(collector.getRecords());
} while(hasMore() && ret.size() < atLeast);
} catch(IOException e){
@@ -138,29 +101,8 @@
return ret;
}
- /** Invoke ListRecords using the last resumption token, get atLeast num
of records */
- public ArrayList<IndexUpdateRecord> getRecordsSeq( String seq, int
atLeast ) {
- ArrayList<IndexUpdateRecord> ret = new
ArrayList<IndexUpdateRecord>();
- try{
- do{
- read( new URL( urlbase +
"&verb=ListRecords&metadataPrefix=mediawiki&resumptionToken=" + seq ) );
- ret.addAll( collector.getRecords() );
- } while( hasMore() && ret.size() < atLeast );
- } catch( IOException e ){
- log.warn( "I/O exception listing records: " +
e.getMessage(), e );
- return null;
- }
- return ret;
- }
-
public boolean hasMore(){
- return resumptionToken.endsWith(":");
- }
-
- public String getSequence(){
- return resumptionToken.endsWith(":")
- ? resumptionToken.substring( 0,
resumptionToken.length() - 1 )
- : resumptionToken;
+ return !resumptionToken.equals("");
}
public String getResponseDate(){
--
To view, visit https://gerrit.wikimedia.org/r/53385
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Id2582771cef5520e887551fb3f58424d11e4524c
Gerrit-PatchSet: 1
Gerrit-Project: operations/debs/lucene-search-2
Gerrit-Branch: master
Gerrit-Owner: Demon <[email protected]>
Gerrit-Reviewer: Demon <[email protected]>
Gerrit-Reviewer: Ram <[email protected]>
Gerrit-Reviewer: jenkins-bot
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits