Gehel has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/402753 )
Change subject: RdfRepository uses Apache HC instead of Jetty HTTP client
......................................................................
RdfRepository uses Apache HC instead of Jetty HTTP client
This removes a dependency, and provides a (IMHO) simpler HTTP client
library. This might help our issue with leaked threads in tests.
Change-Id: I2ea367e3f7c77338d487793fe2b9982b8538b017
---
M tools/pom.xml
M tools/src/main/java/org/wikidata/query/rdf/tool/HttpClientUtils.java
M tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
A
tools/src/main/java/org/wikidata/query/rdf/tool/http/OkOnlyResponseHandler.java
M tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java
M
tools/src/test/java/org/wikidata/query/rdf/tool/AbstractUpdaterIntegrationTestBase.java
M tools/src/test/java/org/wikidata/query/rdf/tool/IOBlastingIntegrationTest.java
M tools/src/test/java/org/wikidata/query/rdf/tool/MungeIntegrationTest.java
M tools/src/test/java/org/wikidata/query/rdf/tool/RdfRepositoryForTesting.java
A
tools/src/test/java/org/wikidata/query/rdf/tool/http/OkOnlyResponseHandlerTest.java
M
tools/src/test/java/org/wikidata/query/rdf/tool/rdf/RdfRepositoryIntegrationTest.java
M
tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
M
tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryWireIntegrationTest.java
13 files changed, 323 insertions(+), 274 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/wikidata/query/rdf
refs/changes/53/402753/8
diff --git a/tools/pom.xml b/tools/pom.xml
index f76c6b7..d18ff86 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -39,6 +39,10 @@
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
@@ -81,18 +85,6 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-http</artifactId>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-util</artifactId>
</dependency>
<dependency>
<groupId>org.openrdf.sesame</groupId>
diff --git
a/tools/src/main/java/org/wikidata/query/rdf/tool/HttpClientUtils.java
b/tools/src/main/java/org/wikidata/query/rdf/tool/HttpClientUtils.java
index eaec48d..dbb7fe9 100644
--- a/tools/src/main/java/org/wikidata/query/rdf/tool/HttpClientUtils.java
+++ b/tools/src/main/java/org/wikidata/query/rdf/tool/HttpClientUtils.java
@@ -1,11 +1,18 @@
package org.wikidata.query.rdf.tool;
+import static java.lang.Integer.parseInt;
+
import java.io.InterruptedIOException;
import java.net.UnknownHostException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLException;
import org.apache.http.HttpEntityEnclosingRequest;
+import org.apache.http.HttpException;
+import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
@@ -16,32 +23,41 @@
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.routing.HttpRoutePlanner;
+import org.apache.http.conn.socket.LayeredConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultServiceUnavailableRetryStrategy;
import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.DefaultProxyRoutePlanner;
import org.apache.http.protocol.HttpContext;
+import org.apache.http.ssl.SSLContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utilities for dealing with HttpClient.
*/
+@SuppressWarnings("checkstyle:classfanoutcomplexity") // dependencies are
mostly about HC configuration
public final class HttpClientUtils {
private static final Logger log =
LoggerFactory.getLogger(HttpClientUtils.class);
- /**
- * How many retries allowed on error.
- */
+ /** Configuration name for proxy host. */
+ public static final String HTTP_PROXY = "http.proxyHost";
+
+ /** Configuration name for proxy port. */
+ public static final String HTTP_PROXY_PORT = "http.proxyPort";
+
+ /** How many retries allowed on error. */
public static final int RETRIES = 3;
- /**
- * Retry interval, in ms.
- */
+
+ /** Retry interval, in ms. */
public static final int RETRY_INTERVAL = 500;
- /**
- * Configure request to ignore cookies.
- */
+ /** Configure request to ignore cookies. */
public static void ignoreCookies(HttpRequestBase request) {
RequestConfig noCookiesConfig =
RequestConfig.custom().setCookieSpec(CookieSpecs.IGNORE_COOKIES).build();
request.setConfig(noCookiesConfig);
@@ -51,14 +67,53 @@
// Uncallable utility constructor
}
- public static CloseableHttpClient createHttpClient() {
- return HttpClients.custom()
- .setMaxConnPerRoute(100).setMaxConnTotal(100)
- .setRetryHandler(getRetryHandler(RETRIES))
- .setServiceUnavailableRetryStrategy(getRetryStrategy(RETRIES,
RETRY_INTERVAL))
- .disableCookieManagement()
- .setUserAgent("Wikidata Query Service Updater")
- .build();
+ public static CloseableHttpClient createHttpClient(boolean trustAll) {
+ try {
+ return HttpClients.custom()
+ .setMaxConnPerRoute(100).setMaxConnTotal(100)
+ .setRetryHandler(getRetryHandler(RETRIES))
+
.setServiceUnavailableRetryStrategy(getRetryStrategy(RETRIES, RETRY_INTERVAL))
+ .disableCookieManagement()
+ .setRoutePlanner(routePlanner())
+ .setSSLSocketFactory(socketFactory(trustAll))
+ .setUserAgent("Wikidata Query Service Updater")
+ .build();
+ } catch (KeyStoreException | NoSuchAlgorithmException |
KeyManagementException e) {
+ throw new RuntimeException("Could not create HttpClient", e);
+ }
+ }
+
+ private static LayeredConnectionSocketFactory socketFactory(boolean
trustAll)
+ throws KeyStoreException, NoSuchAlgorithmException,
KeyManagementException {
+ if (!trustAll) return null;
+ SSLContextBuilder builder = new SSLContextBuilder();
+ builder.loadTrustMaterial(null, new TrustSelfSignedStrategy());
+ return new SSLConnectionSocketFactory(builder.build());
+ }
+
+ private static HttpRoutePlanner routePlanner() {
+ String httpProxyProp = System.getProperty(HttpClientUtils.HTTP_PROXY);
+ String httpProxyPortProp =
System.getProperty(HttpClientUtils.HTTP_PROXY_PORT);
+ if (httpProxyProp == null || httpProxyPortProp == null) {
+ return null;
+ }
+
+ HttpHost proxyHost = new HttpHost(httpProxyProp,
parseInt(httpProxyPortProp), "http");
+
+ return new DefaultProxyRoutePlanner(proxyHost) {
+ @Override
+ public HttpRoute determineRoute(
+ final HttpHost host,
+ final HttpRequest request,
+ final HttpContext context) throws HttpException {
+ String hostname = host.getHostName();
+ if (hostname.equals("127.0.0.1") ||
hostname.equalsIgnoreCase("localhost")) {
+ // Return direct route
+ return new HttpRoute(host);
+ }
+ return super.determineRoute(host, request, context);
+ }
+ };
}
/**
diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
index 3ed90cd..b171f67 100644
--- a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
+++ b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
@@ -53,9 +53,9 @@
URI sparqlUri = sparqlUri(options);
WikibaseUris uris = new WikibaseUris(options.wikibaseHost());
- try (CloseableHttpClient httpClient =
HttpClientUtils.createHttpClient();
- RdfRepository rdfRepository = new RdfRepository(sparqlUri, uris)
+ try (CloseableHttpClient httpClient =
HttpClientUtils.createHttpClient(false)
) {
+ RdfRepository rdfRepository = new RdfRepository(sparqlUri, uris,
httpClient);
WikibaseRepository wikibaseRepository = new
WikibaseRepository(getUris(options), httpClient);
Change.Source<? extends Change.Batch> changeSource =
buildChangeSource(options, rdfRepository,
wikibaseRepository);
diff --git
a/tools/src/main/java/org/wikidata/query/rdf/tool/http/OkOnlyResponseHandler.java
b/tools/src/main/java/org/wikidata/query/rdf/tool/http/OkOnlyResponseHandler.java
new file mode 100644
index 0000000..285358e
--- /dev/null
+++
b/tools/src/main/java/org/wikidata/query/rdf/tool/http/OkOnlyResponseHandler.java
@@ -0,0 +1,54 @@
+package org.wikidata.query.rdf.tool.http;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.http.HttpStatus.SC_OK;
+
+import java.io.IOException;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.util.EntityUtils;
+import org.wikidata.query.rdf.tool.exception.ContainedException;
+
+/**
+ * A {@link ResponseHandler} that checks that the response is an HTTP/200.
+ *
+ * Also provides {@link #acceptHeader()} to expose the type of content this
+ * handler can parse.
+ *
+ * @param <T> the type of response parsed
+ */
+public abstract class OkOnlyResponseHandler<T> implements ResponseHandler<T> {
+ /**
+ * The contents of the accept header sent to the rdf repository.
+ */
+ public abstract String acceptHeader();
+
+
+ /**
+ * Handle the response entity and transform it into the actual response
+ * object.
+ */
+ protected abstract T handleEntity(HttpEntity entity) throws IOException;
+
+ /**
+ * Read the entity from the response body and pass it to the entity handler
+ * method if the response was successful (a 200 status code). If no
response
+ * body exists, this returns null. If the response was unsuccessful, throws
+ * a {@link ContainedException}.
+ */
+ @Override
+ public T handleResponse(final HttpResponse response)
+ throws IOException {
+ final StatusLine statusLine = response.getStatusLine();
+ final HttpEntity entity = response.getEntity();
+ if (statusLine.getStatusCode() != SC_OK) {
+ String body = entity == null ? null : EntityUtils.toString(entity,
UTF_8);
+ throw new ContainedException("Non-200 response from triple store:
" + response
+ + " body=\n" + body);
+ }
+ return entity == null ? null : handleEntity(entity);
+ }
+}
diff --git
a/tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java
b/tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java
index 8120f95..78f5d47 100644
--- a/tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java
+++ b/tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java
@@ -1,9 +1,10 @@
package org.wikidata.query.rdf.tool.rdf;
+import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.collect.Sets.newHashSetWithExpectedSize;
import static com.google.common.io.Resources.getResource;
+import static java.util.Collections.singletonList;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
@@ -16,9 +17,6 @@
import java.util.Locale;
import java.util.Set;
import java.util.TimeZone;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -26,16 +24,12 @@
import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.XMLGregorianCalendar;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpProxy;
-import org.eclipse.jetty.client.ProxyConfiguration;
-import org.eclipse.jetty.client.api.ContentResponse;
-import org.eclipse.jetty.client.api.Request;
-import org.eclipse.jetty.client.util.FormContentProvider;
-import org.eclipse.jetty.http.HttpMethod;
-import org.eclipse.jetty.http.HttpStatus;
-import org.eclipse.jetty.util.Fields;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
import org.openrdf.model.Literal;
import org.openrdf.model.Statement;
import org.openrdf.query.Binding;
@@ -53,8 +47,8 @@
import org.wikidata.query.rdf.common.uri.SchemaDotOrg;
import org.wikidata.query.rdf.common.uri.WikibaseUris;
import org.wikidata.query.rdf.tool.change.Change;
-import org.wikidata.query.rdf.tool.exception.ContainedException;
import org.wikidata.query.rdf.tool.exception.FatalException;
+import org.wikidata.query.rdf.tool.http.OkOnlyResponseHandler;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -62,14 +56,6 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.github.rholder.retry.Attempt;
-import com.github.rholder.retry.RetryException;
-import com.github.rholder.retry.RetryListener;
-import com.github.rholder.retry.Retryer;
-import com.github.rholder.retry.RetryerBuilder;
-import com.github.rholder.retry.StopStrategies;
-import com.github.rholder.retry.WaitStrategies;
-import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.io.Resources;
@@ -81,7 +67,7 @@
// TODO fan out complexity
@SuppressWarnings("checkstyle:classfanoutcomplexity")
@SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification =
"spotbug limitation: https://github.com/spotbugs/spotbugs/issues/463")
-public class RdfRepository implements AutoCloseable {
+public class RdfRepository {
private static final Logger log =
LoggerFactory.getLogger(RdfRepository.class);
/**
* UTC timezone.
@@ -136,26 +122,6 @@
private final String verify;
/**
- * How many times we retry a failed HTTP call.
- */
- private int maxRetries = 6;
- /**
- * How long to delay after failing first HTTP call, in milliseconds.
- * Next retries would be slower exponentially by 2x until maxRetries is
exhausted.
- * Note that the first retry is 2x delay due to the way Retryer is
implemented.
- */
- private int delay = 1000;
-
- /**
- * Configuration name for proxy host.
- */
- private static final String HTTP_PROXY = "http.proxyHost";
- /**
- * Configuration name for proxy port.
- */
- private static final String HTTP_PROXY_PORT = "http.proxyPort";
-
- /**
* Request timeout property.
*/
public static final String TIMEOUT_PROPERTY = RdfRepository.class +
".timeout";
@@ -165,12 +131,7 @@
*/
private final int timeout;
- /**
- * Retryer for fetching data from RDF store.
- */
- private final Retryer<ContentResponse> retryer;
-
- public RdfRepository(URI uri, WikibaseUris uris) {
+ public RdfRepository(URI uri, WikibaseUris uris, HttpClient httpClient) {
this.uri = uri;
this.uris = uris;
msyncBody = loadBody("multiSync");
@@ -183,96 +144,7 @@
verify = loadBody("verify");
timeout = Integer.parseInt(System.getProperty(TIMEOUT_PROPERTY, "-1"));
- httpClient = new HttpClient(new SslContextFactory(true/* trustAll */));
- setupHttpClient();
-
- retryer = RetryerBuilder.<ContentResponse>newBuilder()
- .retryIfExceptionOfType(TimeoutException.class)
- .retryIfExceptionOfType(ExecutionException.class)
- .retryIfExceptionOfType(IOException.class)
- .retryIfRuntimeException()
- .withWaitStrategy(WaitStrategies.exponentialWait(delay, 10,
TimeUnit.SECONDS))
- .withStopStrategy(StopStrategies.stopAfterAttempt(maxRetries))
- .withRetryListener(new RetryListener() {
- @Override
- public <V> void onRetry(Attempt<V> attempt) {
- if (attempt.hasException()) {
- log.info("HTTP request failed: {}, attempt {},
will {}",
- attempt.getExceptionCause(),
- attempt.getAttemptNumber(),
- attempt.getAttemptNumber() < maxRetries ?
"retry" : "fail");
- }
- }
- })
- .build();
- }
-
- /**
- * Setup HTTP client settings.
- */
- @SuppressWarnings("checkstyle:illegalcatch")
- private void setupHttpClient() {
- if (System.getProperty(HTTP_PROXY) != null
- && System.getProperty(HTTP_PROXY_PORT) != null) {
- final ProxyConfiguration proxyConfig =
httpClient.getProxyConfiguration();
- final HttpProxy proxy = new HttpProxy(
- System.getProperty(HTTP_PROXY),
- Integer.parseInt(System.getProperty(HTTP_PROXY_PORT)));
- proxy.getExcludedAddresses().add("localhost");
- proxy.getExcludedAddresses().add("127.0.0.1");
- proxyConfig.getProxies().add(proxy);
- }
- try {
- httpClient.start();
- // Who would think declaring it as throws Exception is a good idea?
- } catch (Exception e) {
- throw new RuntimeException("Unable to start HttpClient", e);
- }
- }
-
- /**
- * Close the repository.
- * @throws Exception
- */
- @Override
- public void close() throws Exception {
- httpClient.stop();
- }
-
- /**
- * Get max retries count.
- * @return How many times we retry a failed HTTP call.
- */
- public int getMaxRetries() {
- return maxRetries;
- }
-
- /**
- * Set how many times we retry a failed HTTP call.
- * @return this
- */
- public RdfRepository setMaxRetries(int maxRetries) {
- this.maxRetries = maxRetries;
- return this;
- }
-
- /**
- * Get retry delay.
- * @return How long to delay after failing first HTTP call, in
milliseconds.
- */
- public int getDelay() {
- return delay;
- }
-
- /**
- * Set retry delay.
- * Specifies how long to delay after failing first HTTP call, in
milliseconds.
- * Next retries would be slower by 2x, 3x, 4x etc. until maxRetries is
exhausted.
- * @return The repository object
- */
- public RdfRepository setDelay(int delay) {
- this.delay = delay;
- return this;
+ this.httpClient = httpClient;
}
/**
@@ -286,7 +158,7 @@
private static String loadBody(String name) {
URL url = getResource(RdfRepository.class, "RdfRepository." + name +
".sparql");
try {
- return Resources.toString(url, Charsets.UTF_8);
+ return Resources.toString(url, UTF_8);
} catch (IOException e) {
throw new FatalException("Can't load " + url, e);
}
@@ -300,7 +172,7 @@
* @return Collection of strings resulting from the query.
*/
private Set<String> resultToSet(TupleQueryResult result, String binding) {
- HashSet<String> values = new HashSet<String>();
+ HashSet<String> values = new HashSet<>();
try {
while (result.hasNext()) {
Binding value = result.next().getBinding(binding);
@@ -686,22 +558,25 @@
* @param accept Accept header (can be null)
* @return Request object
*/
- private Request makeRequest(String type, String sparql, String accept) {
- Request post = httpClient.newRequest(uri);
- post.method(HttpMethod.POST);
- if (timeout > 0) {
- post.timeout(timeout, TimeUnit.SECONDS);
- }
+ private HttpPost makeRequest(String type, String sparql, String accept)
throws IOException {
+ HttpPost post = new HttpPost(uri);
+
+ // TODO: manage timeouts
+ // if (timeout > 0) {
+ // post.timeout(timeout, TimeUnit.SECONDS);
+ // }
+
// Note that Blazegraph totally ignores the Accept header for SPARQL
// updates so the response is just html in that case...
if (accept != null) {
- post.header("Accept", accept);
+ post.addHeader("Accept", accept);
}
- final Fields fields = new Fields();
- fields.add(type, sparql);
- final FormContentProvider form = new FormContentProvider(fields,
Charsets.UTF_8);
- post.content(form);
+ post.setEntity(new UrlEncodedFormEntity(
+ singletonList(
+ new BasicNameValuePair(type, sparql)),
+ UTF_8));
+
return post;
}
@@ -711,35 +586,19 @@
* @param type name of the parameter in which to send sparql
* @return results string from the server
*/
- protected <T> T execute(String type, ResponseHandler<T> responseHandler,
String sparql) {
+ protected <T> T execute(String type, OkOnlyResponseHandler<T>
responseHandler, String sparql) {
log.debug("Running SPARQL: {}", sparql);
long startQuery = System.currentTimeMillis();
// TODO we might want to look into Blazegraph's incremental update
- // reporting.....
- final ContentResponse response;
try {
- response = retryer.call(()
- -> makeRequest(type, sparql,
responseHandler.acceptHeader()).send());
-
- if (response.getStatus() != HttpStatus.OK_200) {
- throw new ContainedException("Non-200 response from triple
store: " + response
- + " body=\n" + responseBodyAsString(response));
- }
-
- log.debug("Completed in {} ms", System.currentTimeMillis() -
startQuery);
- return responseHandler.parse(response);
- } catch (ExecutionException | RetryException | IOException e) {
+ return httpClient.execute(
+ makeRequest(type, sparql, responseHandler.acceptHeader()),
+ responseHandler);
+ } catch (IOException e) {
throw new FatalException("Error updating triple store", e);
+ } finally {
+ log.debug("Completed in {} ms", System.currentTimeMillis() -
startQuery);
}
- }
-
- /**
- * Fetch the body of the response as a string.
- *
- * @throws IOException if there is an error reading the response
- */
- protected String responseBodyAsString(ContentResponse response) throws
IOException {
- return response.getContentAsString();
}
/**
@@ -769,47 +628,23 @@
}
/**
- * Passed to execute to setup the accept header and parse the response. Its
- * super ultra mega important to parse the response in execute because
- * execute manages closing the http response object. If execute return the
- * input stream after closing the response then everything would
- * <strong>mostly</strong> work but things would blow up with strange
socket
- * closed errors.
- *
- * @param <T> the type of response parsed
- */
- private interface ResponseHandler<T> {
- /**
- * The contents of the accept header sent to the rdf repository.
- */
- String acceptHeader();
-
- /**
- * Parse the response.
- *
- * @throws IOException if there is an error reading the response
- */
- T parse(ContentResponse entity) throws IOException;
- }
-
- /**
* Count and log the number of updates.
*/
- protected static final ResponseHandler<Integer> UPDATE_COUNT_RESPONSE =
new UpdateCountResponse();
+ protected static final OkOnlyResponseHandler<Integer>
UPDATE_COUNT_RESPONSE = new UpdateCountResponse();
/**
* Parse the response from a regular query into a TupleQueryResult.
*/
- protected static final ResponseHandler<TupleQueryResult>
TUPLE_QUERY_RESPONSE = new TupleQueryResponse();
+ protected static final OkOnlyResponseHandler<TupleQueryResult>
TUPLE_QUERY_RESPONSE = new TupleQueryResponse();
/**
* Parse the response from an ask query into a boolean.
*/
- protected static final ResponseHandler<Boolean> ASK_QUERY_RESPONSE = new
AskQueryResponse();
+ protected static final OkOnlyResponseHandler<Boolean> ASK_QUERY_RESPONSE =
new AskQueryResponse();
/**
* Attempts to log update response information but very likely only works
* for Blazegraph.
*/
- protected static class UpdateCountResponse implements
ResponseHandler<Integer> {
+ protected static class UpdateCountResponse extends
OkOnlyResponseHandler<Integer> {
/**
* The pattern for the response for an update.
*/
@@ -843,9 +678,11 @@
@Override
@SuppressFBWarnings(value = "PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS",
justification = "more readable with 2 calls")
- public Integer parse(ContentResponse entity) throws IOException {
+ public Integer handleEntity(HttpEntity entity) throws IOException {
Integer mutationCount = null;
- for (String line : entity.getContentAsString().split("\\r?\\n")) {
+
+ // TODO: we could probably replace this with a buffered reader
+ for (String line : EntityUtils.toString(entity,
UTF_8).split("\\r?\\n")) {
Matcher m;
m = ELAPSED_LINE_FLUSH.matcher(line);
if (m.matches()) {
@@ -891,19 +728,19 @@
/**
* Parses responses to regular queries into TupleQueryResults.
*/
- private static class TupleQueryResponse implements
ResponseHandler<TupleQueryResult> {
+ private static class TupleQueryResponse extends
OkOnlyResponseHandler<TupleQueryResult> {
@Override
public String acceptHeader() {
return "application/x-binary-rdf-results-table";
}
@Override
- public TupleQueryResult parse(ContentResponse entity) throws
IOException {
+ public TupleQueryResult handleEntity(HttpEntity entity) throws
IOException {
BinaryQueryResultParser p = new BinaryQueryResultParser();
TupleQueryResultBuilder collector = new TupleQueryResultBuilder();
p.setQueryResultHandler(collector);
try {
- p.parseQueryResult(new
ByteArrayInputStream(entity.getContent()));
+ p.parseQueryResult(entity.getContent());
} catch (QueryResultParseException | QueryResultHandlerException |
IllegalStateException e) {
throw new RuntimeException("Error parsing query", e);
}
@@ -914,18 +751,22 @@
/**
* Parses responses to ask queries into booleans.
*/
- private static class AskQueryResponse implements ResponseHandler<Boolean> {
+ private static class AskQueryResponse extends
OkOnlyResponseHandler<Boolean> {
@Override
public String acceptHeader() {
return "application/json";
}
+ private final ObjectMapper mapper;
+
+ AskQueryResponse() {
+ mapper = new ObjectMapper();
+
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
@Override
- public Boolean parse(ContentResponse entity) throws IOException {
+ public Boolean handleEntity(HttpEntity entity) throws IOException {
try {
- ObjectMapper mapper = new ObjectMapper();
-
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- return mapper.readValue(entity.getContentAsString(),
Resp.class).aBoolean;
+ return mapper.readValue(entity.getContent(),
Resp.class).aBoolean;
} catch (JsonParseException | JsonMappingException e) {
throw new IOException("Error parsing response", e);
diff --git
a/tools/src/test/java/org/wikidata/query/rdf/tool/AbstractUpdaterIntegrationTestBase.java
b/tools/src/test/java/org/wikidata/query/rdf/tool/AbstractUpdaterIntegrationTestBase.java
index 3b3a36e..24f2756 100644
---
a/tools/src/test/java/org/wikidata/query/rdf/tool/AbstractUpdaterIntegrationTestBase.java
+++
b/tools/src/test/java/org/wikidata/query/rdf/tool/AbstractUpdaterIntegrationTestBase.java
@@ -9,9 +9,9 @@
import org.apache.http.impl.client.CloseableHttpClient;
import org.junit.Rule;
+import org.junit.rules.RuleChain;
import org.junit.runner.RunWith;
import org.wikidata.query.rdf.common.uri.WikibaseUris;
-import org.wikidata.query.rdf.test.CloseableRule;
import org.wikidata.query.rdf.tool.change.Change;
import org.wikidata.query.rdf.tool.change.IdRangeChangeSource;
import org.wikidata.query.rdf.tool.rdf.Munger;
@@ -27,25 +27,23 @@
@RunWith(RandomizedRunner.class)
public class AbstractUpdaterIntegrationTestBase extends RandomizedTest {
- @Rule
- public final CloseableRule<CloseableHttpClient> httpClient =
autoClose(HttpClientUtils.createHttpClient());
+ private final CloseableHttpClient httpClient =
HttpClientUtils.createHttpClient(false);
+ /** Repository to test with. */
+ protected final RdfRepositoryForTesting rdfRepository = new
RdfRepositoryForTesting("wdq", httpClient);
+
+ @Rule public RuleChain chain = RuleChain.outerRule(autoClose(httpClient))
+ .around(rdfRepository);
+
/**
* Wikibase test against.
*/
public final WikibaseRepository wikibaseRepository =
- new WikibaseRepository(new Uris("https", "www.wikidata.org"),
httpClient.get());
+ new WikibaseRepository(new Uris("https", "www.wikidata.org"),
httpClient);
/**
* Munger to test against.
*/
private final Munger munger = new
Munger(WikibaseUris.getURISystem()).removeSiteLinks();
-
- /**
- * Repository to test with.
- */
- @Rule
- public RdfRepositoryForTesting rdfRepository = new
RdfRepositoryForTesting("wdq");
-
/**
* Update all ids from from to to.
diff --git
a/tools/src/test/java/org/wikidata/query/rdf/tool/IOBlastingIntegrationTest.java
b/tools/src/test/java/org/wikidata/query/rdf/tool/IOBlastingIntegrationTest.java
index 173f0a2..b795c1c 100644
---
a/tools/src/test/java/org/wikidata/query/rdf/tool/IOBlastingIntegrationTest.java
+++
b/tools/src/test/java/org/wikidata/query/rdf/tool/IOBlastingIntegrationTest.java
@@ -1,6 +1,7 @@
package org.wikidata.query.rdf.tool;
import static org.hamcrest.Matchers.hasItems;
+import static org.wikidata.query.rdf.test.CloseableRule.autoClose;
import static
org.wikidata.query.rdf.test.Matchers.subjectPredicateObjectMatchers;
import static
org.wikidata.query.rdf.test.StatementHelper.randomStatementsAbout;
import static org.wikidata.query.rdf.tool.TupleQueryResultHelper.toIterable;
@@ -13,16 +14,20 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.apache.http.client.HttpClient;
+import org.apache.http.impl.client.CloseableHttpClient;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.openrdf.model.Statement;
import org.openrdf.query.BindingSet;
import org.openrdf.query.QueryEvaluationException;
import org.openrdf.query.TupleQueryResult;
+import org.wikidata.query.rdf.test.CloseableRule;
import org.wikidata.query.rdf.tool.exception.ContainedException;
import org.wikidata.query.rdf.tool.rdf.RdfRepository;
@@ -46,10 +51,13 @@
new
ThreadFactoryBuilder().setNameFormat("IO-Blasting-%d").build());
private List<Future<IOBlasterResults>> resultses = new ArrayList<>();
+ @Rule
+ public CloseableRule<CloseableHttpClient> httpClient =
autoClose(HttpClientUtils.createHttpClient(true));
+
@Before
public void setup() {
for (int i = 1; i <= TOTAL_NAMESPACES; i++) {
- resultses.add(pool.submit(new IOBlaster("wdq" + i)));
+ resultses.add(pool.submit(new IOBlaster("wdq" + i,
httpClient.get())));
}
}
@@ -99,8 +107,8 @@
private final RdfRepositoryForTesting rdfRepository;
- IOBlaster(String namespace) {
- rdfRepository = new RdfRepositoryForTesting(namespace);
+ IOBlaster(String namespace, HttpClient httpClient) {
+ rdfRepository = new RdfRepositoryForTesting(namespace, httpClient);
}
/**
diff --git
a/tools/src/test/java/org/wikidata/query/rdf/tool/MungeIntegrationTest.java
b/tools/src/test/java/org/wikidata/query/rdf/tool/MungeIntegrationTest.java
index bc09050..0a5d3af 100644
--- a/tools/src/test/java/org/wikidata/query/rdf/tool/MungeIntegrationTest.java
+++ b/tools/src/test/java/org/wikidata/query/rdf/tool/MungeIntegrationTest.java
@@ -1,6 +1,7 @@
package org.wikidata.query.rdf.tool;
import static com.google.common.io.Resources.getResource;
+import static org.wikidata.query.rdf.test.CloseableRule.autoClose;
import static org.wikidata.query.rdf.test.Matchers.binds;
import static org.wikidata.query.rdf.tool.StreamUtils.utf8;
@@ -19,8 +20,10 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.apache.http.impl.client.CloseableHttpClient;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.RuleChain;
import org.junit.runner.RunWith;
import org.openrdf.model.impl.LiteralImpl;
import org.openrdf.model.vocabulary.XMLSchema;
@@ -56,9 +59,13 @@
*/
private final WikibaseUris uris = new WikibaseUris("test.wikidata.org");
+ private final CloseableHttpClient httpClient =
HttpClientUtils.createHttpClient(true);
+
+ private final RdfRepositoryForTesting rdfRepository = new
RdfRepositoryForTesting("wdq", httpClient);
@Rule
- public RdfRepositoryForTesting rdfRepository = new
RdfRepositoryForTesting("wdq");
+ public RuleChain chain = RuleChain.outerRule(autoClose(httpClient))
+ .around(rdfRepository);
/**
* Loads a truncated version of a test dump from test wikidata.
diff --git
a/tools/src/test/java/org/wikidata/query/rdf/tool/RdfRepositoryForTesting.java
b/tools/src/test/java/org/wikidata/query/rdf/tool/RdfRepositoryForTesting.java
index ab98668..0a74d39 100644
---
a/tools/src/test/java/org/wikidata/query/rdf/tool/RdfRepositoryForTesting.java
+++
b/tools/src/test/java/org/wikidata/query/rdf/tool/RdfRepositoryForTesting.java
@@ -2,6 +2,7 @@
import java.net.URI;
+import org.apache.http.client.HttpClient;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
@@ -14,14 +15,8 @@
*/
public class RdfRepositoryForTesting extends RdfRepository implements TestRule
{
- /**
- * The namespace of the local RDF repository, e.g. "kb" or "wdq".
- */
- private final String namespace;
-
- public RdfRepositoryForTesting(String namespace) {
- super(url("/namespace/" + namespace + "/sparql"),
WikibaseUris.WIKIDATA);
- this.namespace = namespace;
+ public RdfRepositoryForTesting(String namespace, HttpClient httpClient) {
+ super(url("/namespace/" + namespace + "/sparql"),
WikibaseUris.WIKIDATA, httpClient);
}
/**
@@ -109,8 +104,7 @@
/**
* Clear repository after test.
*/
- private void after() throws Exception {
+ private void after() {
clear();
- close();
}
}
diff --git
a/tools/src/test/java/org/wikidata/query/rdf/tool/http/OkOnlyResponseHandlerTest.java
b/tools/src/test/java/org/wikidata/query/rdf/tool/http/OkOnlyResponseHandlerTest.java
new file mode 100644
index 0000000..6194196
--- /dev/null
+++
b/tools/src/test/java/org/wikidata/query/rdf/tool/http/OkOnlyResponseHandlerTest.java
@@ -0,0 +1,92 @@
+package org.wikidata.query.rdf.tool.http;
+
+import static org.apache.http.HttpStatus.SC_CREATED;
+import static org.apache.http.HttpStatus.SC_OK;
+import static org.apache.http.HttpStatus.SC_SERVICE_UNAVAILABLE;
+import static org.apache.http.HttpVersion.HTTP_1_1;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.message.BasicStatusLine;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.wikidata.query.rdf.tool.exception.ContainedException;
+
+public class OkOnlyResponseHandlerTest {
+
+ private final MockHanlder handler = new MockHanlder();
+
+
+ @Test(expected = ContainedException.class)
+ public void exceptionRaisedWhenNotOk() throws IOException {
+ HttpResponse response = Mockito.mock(HttpResponse.class);
+
+ mockStatus(response, SC_CREATED);
+
+ handler.handleResponse(response);
+ }
+
+ @Test
+ public void bodyIsReturnedInException() throws IOException {
+ HttpResponse response = Mockito.mock(HttpResponse.class);
+
+ mockStatus(response, SC_SERVICE_UNAVAILABLE);
+ mockBody(response, "some body");
+
+ try {
+ handler.handleResponse(response);
+ fail();
+ } catch (ContainedException ce) {
+ assertThat(ce.getMessage(), containsString("some body"));
+ }
+ }
+
+ @Test
+ public void nullIsReturnedOnNullBody() throws IOException {
+ HttpResponse response = Mockito.mock(HttpResponse.class);
+
+ mockStatus(response, SC_OK);
+
+ Object result = handler.handleResponse(response);
+
+ assertNull(result);
+ assertFalse(handler.handleEntityCalled);
+ }
+
+ private void mockBody(HttpResponse response, String body) throws
UnsupportedEncodingException {
+ when(response.getEntity())
+ .thenReturn(new StringEntity(body));
+ }
+
+ private void mockStatus(HttpResponse response, int statusCode) {
+ when(response.getStatusLine())
+ .thenReturn(new BasicStatusLine(HTTP_1_1, statusCode,
"NOT_FOUND"));
+ }
+
+
+ private static class MockHanlder extends OkOnlyResponseHandler {
+
+ private boolean handleEntityCalled;
+
+ @Override
+ public String acceptHeader() {
+ return null;
+ }
+
+ @Override
+ public Object handleEntity(HttpEntity entity) {
+ handleEntityCalled = true;
+ return null;
+ }
+ }
+}
diff --git
a/tools/src/test/java/org/wikidata/query/rdf/tool/rdf/RdfRepositoryIntegrationTest.java
b/tools/src/test/java/org/wikidata/query/rdf/tool/rdf/RdfRepositoryIntegrationTest.java
index 27b963f..5b9bfba 100644
---
a/tools/src/test/java/org/wikidata/query/rdf/tool/rdf/RdfRepositoryIntegrationTest.java
+++
b/tools/src/test/java/org/wikidata/query/rdf/tool/rdf/RdfRepositoryIntegrationTest.java
@@ -1,6 +1,7 @@
package org.wikidata.query.rdf.tool.rdf;
import static org.hamcrest.Matchers.allOf;
+import static org.wikidata.query.rdf.test.CloseableRule.autoClose;
import static org.wikidata.query.rdf.test.Matchers.binds;
import static org.wikidata.query.rdf.test.StatementHelper.siteLink;
import static org.wikidata.query.rdf.test.StatementHelper.statement;
@@ -15,9 +16,11 @@
import java.util.Locale;
import java.util.Set;
+import org.apache.http.impl.client.CloseableHttpClient;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.RuleChain;
import org.junit.runner.RunWith;
import org.openrdf.model.BNode;
import org.openrdf.model.Statement;
@@ -33,6 +36,7 @@
import org.wikidata.query.rdf.common.uri.SchemaDotOrg;
import org.wikidata.query.rdf.common.uri.WikibaseUris;
import org.wikidata.query.rdf.common.uri.WikibaseUris.PropertyType;
+import org.wikidata.query.rdf.tool.HttpClientUtils;
import org.wikidata.query.rdf.tool.RdfRepositoryForTesting;
import com.carrotsearch.randomizedtesting.RandomizedRunner;
@@ -51,8 +55,12 @@
*/
private final WikibaseUris uris = WikibaseUris.getURISystem();
+ private final CloseableHttpClient httpClient =
HttpClientUtils.createHttpClient(true);
+ private final RdfRepositoryForTesting rdfRepository = new
RdfRepositoryForTesting("wdq", httpClient);
+
@Rule
- public RdfRepositoryForTesting rdfRepository = new
RdfRepositoryForTesting("wdq");
+ public RuleChain chain = RuleChain.outerRule(autoClose(httpClient))
+ .around(rdfRepository);
@Before
public void cleanList() {
diff --git
a/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
b/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
index 0d4e258..061fd39 100644
---
a/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
+++
b/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
@@ -35,7 +35,7 @@
public class WikibaseRepositoryIntegrationTest extends RandomizedTest {
private static final String HOST = "test.wikidata.org";
@Rule
- public final CloseableRule<CloseableHttpClient> httpClient =
autoClose(HttpClientUtils.createHttpClient());
+ public final CloseableRule<CloseableHttpClient> httpClient =
autoClose(HttpClientUtils.createHttpClient(false));
private final WikibaseRepository repo = new WikibaseRepository(new
Uris("https", HOST), httpClient.get());
private final WikibaseRepository proxyRepo = new WikibaseRepository(new
Uris("http", "localhost", 8812), httpClient.get());
private final WikibaseUris uris = new WikibaseUris(HOST);
@@ -138,7 +138,7 @@
@Test
public void fetchIsNormalized() throws RetryableException,
ContainedException, IOException {
long now = System.currentTimeMillis();
- try (CloseableHttpClient httpClient =
HttpClientUtils.createHttpClient()) {
+ try (CloseableHttpClient httpClient =
HttpClientUtils.createHttpClient(true)) {
WikibaseRepository proxyRepo = new WikibaseRepository(new
Uris("http", "localhost", 8812), httpClient);
String entityId =
repo.firstEntityIdForLabelStartingWith("QueryTestItem", "en", "item");
repo.setLabel(entityId, "item", "QueryTestItem" + now, "en");
diff --git
a/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryWireIntegrationTest.java
b/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryWireIntegrationTest.java
index 9a9abf4..31275b3 100644
---
a/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryWireIntegrationTest.java
+++
b/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryWireIntegrationTest.java
@@ -32,7 +32,7 @@
public class WikibaseRepositoryWireIntegrationTest {
@Rule
- public final CloseableRule<CloseableHttpClient> httpClient =
autoClose(HttpClientUtils.createHttpClient());
+ public final CloseableRule<CloseableHttpClient> httpClient =
autoClose(HttpClientUtils.createHttpClient(false));
@Rule public WireMockRule wireMockRule = new WireMockRule(wireMockConfig()
.dynamicPort()
--
To view, visit https://gerrit.wikimedia.org/r/402753
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I2ea367e3f7c77338d487793fe2b9982b8538b017
Gerrit-PatchSet: 8
Gerrit-Project: wikidata/query/rdf
Gerrit-Branch: master
Gerrit-Owner: Gehel <[email protected]>
Gerrit-Reviewer: Gehel <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits