jenkins-bot has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/345788 )
Change subject: Add secondary poller which lags behind and tries to collect missed updates. ...................................................................... Add secondary poller which lags behind and tries to collect missed updates. The poller (enabled by -T <seconds> option to Updater) runs as a separate thread and tries to identify missed updates and collect them. The code also stores all seen RC IDs for several hours back to know which updates were seen and which weren't. Currently it's 360000 IDs back, which should be enough for now. Change-Id: I21a77f4d32b903dac4d9b1a842d8af78d6f9c5ed --- M tools/src/main/java/org/wikidata/query/rdf/tool/Update.java M tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java A tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java M tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java 4 files changed, 252 insertions(+), 23 deletions(-) Approvals: Smalyshev: Looks good to me, approved jenkins-bot: Verified diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java index 5d13798..de0f2a3 100644 --- a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java +++ b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java @@ -93,6 +93,10 @@ @Option(shortName = "V", longName = "verify", description = "Verify updates (may have performance impact)") boolean verify(); + @Option(defaultValue = "0", shortName = "T", longName = "tailPoller", + description = "Use secondary poller with given gap (seconds) to catch up missed updates") + int tailPollerOffset(); + @Option(defaultToNull = true, description = "If specified must be numerical indexes of Item and Property namespaces" + " that defined in Wikibase repository, comma separated.") String entityNamespaces(); @@ -204,7 +208,7 @@ log.info("Found start time in the RDF store: {}", inputDateFormat().format(leftOff)); } } - 
return new RecentChangesPoller(wikibaseRepository, new Date(startTime), options.batchSize()); + return new RecentChangesPoller(wikibaseRepository, new Date(startTime), options.batchSize(), options.tailPollerOffset()); } /** diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java index 12700e2..6d32367 100644 --- a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java +++ b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java @@ -3,11 +3,12 @@ import static org.wikidata.query.rdf.tool.wikibase.WikibaseRepository.inputDateFormat; import java.text.DateFormat; +import java.util.Collections; import java.util.Date; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; -import java.util.Set; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; import org.apache.commons.lang3.time.DateUtils; import org.json.simple.JSONArray; @@ -29,6 +30,12 @@ private static final Logger log = LoggerFactory.getLogger(RecentChangesPoller.class); /** + * How many IDs we keep in seen IDs map. + * Should be enough for 1 hour for speeds up to 100 updates/s. + * If we ever get faster updates, we'd have to bump this. + */ + private static final int MAX_SEEN_IDS = 100 * 60 * 60; + /** * Wikibase repository to poll. */ private final WikibaseRepository wikibase; @@ -41,6 +48,19 @@ */ private final int batchSize; /** + * Set of the IDs we've seen before. + * We have to use map here because LinkedHashMap has removeEldestEntry + * and it does not implement Set. So we're using + * Map with boolean values as Set. + */ + private final Map<Long, Boolean> seenIDs; + /** + * How far back should we tail with secondary tailing poller. + * The value is in seconds. 
+ * 0 or negative means no tailing poller; + */ + private final int tailSeconds; + /** * How much to back off for recent fetches, in seconds. */ private static final int BACKOFF_TIME = 10; @@ -48,12 +68,52 @@ * How old should the change be to not apply backoff. * The number is in minutes. */ - private static final int BACKOFF_THRESHOLD = 5; + private static final int BACKOFF_THRESHOLD = 2; - public RecentChangesPoller(WikibaseRepository wikibase, Date firstStartTime, int batchSize) { + /** + * Queue for communicating with tailing updater. + */ + private final Queue<Batch> queue = new ArrayBlockingQueue<>(100); + + /** + * Optional tailing poller. + * This will be instantiated only if we were asked for it (tailSeconds > 0) + * and when the poller catches up enough so that it is beyond the tailing gap + * itself. + * Note that tailing poller runs in different thread. + */ + private TailingChangesPoller tailPoller; + + public RecentChangesPoller(WikibaseRepository wikibase, Date firstStartTime, + int batchSize, Map<Long, Boolean> seenIDs, int tailSeconds) { this.wikibase = wikibase; this.firstStartTime = firstStartTime; this.batchSize = batchSize; + this.seenIDs = seenIDs; + this.tailSeconds = tailSeconds; + } + + public RecentChangesPoller(WikibaseRepository wikibase, Date firstStartTime, int batchSize) { + this(wikibase, firstStartTime, batchSize, createSeenMap(), -1); + } + + public RecentChangesPoller(WikibaseRepository wikibase, Date firstStartTime, int batchSize, int tailSeconds) { + this(wikibase, firstStartTime, batchSize, createSeenMap(), tailSeconds); + } + + /** + * Create map of seen IDs. 
+ * @return + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + private static Map<Long, Boolean> createSeenMap() { + // Create hash map with max size, evicting oldest ID + final Map<Long, Boolean> map = new LinkedHashMap(MAX_SEEN_IDS, .75F, false) { + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > MAX_SEEN_IDS; + } + }; + return Collections.synchronizedMap(map); } @Override @@ -63,7 +123,40 @@ @Override public Batch nextBatch(Batch lastBatch) throws RetryableException { - return batch(lastBatch.leftOffDate, lastBatch); + Batch newBatch = batch(lastBatch.leftOffDate, lastBatch); + if (tailSeconds > 0) { + // Check if tail poller has something to say. + newBatch = checkTailPoller(newBatch); + } + return newBatch; + } + + /** + * Check if we need to poll something from secondary poller queue. + * @param lastBatch + * @return + */ + private Batch checkTailPoller(Batch lastBatch) { + if (tailSeconds <= 0 || + lastBatch.leftOffDate().before(DateUtils.addSeconds(new Date(), -tailSeconds))) { + // still not caught up, do nothing + return lastBatch; + } + if (tailPoller == null) { + // We don't have poller yet - start it + log.info("Started trailing poller with gap of {} seconds", tailSeconds); + // Create new poller starting back tailSeconds and same IDs map. + final RecentChangesPoller poller = new RecentChangesPoller(wikibase, DateUtils.addSeconds(new Date(), -tailSeconds), batchSize, seenIDs, -1); + tailPoller = new TailingChangesPoller(poller, queue, tailSeconds); + tailPoller.start(); + } else { + final Batch queuedBatch = queue.poll(); + if (queuedBatch != null) { + log.info("Merging {} changes from trailing queue", queuedBatch.changes().size()); + return lastBatch.merge(queuedBatch); + } + } + return lastBatch; } /** @@ -76,13 +169,6 @@ private final Date leftOffDate; /** - * The set of the rcid's this batch seen. - * Note that some IDs may be seen but not processed - * due to duplicates, etc. 
- */ - private final Set<Long> seenIDs; - - /** * Continue from last request. Can be null. */ private final JSONObject lastContinue; @@ -91,11 +177,9 @@ * A batch that will next continue using the continue parameter. */ private Batch(ImmutableList<Change> changes, long advanced, - String leftOff, Date nextStartTime, JSONObject lastContinue, - Set<Long> seenIDs) { + String leftOff, Date nextStartTime, JSONObject lastContinue) { super(changes, advanced, leftOff); leftOffDate = nextStartTime; - this.seenIDs = seenIDs; this.lastContinue = lastContinue; } @@ -116,11 +200,16 @@ } /** - * Get the list of IDs this batch has seen. + * Merge this batch with another batch. + * @param another * @return */ - public Set<Long> getSeenIDs() { - return seenIDs; + public Batch merge(Batch another) { + final ImmutableList<Change> newChanges = new ImmutableList.Builder<Change>() + .addAll(another.changes()) + .addAll(changes()) + .build(); + return new Batch(newChanges, advanced(), leftOffDate.toString(), leftOffDate, lastContinue); } /** @@ -178,7 +267,6 @@ long nextStartTime = lastNextStartTime.getTime(); JSONArray result = (JSONArray) ((JSONObject) recentChanges.get("query")).get("recentchanges"); DateFormat df = inputDateFormat(); - Set<Long> seenIds = new HashSet<>(); for (Object rco : result) { JSONObject rc = (JSONObject) rco; @@ -192,12 +280,12 @@ log.info("Skipping change with bogus title: {}", rc.get("title").toString()); continue; } - seenIds.add(rcid); - if (lastBatch != null && lastBatch.getSeenIDs().contains(rcid)) { + if (seenIDs.containsKey(rcid)) { // This change was in the last batch log.debug("Skipping repeated change with rcid {}", rcid); continue; } + seenIDs.put(rcid, true); // Looks like we can not rely on changes appearing in order, so we have to take them all and let SPARQL // sort out the dupes. 
// if (continueChange != null && rcid < continueChange.rcid()) { @@ -245,7 +333,7 @@ // be sure we got the whole second String upTo = inputDateFormat().format(new Date(nextStartTime - 1000)); long advanced = nextStartTime - lastNextStartTime.getTime(); - return new Batch(changes, advanced, upTo, new Date(nextStartTime), nextContinue, seenIds); + return new Batch(changes, advanced, upTo, new Date(nextStartTime), nextContinue); } catch (java.text.ParseException e) { throw new RetryableException("Parse error from api", e); } diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java b/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java new file mode 100644 index 0000000..4f17c42 --- /dev/null +++ b/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java @@ -0,0 +1,98 @@ +package org.wikidata.query.rdf.tool.change; + +import java.util.Date; +import java.util.Queue; + +import org.apache.commons.lang3.time.DateUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wikidata.query.rdf.tool.change.RecentChangesPoller.Batch; +import org.wikidata.query.rdf.tool.exception.RetryableException; + +/** + * Tailing changes poller. + * Polls updates that are a certain time behind current time (to give + * the system time to settle old updates) and if it finds some, puts them + * on the queue. + * + * The class tries to stay behind the updates and never catch up with the + * current stream. In most cases, it will not produce any updates since + * those are already collected by the main updater, but in some cases it + * might catch an update that the main one skipped. + * + */ +public class TailingChangesPoller extends Thread { + + private static final Logger log = LoggerFactory + .getLogger(TailingChangesPoller.class); + + /** + * Poller to use for trailing polling. + */ + private final RecentChangesPoller poller; + /** + * Last batch received from the poller. 
+ */ + private Batch lastBatch; + /** + * How far behind the current time we should keep? + */ + private final int tailSeconds; + /** + * Queue to post the batches in. + */ + private final Queue<Batch> queue; + + public TailingChangesPoller(RecentChangesPoller poller, Queue<Batch> queue, int tailSeconds) { + this.poller = poller; + this.tailSeconds = tailSeconds; + this.queue = queue; + } + + /** + * Is this timestamp old enough? + * @param timestamp + * @return + */ + public boolean isOldEnough(Date timestamp) { + return timestamp.before(DateUtils.addSeconds(new Date(), -tailSeconds)); + } + + @Override + public void run() { + this.setName("TailPoller"); + while (true) { + try { + do { + try { + if (lastBatch == null) { + lastBatch = poller.firstBatch(); + } else { + lastBatch = poller.nextBatch(lastBatch); + } + } catch (RetryableException e) { + log.warn("Retryable error fetching first batch. Retrying.", e); + continue; + } + } while (false); + // Process the batch + if (lastBatch.changes().size() > 0) { + log.info("Caught {} missing updates, adding to the queue", lastBatch.changes().size()); + queue.add(lastBatch); + } + log.info("Tail poll up to {}", lastBatch.leftOffDate()); + if (!isOldEnough(lastBatch.leftOffDate())) { + // we're too far forward, let's sleep for a bit so we are couple + // of seconds behind + long sleepTime = lastBatch.leftOffDate().getTime() - + DateUtils.addSeconds(new Date(), -tailSeconds - 2).getTime(); + log.info("Got too close to the current stream, sleeping for {}...", sleepTime); + Thread.sleep(sleepTime); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return; + } + } + } +} diff --git a/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java b/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java index b17c929..80bec94 100644 --- a/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java +++ 
b/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java @@ -273,6 +273,45 @@ assertEquals(argument.getValue(), startTime); } + /** + * Backoff overflow check, + * Check that if we're backing off but find no new changes then time is advanced. + * @throws RetryableException + */ + @SuppressWarnings("unchecked") + @Test + public void backoffOverflow() throws RetryableException { + Date startTime = new Date(); + batchSize = 1; + RecentChangesPoller poller = new RecentChangesPoller(repository, startTime, batchSize); + + JSONObject result = new JSONObject(); + JSONObject query = new JSONObject(); + result.put("query", query); + JSONArray recentChanges = new JSONArray(); + query.put("recentchanges", recentChanges); + + String date = WikibaseRepository.inputDateFormat().format(startTime); + JSONObject rc = new JSONObject(); + rc.put("ns", Long.valueOf(0)); + rc.put("title", "Q424242"); + rc.put("timestamp", date); + rc.put("revid", 42L); + rc.put("rcid", 42L); + rc.put("type", "edit"); + recentChanges.add(rc); + + firstBatchReturns(startTime, result); + Batch batch = poller.firstBatch(); + assertThat(batch.changes(), hasSize(1)); + assertEquals(startTime, batch.leftOffDate()); + + batch = poller.nextBatch(batch); + assertThat(batch.changes(), hasSize(0)); + assertThat(startTime, lessThan(batch.leftOffDate())); + assertEquals(DateUtils.addSeconds(startTime, 1), batch.leftOffDate()); + } + @Before public void setupMocks() { repository = mock(WikibaseRepository.class); -- To view, visit https://gerrit.wikimedia.org/r/345788 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I21a77f4d32b903dac4d9b1a842d8af78d6f9c5ed Gerrit-PatchSet: 5 Gerrit-Project: wikidata/query/rdf Gerrit-Branch: master Gerrit-Owner: Smalyshev <smalys...@wikimedia.org> Gerrit-Reviewer: Smalyshev <smalys...@wikimedia.org> Gerrit-Reviewer: jenkins-bot <> _______________________________________________ MediaWiki-commits 
mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits