jenkins-bot has submitted this change and it was merged. ( 
https://gerrit.wikimedia.org/r/345788 )

Change subject: Add secondary poller which lags behind and tries to collect 
missed updates.
......................................................................


Add secondary poller which lags behind and tries to collect missed updates.

The poller (enabled by -T <seconds> option to Updater) runs
as a separate thread and tries to identify missed updates and collect them.

The code also stores all seen RC IDs for several hours back to know which
updates were seen and which weren't. Currently it's 360000 IDs back,
which should be enough for now.

Change-Id: I21a77f4d32b903dac4d9b1a842d8af78d6f9c5ed
---
M tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
M 
tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
A 
tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java
M 
tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
4 files changed, 252 insertions(+), 23 deletions(-)

Approvals:
  Smalyshev: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java 
b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
index 5d13798..de0f2a3 100644
--- a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
+++ b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
@@ -93,6 +93,10 @@
         @Option(shortName = "V", longName = "verify", description = "Verify 
updates (may have performance impact)")
         boolean verify();
 
+        @Option(defaultValue = "0", shortName = "T", longName = "tailPoller",
+                description = "Use secondary poller with given gap (seconds) 
to catch up missed updates")
+        int tailPollerOffset();
+
         @Option(defaultToNull = true, description = "If specified must be 
numerical indexes of Item and Property namespaces"
                 + " that defined in Wikibase repository, comma separated.")
         String entityNamespaces();
@@ -204,7 +208,7 @@
                 log.info("Found start time in the RDF store: {}", 
inputDateFormat().format(leftOff));
             }
         }
-        return new RecentChangesPoller(wikibaseRepository, new 
Date(startTime), options.batchSize());
+        return new RecentChangesPoller(wikibaseRepository, new 
Date(startTime), options.batchSize(), options.tailPollerOffset());
     }
 
     /**
diff --git 
a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
 
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
index 12700e2..6d32367 100644
--- 
a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
+++ 
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
@@ -3,11 +3,12 @@
 import static 
org.wikidata.query.rdf.tool.wikibase.WikibaseRepository.inputDateFormat;
 
 import java.text.DateFormat;
+import java.util.Collections;
 import java.util.Date;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Set;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
 
 import org.apache.commons.lang3.time.DateUtils;
 import org.json.simple.JSONArray;
@@ -29,6 +30,12 @@
     private static final Logger log = 
LoggerFactory.getLogger(RecentChangesPoller.class);
 
     /**
+     * How many IDs we keep in seen IDs map.
+     * Should be enough for 1 hour for speeds up to 100 updates/s.
+     * If we ever get faster updates, we'd have to bump this.
+     */
+    private static final int MAX_SEEN_IDS = 100 * 60 * 60;
+    /**
      * Wikibase repository to poll.
      */
     private final WikibaseRepository wikibase;
@@ -41,6 +48,19 @@
      */
     private final int batchSize;
     /**
+     * Set of the IDs we've seen before.
+     * We have to use map here because LinkedHashMap has removeEldestEntry
+     * and it does not implement Set. So we're using
+     * Map with boolean values as Set.
+     */
+    private final Map<Long, Boolean> seenIDs;
+    /**
+     * How far back should we tail with secondary tailing poller.
+     * The value is in seconds (applied via DateUtils.addSeconds).
+     * 0 or negative means no tailing poller.
+     */
+    private final int tailSeconds;
+    /**
      * How much to back off for recent fetches, in seconds.
      */
     private static final int BACKOFF_TIME = 10;
@@ -48,12 +68,52 @@
      * How old should the change be to not apply backoff.
      * The number is in minutes.
      */
-    private static final int BACKOFF_THRESHOLD = 5;
+    private static final int BACKOFF_THRESHOLD = 2;
 
-    public RecentChangesPoller(WikibaseRepository wikibase, Date 
firstStartTime, int batchSize) {
+    /**
+     * Queue for communicating with tailing updater.
+     */
+    private final Queue<Batch> queue = new ArrayBlockingQueue<>(100);
+
+    /**
+     * Optional tailing poller.
+     * This will be instantiated only if we were asked for it (tailSeconds > 0)
+     * and when the poller catches up enough so that it is beyond the tailing 
gap
+     * itself.
+     * Note that tailing poller runs in different thread.
+     */
+    private TailingChangesPoller tailPoller;
+
+    public RecentChangesPoller(WikibaseRepository wikibase, Date 
firstStartTime,
+            int batchSize, Map<Long, Boolean> seenIDs, int tailSeconds) {
         this.wikibase = wikibase;
         this.firstStartTime = firstStartTime;
         this.batchSize = batchSize;
+        this.seenIDs = seenIDs;
+        this.tailSeconds = tailSeconds;
+    }
+
+    public RecentChangesPoller(WikibaseRepository wikibase, Date 
firstStartTime, int batchSize) {
+        this(wikibase, firstStartTime, batchSize, createSeenMap(), -1);
+    }
+
+    public RecentChangesPoller(WikibaseRepository wikibase, Date 
firstStartTime, int batchSize, int tailSeconds) {
+        this(wikibase, firstStartTime, batchSize, createSeenMap(), 
tailSeconds);
+    }
+
+    /**
+     * Create the map used to track seen IDs.
+     * @return a synchronized map that evicts its eldest entry once it
+     * holds more than MAX_SEEN_IDS entries
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private static Map<Long, Boolean> createSeenMap() {
+        // Create hash map with max size, evicting oldest ID
+        final Map<Long, Boolean> map = new LinkedHashMap(MAX_SEEN_IDS, .75F, 
false) {
+            protected boolean removeEldestEntry(Map.Entry eldest) {
+                return size() > MAX_SEEN_IDS;
+            }
+        };
+        return Collections.synchronizedMap(map);
     }
 
     @Override
@@ -63,7 +123,40 @@
 
     @Override
     public Batch nextBatch(Batch lastBatch) throws RetryableException {
-        return batch(lastBatch.leftOffDate, lastBatch);
+        Batch newBatch = batch(lastBatch.leftOffDate, lastBatch);
+        if (tailSeconds > 0) {
+            // Check if tail poller has something to say.
+            newBatch = checkTailPoller(newBatch);
+        }
+        return newBatch;
+    }
+
+    /**
+     * Check if we need to poll something from secondary poller queue.
+     * @param lastBatch the batch just produced by the main poller
+     * @return lastBatch, possibly merged with a batch taken from the
+     * tailing poller's queue
+     */
+    private Batch checkTailPoller(Batch lastBatch) {
+        if (tailSeconds <= 0 ||
+            lastBatch.leftOffDate().before(DateUtils.addSeconds(new Date(), 
-tailSeconds))) {
+            // still not caught up, do nothing
+            return lastBatch;
+        }
+        if (tailPoller == null) {
+            // We don't have poller yet - start it
+            log.info("Started trailing poller with gap of {} seconds", 
tailSeconds);
+            // Create new poller starting back tailSeconds and same IDs map.
+            final RecentChangesPoller poller = new 
RecentChangesPoller(wikibase, DateUtils.addSeconds(new Date(), -tailSeconds), 
batchSize, seenIDs, -1);
+            tailPoller = new TailingChangesPoller(poller, queue, tailSeconds);
+            tailPoller.start();
+        } else {
+            final Batch queuedBatch = queue.poll();
+            if (queuedBatch != null) {
+                log.info("Merging {} changes from trailing queue", 
queuedBatch.changes().size());
+                return lastBatch.merge(queuedBatch);
+            }
+        }
+        return lastBatch;
     }
 
     /**
@@ -76,13 +169,6 @@
         private final Date leftOffDate;
 
         /**
-         * The set of the rcid's this batch seen.
-         * Note that some IDs may be seen but not processed
-         * due to duplicates, etc.
-         */
-        private final Set<Long> seenIDs;
-
-        /**
          * Continue from last request. Can be null.
          */
         private final JSONObject lastContinue;
@@ -91,11 +177,9 @@
          * A batch that will next continue using the continue parameter.
          */
         private Batch(ImmutableList<Change> changes, long advanced,
-                String leftOff, Date nextStartTime, JSONObject lastContinue,
-                Set<Long> seenIDs) {
+                String leftOff, Date nextStartTime, JSONObject lastContinue) {
             super(changes, advanced, leftOff);
             leftOffDate = nextStartTime;
-            this.seenIDs = seenIDs;
             this.lastContinue = lastContinue;
         }
 
@@ -116,11 +200,16 @@
         }
 
         /**
-         * Get the list of IDs this batch has seen.
+         * Merge this batch with another batch.
+         * @param another
          * @return
          */
-        public Set<Long> getSeenIDs() {
-            return seenIDs;
+        public Batch merge(Batch another) {
+            final ImmutableList<Change> newChanges = new 
ImmutableList.Builder<Change>()
+                    .addAll(another.changes())
+                    .addAll(changes())
+                    .build();
+            return new Batch(newChanges, advanced(), leftOffDate.toString(), 
leftOffDate, lastContinue);
         }
 
         /**
@@ -178,7 +267,6 @@
             long nextStartTime = lastNextStartTime.getTime();
             JSONArray result = (JSONArray) ((JSONObject) 
recentChanges.get("query")).get("recentchanges");
             DateFormat df = inputDateFormat();
-            Set<Long> seenIds = new HashSet<>();
 
             for (Object rco : result) {
                 JSONObject rc = (JSONObject) rco;
@@ -192,12 +280,12 @@
                     log.info("Skipping change with bogus title:  {}", 
rc.get("title").toString());
                     continue;
                 }
-                seenIds.add(rcid);
-                if (lastBatch != null && 
lastBatch.getSeenIDs().contains(rcid)) {
+                if (seenIDs.containsKey(rcid)) {
                     // This change was in the last batch
                     log.debug("Skipping repeated change with rcid {}", rcid);
                     continue;
                 }
+                seenIDs.put(rcid, true);
 // Looks like we can not rely on changes appearing in order, so we have to 
take them all and let SPARQL
 // sort out the dupes.
 //                if (continueChange != null && rcid < continueChange.rcid()) {
@@ -245,7 +333,7 @@
             // be sure we got the whole second
             String upTo = inputDateFormat().format(new Date(nextStartTime - 
1000));
             long advanced = nextStartTime - lastNextStartTime.getTime();
-            return new Batch(changes, advanced, upTo, new Date(nextStartTime), 
nextContinue, seenIds);
+            return new Batch(changes, advanced, upTo, new Date(nextStartTime), 
nextContinue);
         } catch (java.text.ParseException e) {
             throw new RetryableException("Parse error from api", e);
         }
diff --git 
a/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java
 
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java
new file mode 100644
index 0000000..4f17c42
--- /dev/null
+++ 
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java
@@ -0,0 +1,98 @@
+package org.wikidata.query.rdf.tool.change;
+
+import java.util.Date;
+import java.util.Queue;
+
+import org.apache.commons.lang3.time.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wikidata.query.rdf.tool.change.RecentChangesPoller.Batch;
+import org.wikidata.query.rdf.tool.exception.RetryableException;
+
+/**
+ * Tailing changes poller.
+ * Polls updates that are a certain time behind the current time (to give
+ * the system time to settle old updates) and if it finds some, puts them
+ * on the queue.
+ *
+ * The class tries to stay behind the updates and never catch up with the
+ * current stream. In most cases, it will not produce any updates since
+ * those are already collected by the main updater, but in some cases it
+ * might catch an update that the main one skipped.
+ *
+ */
+public class TailingChangesPoller extends Thread {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(TailingChangesPoller.class);
+
+    /**
+     * Poller to use for trailing polling.
+     */
+    private final RecentChangesPoller poller;
+    /**
+     * Last batch received from the poller.
+     */
+    private Batch lastBatch;
+    /**
+     * How far behind the current time should we stay?
+     */
+    private final int tailSeconds;
+    /**
+     * Queue to post the batches in.
+     */
+    private final Queue<Batch> queue;
+
+    public TailingChangesPoller(RecentChangesPoller poller, Queue<Batch> 
queue, int tailSeconds) {
+        this.poller = poller;
+        this.tailSeconds = tailSeconds;
+        this.queue = queue;
+    }
+
+    /**
+     * Is this timestamp old enough?
+     * @param timestamp the timestamp to check
+     * @return true if the timestamp is more than tailSeconds in the past
+     */
+    public boolean isOldEnough(Date timestamp) {
+        return timestamp.before(DateUtils.addSeconds(new Date(), 
-tailSeconds));
+    }
+
+    @Override
+    public void run() {
+        this.setName("TailPoller");
+        while (true) {
+            try {
+                do {
+                    try {
+                        if (lastBatch == null) {
+                            lastBatch = poller.firstBatch();
+                        } else {
+                            lastBatch = poller.nextBatch(lastBatch);
+                        }
+                    } catch (RetryableException e) {
+                        log.warn("Retryable error fetching first batch.  
Retrying.", e);
+                        continue;
+                    }
+                } while (false);
+                // Process the batch
+                if (lastBatch.changes().size() > 0) {
+                    log.info("Caught {} missing updates, adding to the queue", 
lastBatch.changes().size());
+                    queue.add(lastBatch);
+                }
+                log.info("Tail poll up to {}", lastBatch.leftOffDate());
+                if (!isOldEnough(lastBatch.leftOffDate())) {
+                    // we're too far forward, let's sleep for a bit so we are 
couple
+                    // of seconds behind
+                    long sleepTime = lastBatch.leftOffDate().getTime() -
+                            DateUtils.addSeconds(new Date(), -tailSeconds - 
2).getTime();
+                    log.info("Got too close to the current stream, sleeping 
for {}...", sleepTime);
+                    Thread.sleep(sleepTime);
+                }
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+}
diff --git 
a/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
 
b/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
index b17c929..80bec94 100644
--- 
a/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
+++ 
b/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
@@ -273,6 +273,45 @@
         assertEquals(argument.getValue(), startTime);
     }
 
+    /**
+     * Backoff overflow check.
+     * Check that if we're backing off but find no new changes then time is 
advanced.
+     * @throws RetryableException
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void backoffOverflow() throws RetryableException {
+        Date startTime = new Date();
+        batchSize = 1;
+        RecentChangesPoller poller = new RecentChangesPoller(repository, 
startTime, batchSize);
+
+        JSONObject result = new JSONObject();
+        JSONObject query = new JSONObject();
+        result.put("query", query);
+        JSONArray recentChanges = new JSONArray();
+        query.put("recentchanges", recentChanges);
+
+        String date = WikibaseRepository.inputDateFormat().format(startTime);
+        JSONObject rc = new JSONObject();
+        rc.put("ns", Long.valueOf(0));
+        rc.put("title", "Q424242");
+        rc.put("timestamp", date);
+        rc.put("revid", 42L);
+        rc.put("rcid", 42L);
+        rc.put("type", "edit");
+        recentChanges.add(rc);
+
+        firstBatchReturns(startTime, result);
+        Batch batch = poller.firstBatch();
+        assertThat(batch.changes(), hasSize(1));
+        assertEquals(startTime, batch.leftOffDate());
+
+        batch = poller.nextBatch(batch);
+        assertThat(batch.changes(), hasSize(0));
+        assertThat(startTime, lessThan(batch.leftOffDate()));
+        assertEquals(DateUtils.addSeconds(startTime, 1), batch.leftOffDate());
+    }
+
     @Before
     public void setupMocks() {
         repository = mock(WikibaseRepository.class);

-- 
To view, visit https://gerrit.wikimedia.org/r/345788
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I21a77f4d32b903dac4d9b1a842d8af78d6f9c5ed
Gerrit-PatchSet: 5
Gerrit-Project: wikidata/query/rdf
Gerrit-Branch: master
Gerrit-Owner: Smalyshev <smalys...@wikimedia.org>
Gerrit-Reviewer: Smalyshev <smalys...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to