Smalyshev has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/345788 )
Change subject: [WIP] Add secondary poller which lags behind and tries to collect missed updates. ...................................................................... [WIP] Add secondary poller which lags behind and tries to collect missed updates. The poller (enabled by -T <seconds> option to Updater) runs as a separate thread and tries to identify missed updates and collect them. The code also stores all seen RC IDs for several hours back to know which updates were seen and which weren't. Currently it's 360000 IDs back, which should be enough for now. Change-Id: I21a77f4d32b903dac4d9b1a842d8af78d6f9c5ed --- M gui M tools/src/main/java/org/wikidata/query/rdf/tool/Update.java M tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java A tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java 4 files changed, 207 insertions(+), 22 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/wikidata/query/rdf refs/changes/88/345788/1 diff --git a/gui b/gui index f6c57d5..c8be52c 160000 --- a/gui +++ b/gui @@ -1 +1 @@ -Subproject commit f6c57d515433de586109f6661bb0c0a258a7ebce +Subproject commit c8be52c606f5594665956bc80a562252bbe356a9 diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java index b93a952..e8ff2ac 100644 --- a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java +++ b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java @@ -93,6 +93,10 @@ @Option(shortName = "V", longName = "verify", description = "Verify updates (may have performance impact)") boolean verify(); + @Option(defaultValue = "0", shortName = "T", longName = "tailPoller", + description = "Use secondary poller with given gap (seconds) to catch up missed updates") + int tailPollerOffset(); + @Option(defaultToNull = true, description = "If specified must be numerical indexes of Item and Property namespaces" + " that defined in Wikibase repository, comma 
separated.") String entityNamespaces(); @@ -204,7 +208,7 @@ log.info("Found start time in the RDF store: {}", inputDateFormat().format(leftOff)); } } - return new RecentChangesPoller(wikibaseRepository, new Date(startTime), options.batchSize()); + return new RecentChangesPoller(wikibaseRepository, new Date(startTime), options.batchSize(), options.tailPollerOffset()); } /** diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java index 09ca176..619c2af 100644 --- a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java +++ b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java @@ -3,12 +3,14 @@ import static org.wikidata.query.rdf.tool.wikibase.WikibaseRepository.inputDateFormat; import java.text.DateFormat; +import java.util.Collections; import java.util.Date; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; -import java.util.Set; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import org.apache.commons.lang3.time.DateUtils; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.slf4j.Logger; @@ -28,6 +30,12 @@ private static final Logger log = LoggerFactory.getLogger(RecentChangesPoller.class); /** + * How many IDs we keep in seen IDs map. + * Should be enough for 1 hour for speeds up to 100 updates/s. + * If we ever get faster updates, we'd have to bump this. + */ + private static final int MAX_SEEN_IDS = 100 * 60 * 60; + /** * Wikibase repository to poll. */ private final WikibaseRepository wikibase; @@ -39,11 +47,64 @@ * Size of the batches to poll against wikibase. */ private final int batchSize; + /** + * Set of the IDs we've seen before. + * We have to use map here because LinkedHashMap has removeEldestEntry + * and it does not implement Set. So we're using + * Map with boolean values as Set. 
+ */ + private final Map<Long, Boolean> seenIDs; + /** + * How far back should we tail with the secondary tailing poller. + * The value is in seconds. + * 0 or negative means no tailing poller. + */ + private final int tailSeconds; - public RecentChangesPoller(WikibaseRepository wikibase, Date firstStartTime, int batchSize) { + /** + * Queue for communicating with tailing updater. + */ + private final Queue<Batch> queue = new ArrayBlockingQueue<>(100); + + /** + * Optional tailing poller. + * This will be instantiated only if we were asked for it (tailSeconds > 0) + * and when the poller catches up enough so that it is beyond the tailing gap + * itself. + * Note that the tailing poller runs in a different thread. + */ + private TailingChangesPoller tailPoller; + + public RecentChangesPoller(WikibaseRepository wikibase, Date firstStartTime, + int batchSize, Map<Long, Boolean> seenIDs, int tailSeconds) { + this.wikibase = wikibase; + this.firstStartTime = firstStartTime; + this.batchSize = batchSize; + this.seenIDs = seenIDs; + this.tailSeconds = tailSeconds; + } + + public RecentChangesPoller(WikibaseRepository wikibase, Date firstStartTime, int batchSize) { + this(wikibase, firstStartTime, batchSize, createSeenMap(), -1); + } + + public RecentChangesPoller(WikibaseRepository wikibase, Date firstStartTime, int batchSize, int tailSeconds) { + this(wikibase, firstStartTime, batchSize, createSeenMap(), tailSeconds); + } + + /** + * Create map of seen IDs. 
+ * @return + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + private static Map<Long, Boolean> createSeenMap() { + // Create hash map with max size, evicting oldest ID + final Map<Long, Boolean> map = new LinkedHashMap(MAX_SEEN_IDS, .75F, false) { + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > MAX_SEEN_IDS; + } + }; + return Collections.synchronizedMap(map); } @Override @@ -53,7 +114,36 @@ @Override public Batch nextBatch(Batch lastBatch) throws RetryableException { - return batch(lastBatch.leftOffDate, lastBatch); + Batch newBatch = batch(lastBatch.leftOffDate, lastBatch); + return tailSeconds > 0 ? checkTailPoller(newBatch) : newBatch; + } + + /** + * Check if we need to poll something from secondary + * poller queue. + * @param lastBatch + * @return + */ + private Batch checkTailPoller(Batch lastBatch) { + if (tailSeconds <= 0 || + lastBatch.leftOffDate().after(DateUtils.addSeconds(new Date(), -tailSeconds))) { + // still not caught up, do nothing + return lastBatch; + } + if (tailPoller == null) { + // We don't have poller yet - start it + log.info("Started trailing poller with gap of {} seconds", tailSeconds); + final RecentChangesPoller poller = new RecentChangesPoller(wikibase, firstStartTime, batchSize, seenIDs, -1); + tailPoller = new TailingChangesPoller(poller, queue, tailSeconds); + tailPoller.start(); + } else { + final Batch queuedBatch = queue.poll(); + if (queuedBatch != null) { + log.info("Merging {} changes from trailing queue", queuedBatch.changes().size()); + return lastBatch.merge(queuedBatch); + } + } + return lastBatch; } /** @@ -66,19 +156,11 @@ private final Date leftOffDate; /** - * The set of the rcid's this batch seen. - * Note that some IDs may be seen but not processed - * due to duplicates, etc. - */ - private final Set<Long> seenIDs; - - /** * A batch that will next continue using the continue parameter. 
*/ - private Batch(ImmutableList<Change> changes, long advanced, String leftOff, Date nextStartTime, Set<Long> seenIDs) { + private Batch(ImmutableList<Change> changes, long advanced, String leftOff, Date nextStartTime) { super(changes, advanced, leftOff); leftOffDate = nextStartTime; - this.seenIDs = seenIDs; } @Override @@ -98,11 +180,16 @@ } /** - * Get the list of IDs this batch has seen. + * Merge this batch with another batch. + * @param another * @return */ - public Set<Long> getSeenIDs() { - return seenIDs; + public Batch merge(Batch another) { + final ImmutableList<Change> newChanges = new ImmutableList.Builder<Change>() + .addAll(another.changes()) + .addAll(changes()) + .build(); + return new Batch(newChanges, advanced(), leftOffDate.toString(), leftOffDate); } } @@ -122,7 +209,6 @@ long nextStartTime = lastNextStartTime.getTime(); JSONArray result = (JSONArray) ((JSONObject) recentChanges.get("query")).get("recentchanges"); DateFormat df = inputDateFormat(); - Set<Long> seenIds = new HashSet<>(); for (Object rco : result) { JSONObject rc = (JSONObject) rco; @@ -136,12 +222,12 @@ log.info("Skipping change with bogus title: {}", rc.get("title").toString()); continue; } - seenIds.add(rcid); - if (lastBatch != null && lastBatch.getSeenIDs().contains(rcid)) { + if (seenIDs.containsKey(rcid)) { // This change was in the last batch log.debug("Skipping repeated change with rcid {}", rcid); continue; } + seenIDs.put(rcid, true); // Looks like we can not rely on changes appearing in order, so we have to take them all and let SPARQL // sort out the dupes. 
// if (continueChange != null && rcid < continueChange.rcid()) { @@ -189,7 +275,7 @@ // be sure we got the whole second String upTo = inputDateFormat().format(new Date(nextStartTime - 1000)); long advanced = nextStartTime - lastNextStartTime.getTime(); - return new Batch(changes, advanced, upTo, new Date(nextStartTime), seenIds); + return new Batch(changes, advanced, upTo, new Date(nextStartTime)); } catch (java.text.ParseException e) { throw new RetryableException("Parse error from api", e); } diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java b/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java new file mode 100644 index 0000000..fa307f3 --- /dev/null +++ b/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java @@ -0,0 +1,95 @@ +package org.wikidata.query.rdf.tool.change; + +import java.util.Date; + +import java.util.Queue; + +import org.apache.commons.lang3.time.DateUtils; + +import org.slf4j.Logger; + +import org.slf4j.LoggerFactory; + +import org.wikidata.query.rdf.tool.change.RecentChangesPoller.Batch; + +import org.wikidata.query.rdf.tool.exception.RetryableException; + +/** + * Tailing changes poller. + * Polls updates that are a certain time behind current time (to give + * the system time to settle old updates) and if it finds some, puts them + * on the queue. + * + * The class tries to stay behind the updates and never catch up with the + * current stream. In most cases, it will not produce any updates since + * those are already collected by the main updater, but in some cases it + * might catch an update that the main one skipped. + * + */ +public class TailingChangesPoller extends Thread { + + private static final Logger log = LoggerFactory + .getLogger(TailingChangesPoller.class); + + /** + * Poller to use for trailing polling. + */ + private final RecentChangesPoller poller; + /** + * Last batch received from the poller. 
+ */ + private Batch lastBatch; + /** + * How far behind the current time we should keep? + */ + private final int tailSeconds; + /** + * Queue to post the batches in. + */ + private final Queue<Batch> queue; + + public TailingChangesPoller(RecentChangesPoller poller, Queue<Batch> queue, int tailSeconds) { + this.poller = poller; + this.tailSeconds = tailSeconds; + this.queue = queue; + } + + /** + * Is this timestamp old enough? + * @param timestamp + * @return + */ + public boolean isOldEnough(Date timestamp) { + return timestamp.before(DateUtils.addSeconds(new Date(), -tailSeconds)); + } + + @Override + public void run() { + while (true) { + try { + do { + try { + if (lastBatch == null) { + lastBatch = poller.firstBatch(); + } else { + lastBatch = poller.nextBatch(lastBatch); + } + } catch (RetryableException e) { + log.warn("Retryable error fetching first batch. Retrying.", e); + continue; + } + } while (false); + // Process the batch + if (lastBatch.changes().size() > 0) { + log.info("Caught {} missing updates, adding to the queue", lastBatch.changes().size()); + queue.add(lastBatch); + } + if (!isOldEnough(lastBatch.leftOffDate())) { + // we're too far forward, let's sleep for a bit so we are couple + // of seconds behind + log.debug("Got too close to the current stream, sleeping..."); + Thread.sleep(lastBatch.leftOffDate().getTime() - + DateUtils.addSeconds(new Date(), -tailSeconds - 2).getTime()); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return; + } + } + } +} -- To view, visit https://gerrit.wikimedia.org/r/345788 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I21a77f4d32b903dac4d9b1a842d8af78d6f9c5ed Gerrit-PatchSet: 1 Gerrit-Project: wikidata/query/rdf Gerrit-Branch: master Gerrit-Owner: Smalyshev <smalys...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org 
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits