Smalyshev has uploaded a new change for review ( https://gerrit.wikimedia.org/r/360786 ).
Change subject: Refactor retry mechanism so that it does not get stuck
......................................................................
Refactor retry mechanism so that it does not get stuck
and uses library components (guava-retrying) instead of a hand-rolled retry loop.
Change-Id: I04d67bcc80ffe331f52c94dd1e92bbcf68fd1b78
---
M tools/pom.xml
M tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
M tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java
3 files changed, 85 insertions(+), 56 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/wikidata/query/rdf refs/changes/86/360786/1
diff --git a/tools/pom.xml b/tools/pom.xml
index d02cd39..7efe576 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -34,6 +34,11 @@
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
+ <groupId>com.github.rholder</groupId>
+ <artifactId>guava-retrying</artifactId>
+ <version>2.0.0</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
index c711fc0..1aa5d58 100644
--- a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
+++ b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
@@ -119,23 +119,20 @@
return;
}
WikibaseUris uris = new WikibaseUris(options.wikibaseHost());
- RdfRepository rdfRepository = new RdfRepository(sparqlUri, uris);
- Change.Source<? extends Change.Batch> changeSource =
buildChangeSource(options, rdfRepository,
- wikibaseRepository);
- if (changeSource == null) {
- return;
- }
- int threads = options.threadCount();
- ThreadFactoryBuilder threadFactory = new
ThreadFactoryBuilder().setDaemon(true).setNameFormat("update %s");
- ExecutorService executor = new ThreadPoolExecutor(threads, threads, 0,
TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(), threadFactory.build());
+ try (RdfRepository rdfRepository = new RdfRepository(sparqlUri, uris))
{
+ Change.Source<? extends Change.Batch> changeSource =
buildChangeSource(options, rdfRepository,
+ wikibaseRepository);
+ if (changeSource == null) {
+ return;
+ }
+ int threads = options.threadCount();
+ ThreadFactoryBuilder threadFactory = new
ThreadFactoryBuilder().setDaemon(true).setNameFormat("update %s");
+ ExecutorService executor = new ThreadPoolExecutor(threads,
threads, 0, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
threadFactory.build());
- Munger munger = mungerFromOptions(options);
- try {
+ Munger munger = mungerFromOptions(options);
new Update<>(changeSource, wikibaseRepository, rdfRepository,
munger, executor,
options.pollDelay(), uris, options.verify()).run();
- } finally {
- rdfRepository.close();
}
}
diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java b/tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java
index f675d14..82cda28 100644
--- a/tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java
+++ b/tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java
@@ -4,6 +4,7 @@
import static org.wikidata.query.rdf.tool.FilteredStatements.filtered;
import java.io.ByteArrayInputStream;
+import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
@@ -60,6 +61,13 @@
import org.wikidata.query.rdf.tool.exception.ContainedException;
import org.wikidata.query.rdf.tool.exception.FatalException;
+import com.github.rholder.retry.Attempt;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.RetryListener;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Charsets;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
@@ -70,7 +78,7 @@
*/
// TODO fan out complexity
@SuppressWarnings("checkstyle:classfanoutcomplexity")
-public class RdfRepository {
+public class RdfRepository implements Closeable {
private static final Logger log =
LoggerFactory.getLogger(RdfRepository.class);
/**
* UTC timezone.
@@ -153,6 +161,11 @@
*/
private final int timeout;
+ /**
+ * Retryer for fetching data from RDF store.
+ */
+ private final Retryer<ContentResponse> retryer;
+
public RdfRepository(URI uri, WikibaseUris uris) {
this.uri = uri;
this.uris = uris;
@@ -168,6 +181,26 @@
timeout = Integer.parseInt(System.getProperty(TIMEOUT_PROPERTY, "-1"));
httpClient = new HttpClient(new SslContextFactory(true/* trustAll */));
setupHttpClient();
+
+ retryer = RetryerBuilder.<ContentResponse>newBuilder()
+ .retryIfExceptionOfType(TimeoutException.class)
+ .retryIfExceptionOfType(ExecutionException.class)
+ .retryIfExceptionOfType(IOException.class)
+ .retryIfRuntimeException()
+ .withWaitStrategy(WaitStrategies.exponentialWait(500, 10,
TimeUnit.SECONDS))
+ .withStopStrategy(StopStrategies.stopAfterAttempt(maxRetries))
+ .withRetryListener(new RetryListener() {
+ @Override
+ public <V> void onRetry(Attempt<V> attempt) {
+ if (attempt.hasException()) {
+ log.info("HTTP request failed: {}, attempt {},
will {}",
+ attempt.getExceptionCause(),
+ attempt.getAttemptNumber(),
+ attempt.getAttemptNumber() < maxRetries ?
"retry" : "fail");
+ }
+ }
+ })
+ .build();
}
/**
@@ -197,6 +230,7 @@
* Close the repository.
*/
@SuppressWarnings("checkstyle:illegalcatch")
+ @Override
public void close() {
try {
httpClient.stop();
@@ -623,64 +657,57 @@
}
/**
- * Execute some raw SPARQL.
- *
- * @param type name of the parameter in which to send sparql
- * @return results string from the server
+ * Create HTTP request.
+ * @param type Request type
+ * @param sparql SPARQL code
+ * @param accept Accept header (can be null)
+ * @return Request object
*/
- protected <T> T execute(String type, ResponseHandler<T> responseHandler,
String sparql) {
+ private Request makeRequest(String type, String sparql, String accept) {
Request post = httpClient.newRequest(uri);
post.method(HttpMethod.POST);
if (timeout > 0) {
post.timeout(timeout, TimeUnit.SECONDS);
}
// Note that Blazegraph totally ignores the Accept header for SPARQL
- // updates like this so the response is just html....
- if (responseHandler.acceptHeader() != null) {
- post.header("Accept", responseHandler.acceptHeader());
+ // updates so the response is just html in that case...
+ if (accept != null) {
+ post.header("Accept", accept);
}
- log.debug("Running SPARQL: {}", sparql);
- long startQuery = System.currentTimeMillis();
- // TODO we might want to look into Blazegraph's incremental update
- // reporting.....
final Fields fields = new Fields();
fields.add(type, sparql);
final FormContentProvider form = new FormContentProvider(fields,
Charsets.UTF_8);
post.content(form);
+ return post;
+ }
- int retries = 0;
- while (true) {
- try {
- ContentResponse response = post.send();
+ /**
+ * Execute some raw SPARQL.
+ *
+ * @param type name of the parameter in which to send sparql
+ * @return results string from the server
+ */
+ protected <T> T execute(String type, ResponseHandler<T> responseHandler,
String sparql) {
+ log.debug("Running SPARQL: {}", sparql);
+ long startQuery = System.currentTimeMillis();
+ // TODO we might want to look into Blazegraph's incremental update
+ // reporting.....
+ final ContentResponse response;
+ try {
+ response = retryer.call(()
+ -> makeRequest(type, sparql,
responseHandler.acceptHeader()).send());
- if (response.getStatus() != HttpStatus.OK_200) {
- throw new ContainedException("Non-200 response from triple
store: " + response + " body=\n"
- + responseBodyAsString(response));
- }
-
- log.debug("Completed in {} ms", System.currentTimeMillis() -
startQuery);
- return responseHandler.parse(response);
- } catch (TimeoutException | ExecutionException | IOException e) {
- if (retries < maxRetries) {
- // Increasing delay, with random 10% variation so threads
won't all get restarts
- // at the same time.
- int retryIn = (int)Math.ceil(delay * (retries + 1) * (1 +
Math.random() * 0.1));
- log.info("HTTP request for {} failed: {}, retrying in {}
ms", type, e, retryIn);
- retries++;
- try {
- Thread.sleep(retryIn);
- } catch (InterruptedException e1) {
- throw new FatalException("Interrupted", e);
- }
- continue;
- }
- throw new FatalException("Error updating triple store", e);
- } catch (InterruptedException e) {
- throw new FatalException("Interrupted updating triple store",
e);
+ if (response.getStatus() != HttpStatus.OK_200) {
+ throw new ContainedException("Non-200 response from triple
store: " + response
+ + " body=\n" + responseBodyAsString(response));
}
- }
+ log.debug("Completed in {} ms", System.currentTimeMillis() -
startQuery);
+ return responseHandler.parse(response);
+ } catch (ExecutionException | RetryException | IOException e) {
+ throw new FatalException("Error updating triple store", e);
+ }
}
/**
--
To view, visit https://gerrit.wikimedia.org/r/360786
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I04d67bcc80ffe331f52c94dd1e92bbcf68fd1b78
Gerrit-PatchSet: 1
Gerrit-Project: wikidata/query/rdf
Gerrit-Branch: master
Gerrit-Owner: Smalyshev <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits