Smalyshev has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/360786 )

Change subject: Refactor retry mechanism so that it does not get stuck
......................................................................

Refactor retry mechanism so that it does not get stuck

and uses library components.

Change-Id: I04d67bcc80ffe331f52c94dd1e92bbcf68fd1b78
---
M tools/pom.xml
M tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
M tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java
3 files changed, 85 insertions(+), 56 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/wikidata/query/rdf refs/changes/86/360786/1

diff --git a/tools/pom.xml b/tools/pom.xml
index d02cd39..7efe576 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -34,6 +34,11 @@
       <artifactId>logback-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.github.rholder</groupId>
+      <artifactId>guava-retrying</artifactId>
+      <version>2.0.0</version>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
index c711fc0..1aa5d58 100644
--- a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
+++ b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
@@ -119,23 +119,20 @@
             return;
         }
         WikibaseUris uris = new WikibaseUris(options.wikibaseHost());
-        RdfRepository rdfRepository = new RdfRepository(sparqlUri, uris);
-        Change.Source<? extends Change.Batch> changeSource = buildChangeSource(options, rdfRepository,
-                wikibaseRepository);
-        if (changeSource == null) {
-            return;
-        }
-        int threads = options.threadCount();
-        ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("update %s");
-        ExecutorService executor = new ThreadPoolExecutor(threads, threads, 0, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>(), threadFactory.build());
+        try (RdfRepository rdfRepository = new RdfRepository(sparqlUri, uris)) {
+            Change.Source<? extends Change.Batch> changeSource = buildChangeSource(options, rdfRepository,
+                    wikibaseRepository);
+            if (changeSource == null) {
+                return;
+            }
+            int threads = options.threadCount();
+            ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("update %s");
+            ExecutorService executor = new ThreadPoolExecutor(threads, threads, 0, TimeUnit.SECONDS,
+                    new LinkedBlockingQueue<Runnable>(), threadFactory.build());
 
-        Munger munger = mungerFromOptions(options);
-        try {
+            Munger munger = mungerFromOptions(options);
             new Update<>(changeSource, wikibaseRepository, rdfRepository, munger, executor,
                 options.pollDelay(), uris, options.verify()).run();
-        } finally {
-            rdfRepository.close();
         }
     }
 
diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java b/tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java
index f675d14..82cda28 100644
--- a/tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java
+++ b/tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java
@@ -4,6 +4,7 @@
 import static org.wikidata.query.rdf.tool.FilteredStatements.filtered;
 
 import java.io.ByteArrayInputStream;
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URL;
@@ -60,6 +61,13 @@
 import org.wikidata.query.rdf.tool.exception.ContainedException;
 import org.wikidata.query.rdf.tool.exception.FatalException;
 
+import com.github.rholder.retry.Attempt;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.RetryListener;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
 import com.google.common.base.Charsets;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
@@ -70,7 +78,7 @@
  */
 // TODO fan out complexity
 @SuppressWarnings("checkstyle:classfanoutcomplexity")
-public class RdfRepository {
+public class RdfRepository implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(RdfRepository.class);
     /**
      * UTC timezone.
@@ -153,6 +161,11 @@
      */
     private final int timeout;
 
+    /**
+     * Retryer for fetching data from RDF store.
+     */
+    private final Retryer<ContentResponse> retryer;
+
     public RdfRepository(URI uri, WikibaseUris uris) {
         this.uri = uri;
         this.uris = uris;
@@ -168,6 +181,26 @@
         timeout = Integer.parseInt(System.getProperty(TIMEOUT_PROPERTY, "-1"));
         httpClient = new HttpClient(new SslContextFactory(true/* trustAll */));
         setupHttpClient();
+
+        retryer = RetryerBuilder.<ContentResponse>newBuilder()
+                .retryIfExceptionOfType(TimeoutException.class)
+                .retryIfExceptionOfType(ExecutionException.class)
+                .retryIfExceptionOfType(IOException.class)
+                .retryIfRuntimeException()
+                .withWaitStrategy(WaitStrategies.exponentialWait(500, 10, TimeUnit.SECONDS))
+                .withStopStrategy(StopStrategies.stopAfterAttempt(maxRetries))
+                .withRetryListener(new RetryListener() {
+                    @Override
+                    public <V> void onRetry(Attempt<V> attempt) {
+                        if (attempt.hasException()) {
+                            log.info("HTTP request failed: {}, attempt {}, will {}",
+                                    attempt.getExceptionCause(),
+                                    attempt.getAttemptNumber(),
+                                    attempt.getAttemptNumber() < maxRetries ? "retry" : "fail");
+                        }
+                    }
+                })
+                .build();
     }
 
     /**
@@ -197,6 +230,7 @@
      * Close the repository.
      */
     @SuppressWarnings("checkstyle:illegalcatch")
+    @Override
     public void close() {
         try {
             httpClient.stop();
@@ -623,64 +657,57 @@
     }
 
     /**
-     * Execute some raw SPARQL.
-     *
-     * @param type name of the parameter in which to send sparql
-     * @return results string from the server
+     * Create HTTP request.
+     * @param type Request type
+     * @param sparql SPARQL code
+     * @param accept Accept header (can be null)
+     * @return Request object
      */
-    protected <T> T execute(String type, ResponseHandler<T> responseHandler, String sparql) {
+    private Request makeRequest(String type, String sparql, String accept) {
         Request post = httpClient.newRequest(uri);
         post.method(HttpMethod.POST);
         if (timeout > 0) {
             post.timeout(timeout, TimeUnit.SECONDS);
         }
         // Note that Blazegraph totally ignores the Accept header for SPARQL
-        // updates like this so the response is just html....
-        if (responseHandler.acceptHeader() != null) {
-            post.header("Accept", responseHandler.acceptHeader());
+        // updates so the response is just html in that case...
+        if (accept != null) {
+            post.header("Accept", accept);
         }
 
-        log.debug("Running SPARQL: {}", sparql);
-        long startQuery = System.currentTimeMillis();
-        // TODO we might want to look into Blazegraph's incremental update
-        // reporting.....
         final Fields fields = new Fields();
         fields.add(type, sparql);
         final FormContentProvider form = new FormContentProvider(fields, Charsets.UTF_8);
         post.content(form);
+        return post;
+    }
 
-        int retries = 0;
-        while (true) {
-            try {
-                ContentResponse response = post.send();
+    /**
+     * Execute some raw SPARQL.
+     *
+     * @param type name of the parameter in which to send sparql
+     * @return results string from the server
+     */
+    protected <T> T execute(String type, ResponseHandler<T> responseHandler, String sparql) {
+        log.debug("Running SPARQL: {}", sparql);
+        long startQuery = System.currentTimeMillis();
+        // TODO we might want to look into Blazegraph's incremental update
+        // reporting.....
+        final ContentResponse response;
+        try {
+            response = retryer.call(()
+                -> makeRequest(type, sparql, responseHandler.acceptHeader()).send());
 
-                if (response.getStatus() != HttpStatus.OK_200) {
-                    throw new ContainedException("Non-200 response from triple store:  " + response + " body=\n"
-                            + responseBodyAsString(response));
-                }
-
-                log.debug("Completed in {} ms", System.currentTimeMillis() - startQuery);
-                return responseHandler.parse(response);
-            } catch (TimeoutException | ExecutionException | IOException e) {
-                if (retries < maxRetries) {
-                    // Increasing delay, with random 10% variation so threads won't all get restarts
-                    // at the same time.
-                    int retryIn = (int)Math.ceil(delay * (retries + 1) * (1 + Math.random() * 0.1));
-                    log.info("HTTP request for {} failed: {}, retrying in {} ms", type, e, retryIn);
-                    retries++;
-                    try {
-                        Thread.sleep(retryIn);
-                    } catch (InterruptedException e1) {
-                        throw new FatalException("Interrupted", e);
-                    }
-                    continue;
-                }
-                throw new FatalException("Error updating triple store", e);
-            } catch (InterruptedException e) {
-                throw new FatalException("Interrupted updating triple store", e);
+            if (response.getStatus() != HttpStatus.OK_200) {
+                throw new ContainedException("Non-200 response from triple store:  " + response
+                                + " body=\n" + responseBodyAsString(response));
             }
-        }
 
+            log.debug("Completed in {} ms", System.currentTimeMillis() - startQuery);
+            return responseHandler.parse(response);
+        } catch (ExecutionException | RetryException | IOException e) {
+            throw new FatalException("Error updating triple store", e);
+        }
     }
 
     /**

-- 
To view, visit https://gerrit.wikimedia.org/r/360786
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I04d67bcc80ffe331f52c94dd1e92bbcf68fd1b78
Gerrit-PatchSet: 1
Gerrit-Project: wikidata/query/rdf
Gerrit-Branch: master
Gerrit-Owner: Smalyshev <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to