Gehel has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/402753 )

Change subject: RdfRepository uses Apache HC instead of Jetty HTTP client
......................................................................

RdfRepository uses Apache HC instead of Jetty HTTP client

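This removes the dependency on the Jetty HTTP client (jetty-client,
jetty-http and jetty-util) and reuses Apache HttpComponents, which the
tools already use and which is (IMHO) a simpler HTTP client library.
RdfRepository now receives its HttpClient from the caller and relies on
that client's retry configuration instead of its own Retryer. Note that
Update creates that client with createHttpClient(false), so requests to
the SPARQL endpoint no longer trust all TLS certificates as the Jetty
client did. This might help our issue with leaked threads in tests.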

Change-Id: I2ea367e3f7c77338d487793fe2b9982b8538b017
---
M tools/pom.xml
M tools/src/main/java/org/wikidata/query/rdf/tool/HttpClientUtils.java
M tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
A tools/src/main/java/org/wikidata/query/rdf/tool/http/OkOnlyResponseHandler.java
M tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java
M tools/src/test/java/org/wikidata/query/rdf/tool/AbstractUpdaterIntegrationTestBase.java
M tools/src/test/java/org/wikidata/query/rdf/tool/IOBlastingIntegrationTest.java
M tools/src/test/java/org/wikidata/query/rdf/tool/MungeIntegrationTest.java
M tools/src/test/java/org/wikidata/query/rdf/tool/RdfRepositoryForTesting.java
A tools/src/test/java/org/wikidata/query/rdf/tool/http/OkOnlyResponseHandlerTest.java
M tools/src/test/java/org/wikidata/query/rdf/tool/rdf/RdfRepositoryIntegrationTest.java
M tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
M tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryWireIntegrationTest.java
13 files changed, 323 insertions(+), 274 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/wikidata/query/rdf refs/changes/53/402753/8

diff --git a/tools/pom.xml b/tools/pom.xml
index f76c6b7..d18ff86 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -39,6 +39,10 @@
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>
         <dependency>
@@ -81,18 +85,6 @@
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpcore</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.eclipse.jetty</groupId>
-            <artifactId>jetty-client</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.eclipse.jetty</groupId>
-            <artifactId>jetty-http</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.eclipse.jetty</groupId>
-            <artifactId>jetty-util</artifactId>
         </dependency>
         <dependency>
             <groupId>org.openrdf.sesame</groupId>
diff --git 
a/tools/src/main/java/org/wikidata/query/rdf/tool/HttpClientUtils.java 
b/tools/src/main/java/org/wikidata/query/rdf/tool/HttpClientUtils.java
index eaec48d..dbb7fe9 100644
--- a/tools/src/main/java/org/wikidata/query/rdf/tool/HttpClientUtils.java
+++ b/tools/src/main/java/org/wikidata/query/rdf/tool/HttpClientUtils.java
@@ -1,11 +1,18 @@
 package org.wikidata.query.rdf.tool;
 
+import static java.lang.Integer.parseInt;
+
 import java.io.InterruptedIOException;
 import java.net.UnknownHostException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
 
 import javax.net.ssl.SSLException;
 
 import org.apache.http.HttpEntityEnclosingRequest;
+import org.apache.http.HttpException;
+import org.apache.http.HttpHost;
 import org.apache.http.HttpRequest;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
@@ -16,32 +23,41 @@
 import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.client.protocol.HttpClientContext;
 import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.routing.HttpRoutePlanner;
+import org.apache.http.conn.socket.LayeredConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.DefaultServiceUnavailableRetryStrategy;
 import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.DefaultProxyRoutePlanner;
 import org.apache.http.protocol.HttpContext;
+import org.apache.http.ssl.SSLContextBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Utilities for dealing with HttpClient.
  */
+@SuppressWarnings("checkstyle:classfanoutcomplexity") // dependencies are 
mostly about HC configuration
 public final class HttpClientUtils {
 
     private static final Logger log = 
LoggerFactory.getLogger(HttpClientUtils.class);
 
-    /**
-     * How many retries allowed on error.
-     */
+    /** Configuration name for proxy host. */
+    public static final String HTTP_PROXY = "http.proxyHost";
+
+    /** Configuration name for proxy port. */
+    public static final String HTTP_PROXY_PORT = "http.proxyPort";
+
+    /** How many retries allowed on error. */
     public static final int RETRIES = 3;
-    /**
-     * Retry interval, in ms.
-     */
+
+    /** Retry interval, in ms. */
     public static final int RETRY_INTERVAL = 500;
 
-    /**
-     * Configure request to ignore cookies.
-     */
+    /** Configure request to ignore cookies. */
     public static void ignoreCookies(HttpRequestBase request) {
         RequestConfig noCookiesConfig = 
RequestConfig.custom().setCookieSpec(CookieSpecs.IGNORE_COOKIES).build();
         request.setConfig(noCookiesConfig);
@@ -51,14 +67,87 @@
         // Uncallable utility constructor
     }
 
-    public static CloseableHttpClient createHttpClient() {
-        return HttpClients.custom()
-                .setMaxConnPerRoute(100).setMaxConnTotal(100)
-                .setRetryHandler(getRetryHandler(RETRIES))
-                .setServiceUnavailableRetryStrategy(getRetryStrategy(RETRIES, RETRY_INTERVAL))
-                .disableCookieManagement()
-                .setUserAgent("Wikidata Query Service Updater")
-                .build();
+    /**
+     * Build the HttpClient used by the updater tools.
+     *
+     * The client uses the retry handler and service unavailable retry
+     * strategy configured from {@link #RETRIES} and {@link #RETRY_INTERVAL},
+     * does no cookie management, and sends requests through the proxy given
+     * by the {@link #HTTP_PROXY} / {@link #HTTP_PROXY_PORT} system properties
+     * (when both are set), except for localhost and 127.0.0.1.
+     *
+     * @param trustAll if true, also accept self-signed TLS certificates (see
+     *                 {@code socketFactory(boolean)}); if false, no custom
+     *                 SSL socket factory is installed
+     * @throws RuntimeException if the SSL context cannot be built
+     */
+    public static CloseableHttpClient createHttpClient(boolean trustAll) {
+        try {
+            return HttpClients.custom()
+                    .setMaxConnPerRoute(100).setMaxConnTotal(100)
+                    .setRetryHandler(getRetryHandler(RETRIES))
+                    .setServiceUnavailableRetryStrategy(getRetryStrategy(RETRIES, RETRY_INTERVAL))
+                    .disableCookieManagement()
+                    .setRoutePlanner(routePlanner())
+                    .setSSLSocketFactory(socketFactory(trustAll))
+                    .setUserAgent("Wikidata Query Service Updater")
+                    .build();
+        } catch (KeyStoreException | NoSuchAlgorithmException | KeyManagementException e) {
+            // Only socketFactory() declares these checked exceptions.
+            throw new RuntimeException("Could not create HttpClient", e);
+        }
+    }
+
+    /**
+     * Build the SSL socket factory used when {@code trustAll} is set.
+     *
+     * NOTE(review): TrustSelfSignedStrategy accepts self-signed certificates;
+     * this is presumably narrower than the trust-all SslContextFactory(true)
+     * the former Jetty client of RdfRepository used. Confirm this is intended.
+     *
+     * @return the socket factory, or null when {@code trustAll} is false so
+     *         that no custom factory is set on the HttpClient builder
+     */
+    private static LayeredConnectionSocketFactory socketFactory(boolean trustAll)
+            throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException {
+        if (!trustAll) {
+            return null;
+        }
+        SSLContextBuilder builder = new SSLContextBuilder();
+        builder.loadTrustMaterial(null, new TrustSelfSignedStrategy());
+        return new SSLConnectionSocketFactory(builder.build());
+    }
+
+    /**
+     * Build a route planner for the proxy given by the {@link #HTTP_PROXY} and
+     * {@link #HTTP_PROXY_PORT} system properties. Requests to localhost and
+     * 127.0.0.1 always bypass the proxy.
+     *
+     * @return the route planner, or null if either property is unset
+     */
+    private static HttpRoutePlanner routePlanner() {
+        String httpProxyProp = System.getProperty(HttpClientUtils.HTTP_PROXY);
+        String httpProxyPortProp = System.getProperty(HttpClientUtils.HTTP_PROXY_PORT);
+        if (httpProxyProp == null || httpProxyPortProp == null) {
+            return null;
+        }
+
+        HttpHost proxyHost = new HttpHost(httpProxyProp, parseInt(httpProxyPortProp), "http");
+
+        return new DefaultProxyRoutePlanner(proxyHost) {
+            @Override
+            public HttpRoute determineRoute(
+                    final HttpHost host,
+                    final HttpRequest request,
+                    final HttpContext context) throws HttpException {
+                String hostname = host.getHostName();
+                if (hostname.equals("127.0.0.1") || hostname.equalsIgnoreCase("localhost")) {
+                    // Local target: return a direct route, bypassing the proxy
+                    return new HttpRoute(host);
+                }
+                return super.determineRoute(host, request, context);
+            }
+        };
     }
 
     /**
diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java 
b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
index 3ed90cd..b171f67 100644
--- a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
+++ b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
@@ -53,9 +53,9 @@
         URI sparqlUri = sparqlUri(options);
         WikibaseUris uris = new WikibaseUris(options.wikibaseHost());
 
-        try (CloseableHttpClient httpClient = 
HttpClientUtils.createHttpClient();
-             RdfRepository rdfRepository = new RdfRepository(sparqlUri, uris)
+        try (CloseableHttpClient httpClient = 
HttpClientUtils.createHttpClient(false)
         ) {
+            RdfRepository rdfRepository = new RdfRepository(sparqlUri, uris, 
httpClient);
             WikibaseRepository wikibaseRepository = new 
WikibaseRepository(getUris(options), httpClient);
             Change.Source<? extends Change.Batch> changeSource = 
buildChangeSource(options, rdfRepository,
                     wikibaseRepository);
diff --git 
a/tools/src/main/java/org/wikidata/query/rdf/tool/http/OkOnlyResponseHandler.java
 
b/tools/src/main/java/org/wikidata/query/rdf/tool/http/OkOnlyResponseHandler.java
new file mode 100644
index 0000000..285358e
--- /dev/null
+++ 
b/tools/src/main/java/org/wikidata/query/rdf/tool/http/OkOnlyResponseHandler.java
@@ -0,0 +1,54 @@
+package org.wikidata.query.rdf.tool.http;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.http.HttpStatus.SC_OK;
+
+import java.io.IOException;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.util.EntityUtils;
+import org.wikidata.query.rdf.tool.exception.ContainedException;
+
+/**
+ * A {@link ResponseHandler} that rejects any response whose status is not HTTP 200 (OK).
+ *
+ * Also provides {@link #acceptHeader()} to expose the type of content this
+ * handler can parse.
+ *
+ * @param <T> the type of response parsed
+ */
+public abstract class OkOnlyResponseHandler<T> implements ResponseHandler<T> {
+    /**
+     * The value of the Accept header to send with requests parsed by this handler.
+     */
+    public abstract String acceptHeader();
+
+
+    /**
+     * Parse the entity of a successful (200) response into the result object;
+     * {@link #handleResponse(HttpResponse)} only calls this with a non-null entity.
+     */
+    protected abstract T handleEntity(HttpEntity entity) throws IOException;
+
+    /**
+     * Pass the entity of a successful (200) response to
+     * {@link #handleEntity(HttpEntity)}, or return null if it has no body.
+     * Any other status throws a {@link ContainedException} whose message
+     * includes the response and its body (decoded as UTF-8, null if absent).
+     */
+    @Override
+    public T handleResponse(final HttpResponse response)
+            throws IOException {
+        final StatusLine statusLine = response.getStatusLine();
+        final HttpEntity entity = response.getEntity();
+        if (statusLine.getStatusCode() != SC_OK) {
+            String body = entity == null ? null : EntityUtils.toString(entity, 
UTF_8);
+            throw new ContainedException("Non-200 response from triple store:  
" + response
+                    + " body=\n" + body);
+        }
+        return entity == null ? null : handleEntity(entity);
+    }
+}
diff --git 
a/tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java 
b/tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java
index 8120f95..78f5d47 100644
--- a/tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java
+++ b/tools/src/main/java/org/wikidata/query/rdf/tool/rdf/RdfRepository.java
@@ -1,9 +1,10 @@
 package org.wikidata.query.rdf.tool.rdf;
 
+import static com.google.common.base.Charsets.UTF_8;
 import static com.google.common.collect.Sets.newHashSetWithExpectedSize;
 import static com.google.common.io.Resources.getResource;
+import static java.util.Collections.singletonList;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URL;
@@ -16,9 +17,6 @@
 import java.util.Locale;
 import java.util.Set;
 import java.util.TimeZone;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -26,16 +24,12 @@
 import javax.xml.datatype.DatatypeFactory;
 import javax.xml.datatype.XMLGregorianCalendar;
 
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpProxy;
-import org.eclipse.jetty.client.ProxyConfiguration;
-import org.eclipse.jetty.client.api.ContentResponse;
-import org.eclipse.jetty.client.api.Request;
-import org.eclipse.jetty.client.util.FormContentProvider;
-import org.eclipse.jetty.http.HttpMethod;
-import org.eclipse.jetty.http.HttpStatus;
-import org.eclipse.jetty.util.Fields;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
 import org.openrdf.model.Literal;
 import org.openrdf.model.Statement;
 import org.openrdf.query.Binding;
@@ -53,8 +47,8 @@
 import org.wikidata.query.rdf.common.uri.SchemaDotOrg;
 import org.wikidata.query.rdf.common.uri.WikibaseUris;
 import org.wikidata.query.rdf.tool.change.Change;
-import org.wikidata.query.rdf.tool.exception.ContainedException;
 import org.wikidata.query.rdf.tool.exception.FatalException;
+import org.wikidata.query.rdf.tool.http.OkOnlyResponseHandler;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -62,14 +56,6 @@
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.github.rholder.retry.Attempt;
-import com.github.rholder.retry.RetryException;
-import com.github.rholder.retry.RetryListener;
-import com.github.rholder.retry.Retryer;
-import com.github.rholder.retry.RetryerBuilder;
-import com.github.rholder.retry.StopStrategies;
-import com.github.rholder.retry.WaitStrategies;
-import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableSetMultimap;
 import com.google.common.io.Resources;
 
@@ -81,7 +67,7 @@
 // TODO fan out complexity
 @SuppressWarnings("checkstyle:classfanoutcomplexity")
 @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = 
"spotbug limitation: https://github.com/spotbugs/spotbugs/issues/463";)
-public class RdfRepository implements AutoCloseable {
+public class RdfRepository  {
     private static final Logger log = 
LoggerFactory.getLogger(RdfRepository.class);
     /**
      * UTC timezone.
@@ -136,26 +122,6 @@
     private final String verify;
 
     /**
-     * How many times we retry a failed HTTP call.
-     */
-    private int maxRetries = 6;
-    /**
-     * How long to delay after failing first HTTP call, in milliseconds.
-     * Next retries would be slower exponentially by 2x until maxRetries is 
exhausted.
-     * Note that the first retry is 2x delay due to the way Retryer is 
implemented.
-     */
-    private int delay = 1000;
-
-    /**
-     * Configuration name for proxy host.
-     */
-    private static final String HTTP_PROXY = "http.proxyHost";
-    /**
-     * Configuration name for proxy port.
-     */
-    private static final String HTTP_PROXY_PORT = "http.proxyPort";
-
-    /**
      * Request timeout property.
      */
     public static final String TIMEOUT_PROPERTY = RdfRepository.class + 
".timeout";
@@ -165,12 +131,7 @@
      */
     private final int timeout;
 
-    /**
-     * Retryer for fetching data from RDF store.
-     */
-    private final Retryer<ContentResponse> retryer;
-
-    public RdfRepository(URI uri, WikibaseUris uris) {
+    public RdfRepository(URI uri, WikibaseUris uris, HttpClient httpClient) {
         this.uri = uri;
         this.uris = uris;
         msyncBody = loadBody("multiSync");
@@ -183,96 +144,7 @@
         verify = loadBody("verify");
 
         timeout = Integer.parseInt(System.getProperty(TIMEOUT_PROPERTY, "-1"));
-        httpClient = new HttpClient(new SslContextFactory(true/* trustAll */));
-        setupHttpClient();
-
-        retryer = RetryerBuilder.<ContentResponse>newBuilder()
-                .retryIfExceptionOfType(TimeoutException.class)
-                .retryIfExceptionOfType(ExecutionException.class)
-                .retryIfExceptionOfType(IOException.class)
-                .retryIfRuntimeException()
-                .withWaitStrategy(WaitStrategies.exponentialWait(delay, 10, 
TimeUnit.SECONDS))
-                .withStopStrategy(StopStrategies.stopAfterAttempt(maxRetries))
-                .withRetryListener(new RetryListener() {
-                    @Override
-                    public <V> void onRetry(Attempt<V> attempt) {
-                        if (attempt.hasException()) {
-                            log.info("HTTP request failed: {}, attempt {}, 
will {}",
-                                    attempt.getExceptionCause(),
-                                    attempt.getAttemptNumber(),
-                                    attempt.getAttemptNumber() < maxRetries ? 
"retry" : "fail");
-                        }
-                    }
-                })
-                .build();
-    }
-
-    /**
-     * Setup HTTP client settings.
-     */
-    @SuppressWarnings("checkstyle:illegalcatch")
-    private void setupHttpClient() {
-        if (System.getProperty(HTTP_PROXY) != null
-                && System.getProperty(HTTP_PROXY_PORT) != null) {
-            final ProxyConfiguration proxyConfig = 
httpClient.getProxyConfiguration();
-            final HttpProxy proxy = new HttpProxy(
-                    System.getProperty(HTTP_PROXY),
-                    Integer.parseInt(System.getProperty(HTTP_PROXY_PORT)));
-            proxy.getExcludedAddresses().add("localhost");
-            proxy.getExcludedAddresses().add("127.0.0.1");
-            proxyConfig.getProxies().add(proxy);
-        }
-        try {
-            httpClient.start();
-        // Who would think declaring it as throws Exception is a good idea?
-        } catch (Exception e) {
-            throw new RuntimeException("Unable to start HttpClient", e);
-        }
-    }
-
-    /**
-     * Close the repository.
-     * @throws Exception
-     */
-    @Override
-    public void close() throws Exception {
-        httpClient.stop();
-    }
-
-    /**
-     * Get max retries count.
-     * @return How many times we retry a failed HTTP call.
-     */
-    public int getMaxRetries() {
-        return maxRetries;
-    }
-
-    /**
-     * Set how many times we retry a failed HTTP call.
-     * @return this
-     */
-    public RdfRepository setMaxRetries(int maxRetries) {
-        this.maxRetries = maxRetries;
-        return this;
-    }
-
-    /**
-     * Get retry delay.
-     * @return How long to delay after failing first HTTP call, in 
milliseconds.
-     */
-    public int getDelay() {
-        return delay;
-    }
-
-    /**
-     * Set retry delay.
-     * Specifies how long to delay after failing first HTTP call, in 
milliseconds.
-     * Next retries would be slower by 2x, 3x, 4x etc. until maxRetries is 
exhausted.
-     * @return The repository object
-     */
-    public RdfRepository setDelay(int delay) {
-        this.delay = delay;
-        return this;
+        this.httpClient = httpClient;
     }
 
     /**
@@ -286,7 +158,7 @@
     private static String loadBody(String name) {
         URL url = getResource(RdfRepository.class, "RdfRepository." + name + 
".sparql");
         try {
-            return Resources.toString(url, Charsets.UTF_8);
+            return Resources.toString(url, UTF_8);
         } catch (IOException e) {
             throw new FatalException("Can't load " + url, e);
         }
@@ -300,7 +172,7 @@
      * @return Collection of strings resulting from the query.
      */
     private Set<String> resultToSet(TupleQueryResult result, String binding) {
-        HashSet<String> values = new HashSet<String>();
+        HashSet<String> values = new HashSet<>();
         try {
             while (result.hasNext()) {
                 Binding value = result.next().getBinding(binding);
@@ -686,22 +558,25 @@
      * @param accept Accept header (can be null)
      * @return Request object
      */
-    private Request makeRequest(String type, String sparql, String accept) {
-        Request post = httpClient.newRequest(uri);
-        post.method(HttpMethod.POST);
-        if (timeout > 0) {
-            post.timeout(timeout, TimeUnit.SECONDS);
-        }
+    private HttpPost makeRequest(String type, String sparql, String accept) throws IOException {
+        HttpPost post = new HttpPost(uri);
+        // TIMEOUT_PROPERTY is in seconds; applied as HC connect and socket (read) timeouts.
+        if (timeout > 0) {
+            post.setConfig(org.apache.http.client.config.RequestConfig.custom()
+                    .setConnectTimeout(timeout * 1000).setSocketTimeout(timeout * 1000).build());
+        }
+
         // Note that Blazegraph totally ignores the Accept header for SPARQL
         // updates so the response is just html in that case...
         if (accept != null) {
-            post.header("Accept", accept);
+            post.addHeader("Accept", accept);
         }
 
-        final Fields fields = new Fields();
-        fields.add(type, sparql);
-        final FormContentProvider form = new FormContentProvider(fields, Charsets.UTF_8);
-        post.content(form);
+        post.setEntity(new UrlEncodedFormEntity(
+                singletonList(
+                        new BasicNameValuePair(type, sparql)),
+                UTF_8));
+
         return post;
     }
 
@@ -711,35 +586,19 @@
      * @param type name of the parameter in which to send sparql
      * @return results string from the server
      */
-    protected <T> T execute(String type, ResponseHandler<T> responseHandler, String sparql) {
+    protected <T> T execute(String type, OkOnlyResponseHandler<T> responseHandler, String sparql) {
         log.debug("Running SPARQL: {}", sparql);
         long startQuery = System.currentTimeMillis();
         // TODO we might want to look into Blazegraph's incremental update
         // reporting.....
-        final ContentResponse response;
         try {
-            response = retryer.call(()
-                -> makeRequest(type, sparql, responseHandler.acceptHeader()).send());
-
-            if (response.getStatus() != HttpStatus.OK_200) {
-                throw new ContainedException("Non-200 response from triple store:  " + response
-                                + " body=\n" + responseBodyAsString(response));
-            }
-
-            log.debug("Completed in {} ms", System.currentTimeMillis() - startQuery);
-            return responseHandler.parse(response);
-        } catch (ExecutionException | RetryException | IOException e) {
+            T result = httpClient.execute(
+                    makeRequest(type, sparql, responseHandler.acceptHeader()), responseHandler);
+            log.debug("Completed in {} ms", System.currentTimeMillis() - startQuery); // success only
+            return result;
+        } catch (IOException e) {
             throw new FatalException("Error updating triple store", e);
         }
-    }
-
-    /**
-     * Fetch the body of the response as a string.
-     *
-     * @throws IOException if there is an error reading the response
-     */
-    protected String responseBodyAsString(ContentResponse response) throws IOException {
-        return response.getContentAsString();
     }
 
     /**
@@ -769,47 +628,23 @@
     }
 
     /**
-     * Passed to execute to setup the accept header and parse the response. Its
-     * super ultra mega important to parse the response in execute because
-     * execute manages closing the http response object. If execute return the
-     * input stream after closing the response then everything would
-     * <strong>mostly</strong> work but things would blow up with strange 
socket
-     * closed errors.
-     *
-     * @param <T> the type of response parsed
-     */
-    private interface ResponseHandler<T> {
-        /**
-         * The contents of the accept header sent to the rdf repository.
-         */
-        String acceptHeader();
-
-        /**
-         * Parse the response.
-         *
-         * @throws IOException if there is an error reading the response
-         */
-        T parse(ContentResponse entity) throws IOException;
-    }
-
-    /**
      * Count and log the number of updates.
      */
-    protected static final ResponseHandler<Integer> UPDATE_COUNT_RESPONSE = 
new UpdateCountResponse();
+    protected static final OkOnlyResponseHandler<Integer> 
UPDATE_COUNT_RESPONSE = new UpdateCountResponse();
     /**
      * Parse the response from a regular query into a TupleQueryResult.
      */
-    protected static final ResponseHandler<TupleQueryResult> 
TUPLE_QUERY_RESPONSE = new TupleQueryResponse();
+    protected static final OkOnlyResponseHandler<TupleQueryResult> 
TUPLE_QUERY_RESPONSE = new TupleQueryResponse();
     /**
      * Parse the response from an ask query into a boolean.
      */
-    protected static final ResponseHandler<Boolean> ASK_QUERY_RESPONSE = new 
AskQueryResponse();
+    protected static final OkOnlyResponseHandler<Boolean> ASK_QUERY_RESPONSE = 
new AskQueryResponse();
 
     /**
      * Attempts to log update response information but very likely only works
      * for Blazegraph.
      */
-    protected static class UpdateCountResponse implements 
ResponseHandler<Integer> {
+    protected static class UpdateCountResponse extends 
OkOnlyResponseHandler<Integer> {
         /**
          * The pattern for the response for an update.
          */
@@ -843,9 +678,11 @@
 
         @Override
         @SuppressFBWarnings(value = "PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS", 
justification = "more readable with 2 calls")
-        public Integer parse(ContentResponse entity) throws IOException {
+        public Integer handleEntity(HttpEntity entity) throws IOException {
             Integer mutationCount = null;
-            for (String line : entity.getContentAsString().split("\\r?\\n")) {
+
+            // TODO: we could probably replace this with a buffered reader
+            for (String line : EntityUtils.toString(entity, 
UTF_8).split("\\r?\\n")) {
                 Matcher m;
                 m = ELAPSED_LINE_FLUSH.matcher(line);
                 if (m.matches()) {
@@ -891,19 +728,19 @@
     /**
      * Parses responses to regular queries into TupleQueryResults.
      */
-    private static class TupleQueryResponse implements 
ResponseHandler<TupleQueryResult> {
+    private static class TupleQueryResponse extends 
OkOnlyResponseHandler<TupleQueryResult> {
         @Override
         public String acceptHeader() {
             return "application/x-binary-rdf-results-table";
         }
 
         @Override
-        public TupleQueryResult parse(ContentResponse entity) throws 
IOException {
+        public TupleQueryResult handleEntity(HttpEntity entity) throws 
IOException {
             BinaryQueryResultParser p = new BinaryQueryResultParser();
             TupleQueryResultBuilder collector = new TupleQueryResultBuilder();
             p.setQueryResultHandler(collector);
             try {
-                p.parseQueryResult(new 
ByteArrayInputStream(entity.getContent()));
+                p.parseQueryResult(entity.getContent());
             } catch (QueryResultParseException | QueryResultHandlerException | 
IllegalStateException e) {
                 throw new RuntimeException("Error parsing query", e);
             }
@@ -914,18 +751,22 @@
     /**
      * Parses responses to ask queries into booleans.
      */
-    private static class AskQueryResponse implements ResponseHandler<Boolean> {
+    private static class AskQueryResponse extends 
OkOnlyResponseHandler<Boolean> {
         @Override
         public String acceptHeader() {
             return "application/json";
         }
+        private final ObjectMapper mapper;
+
+        AskQueryResponse() {
+            mapper = new ObjectMapper();
+            
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        }
 
         @Override
-        public Boolean parse(ContentResponse entity) throws IOException {
+        public Boolean handleEntity(HttpEntity entity) throws IOException {
             try {
-                ObjectMapper mapper = new ObjectMapper();
-                
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-                return mapper.readValue(entity.getContentAsString(), 
Resp.class).aBoolean;
+                return mapper.readValue(entity.getContent(), 
Resp.class).aBoolean;
 
             } catch (JsonParseException | JsonMappingException e) {
                 throw new IOException("Error parsing response", e);
diff --git 
a/tools/src/test/java/org/wikidata/query/rdf/tool/AbstractUpdaterIntegrationTestBase.java
 
b/tools/src/test/java/org/wikidata/query/rdf/tool/AbstractUpdaterIntegrationTestBase.java
index 3b3a36e..24f2756 100644
--- 
a/tools/src/test/java/org/wikidata/query/rdf/tool/AbstractUpdaterIntegrationTestBase.java
+++ 
b/tools/src/test/java/org/wikidata/query/rdf/tool/AbstractUpdaterIntegrationTestBase.java
@@ -9,9 +9,9 @@
 
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.junit.Rule;
+import org.junit.rules.RuleChain;
 import org.junit.runner.RunWith;
 import org.wikidata.query.rdf.common.uri.WikibaseUris;
-import org.wikidata.query.rdf.test.CloseableRule;
 import org.wikidata.query.rdf.tool.change.Change;
 import org.wikidata.query.rdf.tool.change.IdRangeChangeSource;
 import org.wikidata.query.rdf.tool.rdf.Munger;
@@ -27,25 +27,23 @@
 @RunWith(RandomizedRunner.class)
 public class AbstractUpdaterIntegrationTestBase extends RandomizedTest {
 
-    @Rule
-    public final CloseableRule<CloseableHttpClient> httpClient = 
autoClose(HttpClientUtils.createHttpClient());
+    private final CloseableHttpClient httpClient = 
HttpClientUtils.createHttpClient(false);
+    /** Repository to test with. */
+    protected final RdfRepositoryForTesting rdfRepository = new 
RdfRepositoryForTesting("wdq", httpClient);
+
+    @Rule public RuleChain chain = RuleChain.outerRule(autoClose(httpClient))
+            .around(rdfRepository);
+
     /**
      * Wikibase test against.
      */
     public final WikibaseRepository wikibaseRepository =
-            new WikibaseRepository(new Uris("https", "www.wikidata.org"), 
httpClient.get());
+            new WikibaseRepository(new Uris("https", "www.wikidata.org"), 
httpClient);
 
     /**
      * Munger to test against.
      */
     private final Munger munger = new 
Munger(WikibaseUris.getURISystem()).removeSiteLinks();
-
-    /**
-     * Repository to test with.
-     */
-    @Rule
-    public RdfRepositoryForTesting rdfRepository = new 
RdfRepositoryForTesting("wdq");
-
 
     /**
      * Update all ids from from to to.
diff --git 
a/tools/src/test/java/org/wikidata/query/rdf/tool/IOBlastingIntegrationTest.java
 
b/tools/src/test/java/org/wikidata/query/rdf/tool/IOBlastingIntegrationTest.java
index 173f0a2..b795c1c 100644
--- 
a/tools/src/test/java/org/wikidata/query/rdf/tool/IOBlastingIntegrationTest.java
+++ 
b/tools/src/test/java/org/wikidata/query/rdf/tool/IOBlastingIntegrationTest.java
@@ -1,6 +1,7 @@
 package org.wikidata.query.rdf.tool;
 
 import static org.hamcrest.Matchers.hasItems;
+import static org.wikidata.query.rdf.test.CloseableRule.autoClose;
 import static 
org.wikidata.query.rdf.test.Matchers.subjectPredicateObjectMatchers;
 import static 
org.wikidata.query.rdf.test.StatementHelper.randomStatementsAbout;
 import static org.wikidata.query.rdf.tool.TupleQueryResultHelper.toIterable;
@@ -13,16 +14,20 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.http.client.HttpClient;
+import org.apache.http.impl.client.CloseableHttpClient;
 import org.hamcrest.Matcher;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.openrdf.model.Statement;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.QueryEvaluationException;
 import org.openrdf.query.TupleQueryResult;
+import org.wikidata.query.rdf.test.CloseableRule;
 import org.wikidata.query.rdf.tool.exception.ContainedException;
 import org.wikidata.query.rdf.tool.rdf.RdfRepository;
 
@@ -46,10 +51,13 @@
             new 
ThreadFactoryBuilder().setNameFormat("IO-Blasting-%d").build());
     private List<Future<IOBlasterResults>> resultses = new ArrayList<>();
 
+    @Rule
+    public CloseableRule<CloseableHttpClient> httpClient = 
autoClose(HttpClientUtils.createHttpClient(true));
+
     @Before
     public void setup() {
         for (int i = 1; i <= TOTAL_NAMESPACES; i++) {
-            resultses.add(pool.submit(new IOBlaster("wdq" + i)));
+            resultses.add(pool.submit(new IOBlaster("wdq" + i, 
httpClient.get())));
         }
     }
 
@@ -99,8 +107,8 @@
 
         private final RdfRepositoryForTesting rdfRepository;
 
-        IOBlaster(String namespace) {
-            rdfRepository = new RdfRepositoryForTesting(namespace);
+        IOBlaster(String namespace, HttpClient httpClient) {
+            rdfRepository = new RdfRepositoryForTesting(namespace, httpClient);
         }
 
         /**
diff --git 
a/tools/src/test/java/org/wikidata/query/rdf/tool/MungeIntegrationTest.java 
b/tools/src/test/java/org/wikidata/query/rdf/tool/MungeIntegrationTest.java
index bc09050..0a5d3af 100644
--- a/tools/src/test/java/org/wikidata/query/rdf/tool/MungeIntegrationTest.java
+++ b/tools/src/test/java/org/wikidata/query/rdf/tool/MungeIntegrationTest.java
@@ -1,6 +1,7 @@
 package org.wikidata.query.rdf.tool;
 
 import static com.google.common.io.Resources.getResource;
+import static org.wikidata.query.rdf.test.CloseableRule.autoClose;
 import static org.wikidata.query.rdf.test.Matchers.binds;
 import static org.wikidata.query.rdf.tool.StreamUtils.utf8;
 
@@ -19,8 +20,10 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.http.impl.client.CloseableHttpClient;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.RuleChain;
 import org.junit.runner.RunWith;
 import org.openrdf.model.impl.LiteralImpl;
 import org.openrdf.model.vocabulary.XMLSchema;
@@ -56,9 +59,13 @@
      */
     private final WikibaseUris uris = new WikibaseUris("test.wikidata.org");
 
+    private final CloseableHttpClient httpClient = 
HttpClientUtils.createHttpClient(true);
+
+    private final RdfRepositoryForTesting rdfRepository = new 
RdfRepositoryForTesting("wdq", httpClient);
 
     @Rule
-    public RdfRepositoryForTesting rdfRepository = new 
RdfRepositoryForTesting("wdq");
+    public RuleChain chain = RuleChain.outerRule(autoClose(httpClient))
+            .around(rdfRepository);
 
     /**
      * Loads a truncated version of a test dump from test wikidata.
diff --git 
a/tools/src/test/java/org/wikidata/query/rdf/tool/RdfRepositoryForTesting.java 
b/tools/src/test/java/org/wikidata/query/rdf/tool/RdfRepositoryForTesting.java
index ab98668..0a74d39 100644
--- 
a/tools/src/test/java/org/wikidata/query/rdf/tool/RdfRepositoryForTesting.java
+++ 
b/tools/src/test/java/org/wikidata/query/rdf/tool/RdfRepositoryForTesting.java
@@ -2,6 +2,7 @@
 
 import java.net.URI;
 
+import org.apache.http.client.HttpClient;
 import org.junit.rules.TestRule;
 import org.junit.runner.Description;
 import org.junit.runners.model.Statement;
@@ -14,14 +15,8 @@
  */
 public class RdfRepositoryForTesting extends RdfRepository implements TestRule 
{
 
-    /**
-     * The namespace of the local RDF repository, e.g. "kb" or "wdq".
-     */
-    private final String namespace;
-
-    public RdfRepositoryForTesting(String namespace) {
-        super(url("/namespace/" + namespace + "/sparql"), 
WikibaseUris.WIKIDATA);
-        this.namespace = namespace;
+    public RdfRepositoryForTesting(String namespace, HttpClient httpClient) {
+        super(url("/namespace/" + namespace + "/sparql"), 
WikibaseUris.WIKIDATA, httpClient);
     }
 
     /**
@@ -109,8 +104,7 @@
     /**
      * Clear and close repository after test.
      */
-    private void after() throws Exception {
+    private void after() {
         clear();
-        close();
     }
 }
diff --git 
a/tools/src/test/java/org/wikidata/query/rdf/tool/http/OkOnlyResponseHandlerTest.java
 
b/tools/src/test/java/org/wikidata/query/rdf/tool/http/OkOnlyResponseHandlerTest.java
new file mode 100644
index 0000000..6194196
--- /dev/null
+++ 
b/tools/src/test/java/org/wikidata/query/rdf/tool/http/OkOnlyResponseHandlerTest.java
@@ -0,0 +1,92 @@
+package org.wikidata.query.rdf.tool.http;
+
+import static org.apache.http.HttpStatus.SC_CREATED;
+import static org.apache.http.HttpStatus.SC_OK;
+import static org.apache.http.HttpStatus.SC_SERVICE_UNAVAILABLE;
+import static org.apache.http.HttpVersion.HTTP_1_1;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.message.BasicStatusLine;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.wikidata.query.rdf.tool.exception.ContainedException;
+
+/** Unit tests for {@link OkOnlyResponseHandler}. */
+public class OkOnlyResponseHandlerTest {
+
+    private final MockHandler handler = new MockHandler();
+
+    @Test(expected = ContainedException.class)
+    public void exceptionRaisedWhenNotOk() throws IOException {
+        HttpResponse response = Mockito.mock(HttpResponse.class);
+
+        mockStatus(response, SC_CREATED);
+
+        handler.handleResponse(response);
+    }
+
+    @Test
+    public void bodyIsReturnedInException() throws IOException {
+        HttpResponse response = Mockito.mock(HttpResponse.class);
+
+        mockStatus(response, SC_SERVICE_UNAVAILABLE);
+        mockBody(response, "some body");
+
+        try {
+            handler.handleResponse(response);
+            fail("Expected a ContainedException for a non-OK status");
+        } catch (ContainedException ce) {
+            assertThat(ce.getMessage(), containsString("some body"));
+        }
+    }
+
+    @Test
+    public void nullIsReturnedOnNullBody() throws IOException {
+        HttpResponse response = Mockito.mock(HttpResponse.class);
+
+        mockStatus(response, SC_OK);
+
+        Object result = handler.handleResponse(response);
+
+        assertNull(result);
+        assertFalse(handler.handleEntityCalled);
+    }
+
+    private void mockBody(HttpResponse response, String body) throws UnsupportedEncodingException {
+        when(response.getEntity())
+                .thenReturn(new StringEntity(body));
+    }
+
+    private void mockStatus(HttpResponse response, int statusCode) {
+        when(response.getStatusLine())
+                .thenReturn(new BasicStatusLine(HTTP_1_1, statusCode, "reason for " + statusCode));
+    }
+
+    /** Minimal concrete handler that records whether {@code handleEntity} was invoked. */
+    private static class MockHandler extends OkOnlyResponseHandler {
+
+        private boolean handleEntityCalled;
+
+        @Override
+        public String acceptHeader() {
+            return null;
+        }
+
+        @Override
+        public Object handleEntity(HttpEntity entity) {
+            handleEntityCalled = true;
+            return null;
+        }
+    }
+}
diff --git 
a/tools/src/test/java/org/wikidata/query/rdf/tool/rdf/RdfRepositoryIntegrationTest.java
 
b/tools/src/test/java/org/wikidata/query/rdf/tool/rdf/RdfRepositoryIntegrationTest.java
index 27b963f..5b9bfba 100644
--- 
a/tools/src/test/java/org/wikidata/query/rdf/tool/rdf/RdfRepositoryIntegrationTest.java
+++ 
b/tools/src/test/java/org/wikidata/query/rdf/tool/rdf/RdfRepositoryIntegrationTest.java
@@ -1,6 +1,7 @@
 package org.wikidata.query.rdf.tool.rdf;
 
 import static org.hamcrest.Matchers.allOf;
+import static org.wikidata.query.rdf.test.CloseableRule.autoClose;
 import static org.wikidata.query.rdf.test.Matchers.binds;
 import static org.wikidata.query.rdf.test.StatementHelper.siteLink;
 import static org.wikidata.query.rdf.test.StatementHelper.statement;
@@ -15,9 +16,11 @@
 import java.util.Locale;
 import java.util.Set;
 
+import org.apache.http.impl.client.CloseableHttpClient;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.RuleChain;
 import org.junit.runner.RunWith;
 import org.openrdf.model.BNode;
 import org.openrdf.model.Statement;
@@ -33,6 +36,7 @@
 import org.wikidata.query.rdf.common.uri.SchemaDotOrg;
 import org.wikidata.query.rdf.common.uri.WikibaseUris;
 import org.wikidata.query.rdf.common.uri.WikibaseUris.PropertyType;
+import org.wikidata.query.rdf.tool.HttpClientUtils;
 import org.wikidata.query.rdf.tool.RdfRepositoryForTesting;
 
 import com.carrotsearch.randomizedtesting.RandomizedRunner;
@@ -51,8 +55,12 @@
      */
     private final WikibaseUris uris = WikibaseUris.getURISystem();
 
+    private final CloseableHttpClient httpClient = 
HttpClientUtils.createHttpClient(true);
+    private final RdfRepositoryForTesting rdfRepository = new 
RdfRepositoryForTesting("wdq", httpClient);
+
     @Rule
-    public RdfRepositoryForTesting rdfRepository = new 
RdfRepositoryForTesting("wdq");
+    public RuleChain chain = RuleChain.outerRule(autoClose(httpClient))
+            .around(rdfRepository);
 
     @Before
     public void cleanList() {
diff --git 
a/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
 
b/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
index 0d4e258..061fd39 100644
--- 
a/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
+++ 
b/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
@@ -35,7 +35,7 @@
 public class WikibaseRepositoryIntegrationTest extends RandomizedTest {
     private static final String HOST = "test.wikidata.org";
     @Rule
-    public final CloseableRule<CloseableHttpClient> httpClient = 
autoClose(HttpClientUtils.createHttpClient());
+    public final CloseableRule<CloseableHttpClient> httpClient = 
autoClose(HttpClientUtils.createHttpClient(false));
     private final WikibaseRepository repo = new WikibaseRepository(new 
Uris("https", HOST), httpClient.get());
     private final WikibaseRepository proxyRepo = new WikibaseRepository(new 
Uris("http", "localhost", 8812), httpClient.get());
     private final WikibaseUris uris = new WikibaseUris(HOST);
@@ -138,7 +138,7 @@
     @Test
     public void fetchIsNormalized() throws RetryableException, 
ContainedException, IOException {
         long now = System.currentTimeMillis();
-        try (CloseableHttpClient httpClient = 
HttpClientUtils.createHttpClient()) {
+        try (CloseableHttpClient httpClient = 
HttpClientUtils.createHttpClient(true)) {
             WikibaseRepository proxyRepo = new WikibaseRepository(new 
Uris("http", "localhost", 8812), httpClient);
             String entityId = 
repo.firstEntityIdForLabelStartingWith("QueryTestItem", "en", "item");
             repo.setLabel(entityId, "item", "QueryTestItem" + now, "en");
diff --git 
a/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryWireIntegrationTest.java
 
b/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryWireIntegrationTest.java
index 9a9abf4..31275b3 100644
--- 
a/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryWireIntegrationTest.java
+++ 
b/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryWireIntegrationTest.java
@@ -32,7 +32,7 @@
 public class WikibaseRepositoryWireIntegrationTest {
 
     @Rule
-    public final CloseableRule<CloseableHttpClient> httpClient = 
autoClose(HttpClientUtils.createHttpClient());
+    public final CloseableRule<CloseableHttpClient> httpClient = 
autoClose(HttpClientUtils.createHttpClient(false));
 
     @Rule public WireMockRule wireMockRule = new WireMockRule(wireMockConfig()
             .dynamicPort()

-- 
To view, visit https://gerrit.wikimedia.org/r/402753
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I2ea367e3f7c77338d487793fe2b9982b8538b017
Gerrit-PatchSet: 8
Gerrit-Project: wikidata/query/rdf
Gerrit-Branch: master
Gerrit-Owner: Gehel <[email protected]>
Gerrit-Reviewer: Gehel <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to