Smalyshev has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/345788 )

Change subject: [WIP] Add secondary poller which lags behind and tries to 
collect missed updates.
......................................................................

[WIP] Add secondary poller which lags behind and tries to collect missed 
updates.

The poller (enabled by -T <seconds> option to Updater) runs as a separate 
thread and
tries to identify missed updates and collect them.

The code also stores all seen RC IDs for several hours back to know which 
updates
were seen and which weren't. Currently it's 360000 IDs back, which should be 
enough for now.

Change-Id: I21a77f4d32b903dac4d9b1a842d8af78d6f9c5ed
---
M gui
M tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
M 
tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
A 
tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java
4 files changed, 207 insertions(+), 22 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/wikidata/query/rdf 
refs/changes/88/345788/1

diff --git a/gui b/gui
index f6c57d5..c8be52c 160000
--- a/gui
+++ b/gui
@@ -1 +1 @@
-Subproject commit f6c57d515433de586109f6661bb0c0a258a7ebce
+Subproject commit c8be52c606f5594665956bc80a562252bbe356a9
diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java 
b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
index b93a952..e8ff2ac 100644
--- a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
+++ b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
@@ -93,6 +93,10 @@
         @Option(shortName = "V", longName = "verify", description = "Verify 
updates (may have performance impact)")
         boolean verify();
 
+        @Option(defaultValue = "0", shortName = "T", longName = "tailPoller",
+                description = "Use secondary poller with given gap (seconds) 
to catch up missed updates")
+        int tailPollerOffset();
+
         @Option(defaultToNull = true, description = "If specified must be 
numerical indexes of Item and Property namespaces"
                 + " that defined in Wikibase repository, comma separated.")
         String entityNamespaces();
@@ -204,7 +208,7 @@
                 log.info("Found start time in the RDF store: {}", 
inputDateFormat().format(leftOff));
             }
         }
-        return new RecentChangesPoller(wikibaseRepository, new 
Date(startTime), options.batchSize());
+        return new RecentChangesPoller(wikibaseRepository, new 
Date(startTime), options.batchSize(), options.tailPollerOffset());
     }
 
     /**
diff --git 
a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
 
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
index 09ca176..619c2af 100644
--- 
a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
+++ 
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
@@ -3,12 +3,14 @@
 import static 
org.wikidata.query.rdf.tool.wikibase.WikibaseRepository.inputDateFormat;
 
 import java.text.DateFormat;
+import java.util.Collections;
 import java.util.Date;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Set;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
 
+import org.apache.commons.lang3.time.DateUtils;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
@@ -28,6 +30,12 @@
     private static final Logger log = 
LoggerFactory.getLogger(RecentChangesPoller.class);
 
     /**
+     * How many IDs we keep in seen IDs map.
+     * Should be enough for 1 hour for speeds up to 100 updates/s.
+     * If we ever get faster updates, we'd have to bump this.
+     */
+    private static final int MAX_SEEN_IDS = 100 * 60 * 60;
+    /**
      * Wikibase repository to poll.
      */
     private final WikibaseRepository wikibase;
@@ -39,11 +47,64 @@
      * Size of the batches to poll against wikibase.
      */
     private final int batchSize;
+    /**
+     * Set of the IDs we've seen before.
+     * We have to use map here because LinkedHashMap has removeEldestEntry
+     * and it does not implement Set. So we're using
+     * Map with boolean values as Set.
+     */
+    private final Map<Long, Boolean> seenIDs;
+    /**
+     * How far back should we tail with secondary tailing poller.
+     * The value is in milliseconds.
+     * 0 or negative means no tailing poller;
+     */
+    private final int tailSeconds;
 
-    public RecentChangesPoller(WikibaseRepository wikibase, Date 
firstStartTime, int batchSize) {
+    /**
+     * Queue for communicating with tailing updater.
+     */
+    private final Queue<Batch> queue = new ArrayBlockingQueue<>(100);
+
+    /**
+     * Optional tailing poller.
+     * This will be instantiated only if we were asked for it (tailSeconds > 0)
+     * and when the poller catches up enough so that it is beyond the tailing 
gap
+     * itself.
+     * Note that tailing poller runs in different thread.
+     */
+    private TailingChangesPoller tailPoller;
+
+    public RecentChangesPoller(WikibaseRepository wikibase, Date 
firstStartTime,
+            int batchSize, Map<Long, Boolean> seenIDs, int tailSeconds) {
         this.wikibase = wikibase;
         this.firstStartTime = firstStartTime;
         this.batchSize = batchSize;
+        this.seenIDs = seenIDs;
+        this.tailSeconds = tailSeconds;
+    }
+
+    public RecentChangesPoller(WikibaseRepository wikibase, Date 
firstStartTime, int batchSize) {
+        this(wikibase, firstStartTime, batchSize, createSeenMap(), -1);
+    }
+
+    public RecentChangesPoller(WikibaseRepository wikibase, Date 
firstStartTime, int batchSize, int tailSeconds) {
+        this(wikibase, firstStartTime, batchSize, createSeenMap(), 
tailSeconds);
+    }
+
+    /**
+     * Create map of seen IDs.
+     * @return
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private static Map<Long, Boolean> createSeenMap() {
+        // Create hash map with max size, evicting oldest ID
+        final Map<Long, Boolean> map = new LinkedHashMap(MAX_SEEN_IDS, .75F, 
false) {
+            protected boolean removeEldestEntry(Map.Entry eldest) {
+                return size() > MAX_SEEN_IDS;
+            }
+        };
+        return Collections.synchronizedMap(map);
     }
 
     @Override
@@ -53,7 +114,36 @@
 
     @Override
     public Batch nextBatch(Batch lastBatch) throws RetryableException {
-        return batch(lastBatch.leftOffDate, lastBatch);
+        Batch newBatch = batch(lastBatch.leftOffDate, lastBatch);
+        return tailSeconds > 0 ? checkTailPoller(newBatch) : newBatch;
+    }
+
+    /**
+     * Check if we need to poll something from secondary
+     * poller queue.
+     * @param lastBatch
+     * @return
+     */
+    private Batch checkTailPoller(Batch lastBatch) {
+        if (tailSeconds <= 0 ||
+            lastBatch.leftOffDate().after(DateUtils.addSeconds(new Date(), 
-tailSeconds))) {
+            // still not caught up, do nothing
+            return lastBatch;
+        }
+        if (tailPoller == null) {
+            // We don't have poller yet - start it
+            log.info("Started trailing poller with gap of {} seconds", 
tailSeconds);
+            final RecentChangesPoller poller = new 
RecentChangesPoller(wikibase, firstStartTime, batchSize, seenIDs, -1);
+            tailPoller = new TailingChangesPoller(poller, queue, tailSeconds);
+            tailPoller.start();
+        } else {
+            final Batch queuedBatch = queue.poll();
+            if (queuedBatch != null) {
+                log.info("Merging {} changes from trailing queue", 
queuedBatch.changes().size());
+                return lastBatch.merge(queuedBatch);
+            }
+        }
+        return lastBatch;
     }
 
     /**
@@ -66,19 +156,11 @@
         private final Date leftOffDate;
 
         /**
-         * The set of the rcid's this batch seen.
-         * Note that some IDs may be seen but not processed
-         * due to duplicates, etc.
-         */
-        private final Set<Long> seenIDs;
-
-        /**
          * A batch that will next continue using the continue parameter.
          */
-        private Batch(ImmutableList<Change> changes, long advanced, String 
leftOff, Date nextStartTime, Set<Long> seenIDs) {
+        private Batch(ImmutableList<Change> changes, long advanced, String 
leftOff, Date nextStartTime) {
             super(changes, advanced, leftOff);
             leftOffDate = nextStartTime;
-            this.seenIDs = seenIDs;
         }
 
         @Override
@@ -98,11 +180,16 @@
         }
 
         /**
-         * Get the list of IDs this batch has seen.
+         * Merge this batch with another batch.
+         * @param another
          * @return
          */
-        public Set<Long> getSeenIDs() {
-            return seenIDs;
+        public Batch merge(Batch another) {
+            final ImmutableList<Change> newChanges = new 
ImmutableList.Builder<Change>()
+                    .addAll(another.changes())
+                    .addAll(changes())
+                    .build();
+            return new Batch(newChanges, advanced(), leftOffDate.toString(), 
leftOffDate);
         }
     }
 
@@ -122,7 +209,6 @@
             long nextStartTime = lastNextStartTime.getTime();
             JSONArray result = (JSONArray) ((JSONObject) 
recentChanges.get("query")).get("recentchanges");
             DateFormat df = inputDateFormat();
-            Set<Long> seenIds = new HashSet<>();
 
             for (Object rco : result) {
                 JSONObject rc = (JSONObject) rco;
@@ -136,12 +222,12 @@
                     log.info("Skipping change with bogus title:  {}", 
rc.get("title").toString());
                     continue;
                 }
-                seenIds.add(rcid);
-                if (lastBatch != null && 
lastBatch.getSeenIDs().contains(rcid)) {
+                if (seenIDs.containsKey(rcid)) {
                     // This change was in the last batch
                     log.debug("Skipping repeated change with rcid {}", rcid);
                     continue;
                 }
+                seenIDs.put(rcid, true);
 // Looks like we can not rely on changes appearing in order, so we have to 
take them all and let SPARQL
 // sort out the dupes.
 //                if (continueChange != null && rcid < continueChange.rcid()) {
@@ -189,7 +275,7 @@
             // be sure we got the whole second
             String upTo = inputDateFormat().format(new Date(nextStartTime - 
1000));
             long advanced = nextStartTime - lastNextStartTime.getTime();
-            return new Batch(changes, advanced, upTo, new Date(nextStartTime), 
seenIds);
+            return new Batch(changes, advanced, upTo, new Date(nextStartTime));
         } catch (java.text.ParseException e) {
             throw new RetryableException("Parse error from api", e);
         }
diff --git 
a/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java
 
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java
new file mode 100644
index 0000000..fa307f3
--- /dev/null
+++ 
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java
@@ -0,0 +1,95 @@
+package org.wikidata.query.rdf.tool.change;
+
+import java.util.Date;
+import java.util.Queue;
+
+import org.apache.commons.lang3.time.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wikidata.query.rdf.tool.change.RecentChangesPoller.Batch;
+import org.wikidata.query.rdf.tool.exception.RetryableException;
+
+/**
+ * Tailing changes poller.
+ * Polls updates that are certain time behind current time (to give
+ * the system time to settle old updates) and if it find some, puts them
+ * on the queue.
+ *
+ * The class tries to stay behind the updates and never catch up with the
+ * current stream. In most cases, it will not produce any updates since
+ * those are already collected by the main updater, but in some cases it
+ * might catch update that the main one skipped.
+ *
+ */
+public class TailingChangesPoller extends Thread {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(TailingChangesPoller.class);
+
+    /**
+     * Poller to use for trailing polling.
+     */
+    private final RecentChangesPoller poller;
+    /**
+     * Last batch received from the poller.
+     */
+    private Batch lastBatch;
+    /**
+     * How far behind the current time we should keep?
+     */
+    private final int tailSeconds;
+    /**
+     * Queue to post the batches in.
+     */
+    private final Queue<Batch> queue;
+
+    public TailingChangesPoller(RecentChangesPoller poller, Queue<Batch> 
queue, int tailSeconds) {
+        this.poller = poller;
+        this.tailSeconds = tailSeconds;
+        this.queue = queue;
+    }
+
+    /**
+     * Is this timestamp old enough?
+     * @param timestamp
+     * @return
+     */
+    public boolean isOldEnough(Date timestamp) {
+        return timestamp.before(DateUtils.addSeconds(new Date(), 
-tailSeconds));
+    }
+
+    @Override
+    public void run() {
+        while (true) {
+            try {
+                do {
+                    try {
+                        if (lastBatch == null) {
+                            lastBatch = poller.firstBatch();
+                        } else {
+                            lastBatch = poller.nextBatch(lastBatch);
+                        }
+                    } catch (RetryableException e) {
+                        log.warn("Retryable error fetching first batch.  
Retrying.", e);
+                        continue;
+                    }
+                } while (false);
+                // Process the batch
+                if (lastBatch.changes().size() > 0) {
+                    log.info("Caught {} missing updates, adding to the queue", 
lastBatch.changes().size());
+                    queue.add(lastBatch);
+                }
+                if (!isOldEnough(lastBatch.leftOffDate())) {
+                    // we're too far forward, let's sleep for a bit so we are 
couple
+                    // of seconds behind
+                    log.debug("Got too close to the current stream, 
sleeping...");
+                    Thread.sleep(lastBatch.leftOffDate().getTime() -
+                            DateUtils.addSeconds(new Date(), -tailSeconds - 
2).getTime());
+                }
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+}

-- 
To view, visit https://gerrit.wikimedia.org/r/345788
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I21a77f4d32b903dac4d9b1a842d8af78d6f9c5ed
Gerrit-PatchSet: 1
Gerrit-Project: wikidata/query/rdf
Gerrit-Branch: master
Gerrit-Owner: Smalyshev <smalys...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to