NIFI-2417: Adding Query and Scroll processors for Elasticsearch

Signed-off-by: Matt Burgess <mattyb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/00412f6e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/00412f6e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/00412f6e

Branch: refs/heads/master
Commit: 00412f6e97fdc077cf9c8c0d365ebe4876a33305
Parents: e973874
Author: Joe Gresock <joseph.gres...@lmco.com>
Authored: Sun Aug 28 10:09:32 2016 +0000
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Thu Sep 22 15:00:06 2016 -0400

----------------------------------------------------------------------
 .../nifi-elasticsearch-processors/pom.xml       |   2 +-
 .../elasticsearch/QueryElasticsearchHttp.java   | 410 +++++++++++++++++
 .../elasticsearch/RetryableException.java       |  42 ++
 .../elasticsearch/ScrollElasticsearchHttp.java  | 415 +++++++++++++++++
 .../elasticsearch/UnretryableException.java     |  43 ++
 .../org.apache.nifi.processor.Processor         |   2 +
 .../elasticsearch/ITQueryElasticsearchHttp.java |  94 ++++
 .../ITScrollElasticsearchHttp.java              |  64 +++
 .../TestQueryElasticsearchHttp.java             | 443 +++++++++++++++++++
 .../TestScrollElasticsearchHttp.java            | 398 +++++++++++++++++
 .../src/test/resources/query-page1.json         |  57 +++
 .../src/test/resources/query-page2.json         |  36 ++
 .../src/test/resources/query-page3.json         |  14 +
 .../src/test/resources/scroll-page1.json        |  56 +++
 .../src/test/resources/scroll-page2.json        |  36 ++
 .../src/test/resources/scroll-page3.json        |  15 +
 16 files changed, 2126 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/00412f6e/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
index 4766815..bd47b7d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
@@ -92,7 +92,7 @@ language governing permissions and limitations under the License. -->
                 <artifactId>apache-rat-plugin</artifactId>
                 <configuration>
                     <excludes combine.children="append">
-                        <exclude>src/test/resources/DocumentExample.json</exclude>
+                        <exclude>src/test/resources/*.json</exclude>
                     </excludes>
                 </configuration>
             </plugin>

http://git-wip-us.apache.org/repos/asf/nifi/blob/00412f6e/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
new file mode 100644
index 0000000..ba3969f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.codehaus.jackson.JsonNode;
+
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@EventDriven
+@SupportsBatching
+@Tags({ "elasticsearch", "query", "read", "get", "http" })
+@CapabilityDescription("Queries Elasticsearch using the specified connection 
properties. "
+        + "Note that the full body of each page of documents will be read into 
memory before being "
+        + "written to Flow Files for transfer.  Also note that the 
Elasticsearch max_result_window index "
+        + "setting is the upper bound on the number of records that can be 
retrieved using this query.  "
+        + "To retrieve more records, use the ScrollElasticsearchHttp 
processor.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"),
+        @WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of "
+                + "each result will be placed into corresponding attributes with this prefix.") })
+public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
+
+    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
+    private static final String QUERY_QUERY_PARAM = "q";
+    private static final String SORT_QUERY_PARAM = "sort";
+    private static final String FROM_QUERY_PARAM = "from";
+    private static final String SIZE_QUERY_PARAM = "size";
+
+    public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content";
+    public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes";
+    private static final String ATTRIBUTE_PREFIX = "es.result.";
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description(
+                    "All FlowFiles that are read from Elasticsearch are routed 
to this relationship.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description(
+                    "All FlowFiles that cannot be read from Elasticsearch are 
routed to this relationship. Note that only incoming "
+                            + "flow files will be routed to failure.").build();
+
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description(
+                    "A FlowFile is routed to this relationship if the document 
cannot be fetched but attempting the operation again may "
+                            + "succeed. Note that if the processor has no 
incoming connections, flow files may still be sent to this relationship "
+                            + "based on the processor properties and the 
results of the fetch operation.")
+            .build();
+
+    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+            .name("query-es-query").displayName("Query")
+            .description("The Lucene-style query to run against Elasticsearch").required(true)
+            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
+            .name("query-es-index").displayName("Index")
+            .description("The name of the index to read from").required(true)
+            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
+            .name("query-es-type")
+            .displayName("Type")
+            .description(
+                    "The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
+                            + "to _all, the first document matching the identifier across all types will be retrieved.")
+            .required(false).expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("query-es-fields")
+            .displayName("Fields")
+            .description(
+                    "A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, "
+                            + "then the entire document's source will be retrieved.")
+            .required(false).expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+    public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
+            .name("query-es-sort")
+            .displayName("Sort")
+            .description(
+                    "A sort parameter (e.g., timestamp:asc). If the Sort property is left blank, "
+                            + "then the results will be retrieved in document order.")
+            .required(false).expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("query-es-size").displayName("Page Size").defaultValue("20")
+            .description("Determines how many documents to return per page during scrolling.")
+            .required(true).expressionLanguageSupported(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
+
+    public static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+            .name("query-es-limit").displayName("Limit")
+            .description("If set, limits the number of results that will be returned.")
+            .required(false).expressionLanguageSupported(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
+
+    public static final PropertyDescriptor TARGET = new PropertyDescriptor.Builder()
+            .name("query-es-target")
+            .displayName("Target")
+            .description(
+                    "Indicates where the results should be placed.  In the case of 'Flow file content', the JSON "
+                            + "response will be written as the content of the flow file.  In the case of 'Flow file attributes', "
+                            + "the original flow file (if applicable) will be cloned for each result, and all return fields will be placed "
+                            + "in a flow file attribute of the same name, but prefixed by 'es.result.'")
+            .required(true).expressionLanguageSupported(false)
+            .defaultValue(TARGET_FLOW_FILE_CONTENT)
+            .allowableValues(TARGET_FLOW_FILE_CONTENT, TARGET_FLOW_FILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_RETRY);
+        return Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ES_URL);
+        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+        descriptors.add(CONNECT_TIMEOUT);
+        descriptors.add(RESPONSE_TIMEOUT);
+        descriptors.add(QUERY);
+        descriptors.add(PAGE_SIZE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(FIELDS);
+        descriptors.add(SORT);
+        descriptors.add(LIMIT);
+        descriptors.add(TARGET);
+
+        return Collections.unmodifiableList(descriptors);
+    }
+
+    @OnScheduled
+    public void setup(ProcessContext context) {
+        super.setup(context);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session)
+            throws ProcessException {
+
+        FlowFile flowFile = null;
+        if (context.hasIncomingConnection()) {
+            flowFile = session.get();
+
+            // If we have no FlowFile, and all incoming connections are self-loops then we can
+            // continue on. However, if we have no FlowFile and we have connections coming from
+            // other Processors, then we know that we should run only if we have a FlowFile.
+            if (flowFile == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        OkHttpClient okHttpClient = getClient();
+
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile)
+                .getValue();
+        final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile)
+                .getValue();
+        final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
+                .getValue();
+        final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
+                .asInteger().intValue();
+        final Integer limit = context.getProperty(LIMIT).isSet() ? context.getProperty(LIMIT)
+                .evaluateAttributeExpressions(flowFile).asInteger().intValue() : null;
+        final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
+                .evaluateAttributeExpressions(flowFile).getValue() : null;
+        final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
+                .evaluateAttributeExpressions(flowFile).getValue() : null;
+        final boolean targetIsContent = context.getProperty(TARGET).getValue()
+                .equals(TARGET_FLOW_FILE_CONTENT);
+
+        // Authentication
+        final String username = context.getProperty(USERNAME).getValue();
+        final String password = context.getProperty(PASSWORD).getValue();
+
+        final ComponentLog logger = getLogger();
+
+        int fromIndex = 0;
+        int numResults;
+
+        try {
+            logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] 
{ index, docType,
+                    query });
+
+            final long startNanos = System.nanoTime();
+            // read the url property from the context
+            final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue());
+
+            boolean hitLimit = false;
+            do {
+                int mPageSize = pageSize;
+                if (limit != null && limit <= (fromIndex + pageSize)) {
+                    mPageSize = limit - fromIndex;
+                    hitLimit = true;
+                }
+
+                final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
+                        mPageSize, fromIndex);
+
+                final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
+                        username, password, "GET", null);
+                numResults = this.getPage(getResponse, queryUrl, context, session, flowFile,
+                        logger, startNanos, targetIsContent);
+                fromIndex += pageSize;
+            } while (numResults > 0 && !hitLimit);
+
+            if (flowFile != null) {
+                session.remove(flowFile);
+            }
+        } catch (IOException ioe) {
+            logger.error(
+                    "Failed to read from Elasticsearch due to {}, this may 
indicate an error in configuration "
+                            + "(hosts, username/password, etc.). Routing to 
retry",
+                    new Object[] { ioe.getLocalizedMessage() }, ioe);
+            if (flowFile != null) {
+                session.transfer(flowFile, REL_RETRY);
+            }
+            context.yield();
+
+        } catch (RetryableException e) {
+            logger.error(e.getMessage(), new Object[] { e.getLocalizedMessage() }, e);
+            if (flowFile != null) {
+                session.transfer(flowFile, REL_RETRY);
+            }
+            context.yield();
+        } catch (Exception e) {
+            logger.error("Failed to read {} from Elasticsearch due to {}", new 
Object[] { flowFile,
+                    e.getLocalizedMessage() }, e);
+            if (flowFile != null) {
+                session.transfer(flowFile, REL_FAILURE);
+            }
+            context.yield();
+        }
+    }
+
+    private int getPage(final Response getResponse, final URL url, final ProcessContext context,
+            final ProcessSession session, FlowFile flowFile, final ComponentLog logger,
+            final long startNanos, boolean targetIsContent)
+            throws IOException {
+        List<FlowFile> page = new ArrayList<>();
+        final int statusCode = getResponse.code();
+
+        if (isSuccess(statusCode)) {
+            ResponseBody body = getResponse.body();
+            final byte[] bodyBytes = body.bytes();
+            JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
+            JsonNode hits = responseJson.get("hits").get("hits");
+
+            for(int i = 0; i < hits.size(); i++) {
+                JsonNode hit = hits.get(i);
+                String retrievedId = hit.get("_id").asText();
+                String retrievedIndex = hit.get("_index").asText();
+                String retrievedType = hit.get("_type").asText();
+
+                FlowFile documentFlowFile = null;
+                if (flowFile != null) {
+                    documentFlowFile = targetIsContent ? session.create(flowFile) : session.clone(flowFile);
+                } else {
+                    documentFlowFile = session.create();
+                }
+
+                JsonNode source = hit.get("_source");
+                documentFlowFile = session.putAttribute(documentFlowFile, "es.index", retrievedIndex);
+                documentFlowFile = session.putAttribute(documentFlowFile, "es.type", retrievedType);
+
+                if (targetIsContent) {
+                    documentFlowFile = session.putAttribute(documentFlowFile, "filename", retrievedId);
+                    documentFlowFile = session.write(documentFlowFile, out -> {
+                        out.write(source.toString().getBytes());
+                    });
+                } else {
+                    Map<String, String> attributes = new HashMap<>();
+                    for(Iterator<Entry<String, JsonNode>> it = source.getFields(); it.hasNext(); ) {
+                        Entry<String, JsonNode> entry = it.next();
+                        attributes.put(ATTRIBUTE_PREFIX + entry.getKey(), entry.getValue().asText());
+                    }
+                    documentFlowFile = session.putAllAttributes(documentFlowFile, attributes);
+                }
+                page.add(documentFlowFile);
+            }
+            logger.debug("Elasticsearch retrieved " + responseJson.size() + " 
documents, routing to success");
+
+            session.transfer(page, REL_SUCCESS);
+        } else {
+            try {
+                // 5xx -> RETRY, but a server error might last a while, so yield
+                if (statusCode / 100 == 5) {
+                    throw new RetryableException(String.format("Elasticsearch returned code %s with message %s, transferring flow file to retry. This is likely a server problem, yielding...",
+                            statusCode, getResponse.message()));
+                } else if (context.hasIncomingConnection()) {  // 1xx, 3xx, 4xx -> NO RETRY
+                    throw new UnretryableException(String.format("Elasticsearch returned code %s with message %s, transferring flow file to failure",
+                            statusCode, getResponse.message()));
+                } else {
+                    logger.warn("Elasticsearch returned code {} with message {}", new Object[]{statusCode, getResponse.message()});
+                }
+            } finally {
+                if (!page.isEmpty()) {
+                    session.remove(page);
+                    page.clear();
+                }
+            }
+        }
+
+        // emit provenance event
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        if (!page.isEmpty()) {
+            if (context.hasNonLoopConnection()) {
+                page.forEach(f -> session.getProvenanceReporter().fetch(f, url.toExternalForm(), millis));
+            } else {
+                page.forEach(f -> session.getProvenanceReporter().receive(f, url.toExternalForm(), millis));
+            }
+        }
+        return page.size();
+    }
+
+    private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
+            String sort, int pageSize, int fromIndex) throws MalformedURLException {
+        if (StringUtils.isEmpty(baseUrl)) {
+            throw new MalformedURLException("Base URL cannot be null");
+        }
+        HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder();
+        builder.addPathSegment(index);
+        builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type);
+        builder.addPathSegment("_search");
+        builder.addQueryParameter(QUERY_QUERY_PARAM, query);
+        builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize));
+        builder.addQueryParameter(FROM_QUERY_PARAM, String.valueOf(fromIndex));
+        if (!StringUtils.isEmpty(fields)) {
+            String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
+            builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
+        }
+        if (!StringUtils.isEmpty(sort)) {
+            String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
+            builder.addQueryParameter(SORT_QUERY_PARAM, trimmedFields);
+        }
+
+        return builder.build().url();
+    }
+}
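
The loop above is plain from/size paging: each pass re-issues the query with an advancing "from" offset, which is also why the Elasticsearch max_result_window setting caps the total number of retrievable records. As a minimal standalone sketch of the URL shape produced by buildRequestURL (hypothetical host, index, and query; assumes only OkHttp 3 on the classpath):

    import okhttp3.HttpUrl;

    public class QueryUrlSketch {
        public static void main(String[] args) {
            int pageSize = 20;
            for (int fromIndex = 0; fromIndex < 60; fromIndex += pageSize) {
                // Same shape as buildRequestURL: /<index>/<type>/_search?q=...&size=...&from=...
                HttpUrl url = HttpUrl.parse("http://localhost:9200").newBuilder()
                        .addPathSegment("prod-accounting")        // hypothetical index
                        .addPathSegment("_all")                   // type, or _all when unset
                        .addPathSegment("_search")
                        .addQueryParameter("q", "identifier:abc") // hypothetical Lucene-style query
                        .addQueryParameter("size", String.valueOf(pageSize))
                        .addQueryParameter("from", String.valueOf(fromIndex))
                        .build();
                System.out.println(url); // prints the page URLs for from=0, 20, 40
            }
        }
    }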

http://git-wip-us.apache.org/repos/asf/nifi/blob/00412f6e/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/RetryableException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/RetryableException.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/RetryableException.java
new file mode 100644
index 0000000..8e94145
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/RetryableException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+/**
+ * Represents a retryable exception from Elasticsearch.
+ */
+public class RetryableException extends RuntimeException {
+
+    private static final long serialVersionUID = -2755015600102381620L;
+
+    public RetryableException() {
+        super();
+    }
+
+    public RetryableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public RetryableException(String message) {
+        super(message);
+    }
+
+    public RetryableException(Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00412f6e/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
new file mode 100644
index 0000000..accdca9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.codehaus.jackson.JsonNode;
+
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@EventDriven
+@SupportsBatching
+@Tags({ "elasticsearch", "query", "scroll", "read", "get", "http" })
+@CapabilityDescription("Scrolls through an Elasticsearch query using the 
specified connection properties. "
+        + "This processor is intended to be run on the primary node, and is 
designed for scrolling through "
+        + "huge result sets, as in the case of a reindex.  The state must be 
cleared before another query "
+        + "can be run.  Each page of results is returned, wrapped in a JSON 
object like so: { \"hits\" : [ <doc1>, <doc2>, <docn> ] }.  "
+        + "Note that the full body of each page of documents will be read into 
memory before being "
+        + "written to a Flow File for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") })
+@Stateful(description = "After each successful scroll page, the latest scroll_id is persisted in scrollId as input for the next scroll call.  "
+        + "Once the entire query is complete, finishedQuery state will be set to true, and the processor will not execute unless this is cleared.", scopes = { Scope.LOCAL })
+public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
+
+    private static final String FINISHED_QUERY_STATE = "finishedQuery";
+    private static final String SCROLL_ID_STATE = "scrollId";
+    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
+    private static final String QUERY_QUERY_PARAM = "q";
+    private static final String SORT_QUERY_PARAM = "sort";
+    private static final String SCROLL_QUERY_PARAM = "scroll";
+    private static final String SCROLL_ID_QUERY_PARAM = "scroll_id";
+    private static final String SIZE_QUERY_PARAM = "size";
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description(
+                    "All FlowFiles that are read from Elasticsearch are routed 
to this relationship.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description(
+                    "All FlowFiles that cannot be read from Elasticsearch are 
routed to this relationship. Note that only incoming "
+                            + "flow files will be routed to failure.").build();
+
+    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+            .name("scroll-es-query").displayName("Query")
+            .description("The Lucene-style query to run against Elasticsearch").required(true)
+            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SCROLL_DURATION = new PropertyDescriptor.Builder()
+            .name("scroll-es-scroll")
+            .displayName("Scroll Duration")
+            .description("The scroll duration is how long each search context is kept in memory.")
+            .defaultValue("1m")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(
+                    StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9]+(m|h)")))
+            .build();
+
+    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
+            .name("scroll-es-index").displayName("Index")
+            .description("The name of the index to read from").required(true)
+            .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
+            .name("scroll-es-type")
+            .displayName("Type")
+            .description(
+                    "The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
+                            + "to _all, the first document matching the identifier across all types will be retrieved.")
+            .required(false).expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("scroll-es-fields")
+            .displayName("Fields")
+            .description(
+                    "A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, "
+                            + "then the entire document's source will be retrieved.")
+            .required(false).expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+    public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
+            .name("scroll-es-sort")
+            .displayName("Sort")
+            .description(
+                    "A sort parameter (e.g., timestamp:asc). If the Sort property is left blank, "
+                            + "then the results will be retrieved in document order.")
+            .required(false).expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("scroll-es-size").displayName("Page Size").defaultValue("20")
+            .description("Determines how many documents to return per page during scrolling.")
+            .required(true).expressionLanguageSupported(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        return Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ES_URL);
+        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+        descriptors.add(CONNECT_TIMEOUT);
+        descriptors.add(RESPONSE_TIMEOUT);
+        descriptors.add(QUERY);
+        descriptors.add(SCROLL_DURATION);
+        descriptors.add(PAGE_SIZE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(FIELDS);
+        descriptors.add(SORT);
+
+        return Collections.unmodifiableList(descriptors);
+    }
+
+    @OnScheduled
+    public void setup(ProcessContext context) {
+        super.setup(context);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session)
+            throws ProcessException {
+
+        try {
+            if (isQueryFinished(context.getStateManager())) {
+                getLogger().trace(
+                        "Query has been marked finished in the state manager.  
"
+                                + "To run another query, clear the state.");
+                return;
+            }
+        } catch (IOException e) {
+            throw new ProcessException("Could not retrieve state", e);
+        }
+
+        OkHttpClient okHttpClient = getClient();
+
+        FlowFile flowFile = session.create();
+
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile)
+                .getValue();
+        final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile)
+                .getValue();
+        final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
+                .getValue();
+        final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
+                .asInteger().intValue();
+        final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
+                .evaluateAttributeExpressions(flowFile).getValue() : null;
+        final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
+                .evaluateAttributeExpressions(flowFile).getValue() : null;
+        final String scroll = context.getProperty(SCROLL_DURATION).isSet() ? context
+                .getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null;
+
+        // Authentication
+        final String username = context.getProperty(USERNAME).getValue();
+        final String password = context.getProperty(PASSWORD).getValue();
+
+        final ComponentLog logger = getLogger();
+
+        try {
+            String scrollId = loadScrollId(context.getStateManager());
+
+            if (scrollId != null) {
+                // read the url property from the context
+                final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL)
+                        .getValue());
+                final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort,
+                        scrollId, pageSize, scroll);
+                final long startNanos = System.nanoTime();
+
+                final Response getResponse = sendRequestToElasticsearch(okHttpClient, scrollurl,
+                        username, password, "GET", null);
+                this.getPage(getResponse, scrollurl, context, session, flowFile, logger, startNanos);
+            } else {
+                logger.debug("Querying {}/{} from Elasticsearch: {}", new 
Object[] { index,
+                        docType, query });
+
+                // read the url property from the context
+                final String urlstr = 
StringUtils.trimToEmpty(context.getProperty(ES_URL)
+                        .getValue());
+                final URL queryUrl = buildRequestURL(urlstr, query, index, 
docType, fields, sort,
+                        scrollId, pageSize, scroll);
+                final long startNanos = System.nanoTime();
+
+                final Response getResponse = 
sendRequestToElasticsearch(okHttpClient, queryUrl,
+                        username, password, "GET", null);
+                this.getPage(getResponse, queryUrl, context, session, 
flowFile, logger, startNanos);
+            }
+
+        } catch (IOException ioe) {
+            logger.error(
+                    "Failed to read from Elasticsearch due to {}, this may 
indicate an error in configuration "
+                            + "(hosts, username/password, etc.).",
+                    new Object[] { ioe.getLocalizedMessage() }, ioe);
+            session.remove(flowFile);
+            context.yield();
+
+        } catch (Exception e) {
+            logger.error("Failed to read {} from Elasticsearch due to {}", new 
Object[] { flowFile,
+                    e.getLocalizedMessage() }, e);
+            session.transfer(flowFile, REL_FAILURE);
+            context.yield();
+        }
+    }
+
+    private void getPage(final Response getResponse, final URL url, final ProcessContext context,
+            final ProcessSession session, FlowFile flowFile, final ComponentLog logger, final long startNanos)
+            throws IOException {
+        final int statusCode = getResponse.code();
+
+        if (isSuccess(statusCode)) {
+            ResponseBody body = getResponse.body();
+            final byte[] bodyBytes = body.bytes();
+            JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
+            String scrollId = responseJson.get("_scroll_id").asText();
+
+            StringBuilder builder = new StringBuilder();
+
+            builder.append("{ \"hits\" : [");
+
+            JsonNode hits = responseJson.get("hits").get("hits");
+            if (hits.size() == 0) {
+                finishQuery(context.getStateManager());
+                session.remove(flowFile);
+                return;
+            }
+
+            for(int i = 0; i < hits.size(); i++) {
+                JsonNode hit = hits.get(i);
+                String retrievedIndex = hit.get("_index").asText();
+                String retrievedType = hit.get("_type").asText();
+
+                JsonNode source = hit.get("_source");
+                flowFile = session.putAttribute(flowFile, "es.index", retrievedIndex);
+                flowFile = session.putAttribute(flowFile, "es.type", retrievedType);
+
+                builder.append(source.toString());
+                if (i < hits.size() - 1) {
+                    builder.append(", ");
+                }
+            }
+            builder.append("] }");
+            logger.debug("Elasticsearch retrieved " + responseJson.size() + " 
documents, routing to success");
+
+            flowFile = session.write(flowFile, out -> {
+                out.write(builder.toString().getBytes());
+            });
+            session.transfer(flowFile, REL_SUCCESS);
+
+            saveScrollId(context.getStateManager(), scrollId);
+
+            // emit provenance event
+            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            session.getProvenanceReporter().receive(flowFile, url.toExternalForm(), millis);
+        } else {
+            // 5xx -> RETRY, but a server error might last a while, so yield
+            if (statusCode / 100 == 5) {
+
+                logger.warn("Elasticsearch returned code {} with message {}, 
removing the flow file. This is likely a server problem, yielding...",
+                        new Object[]{statusCode, getResponse.message()});
+                session.remove(flowFile);
+                context.yield();
+            } else {
+                logger.warn("Elasticsearch returned code {} with message {}", new Object[]{statusCode, getResponse.message()});
+                session.remove(flowFile);
+            }
+        }
+    }
+
+    private boolean isQueryFinished(StateManager stateManager) throws IOException {
+        final StateMap stateMap = stateManager.getState(Scope.LOCAL);
+
+        if (stateMap.getVersion() < 0) {
+            getLogger().debug("No previous state found");
+            return false;
+        }
+
+        final String isQueryFinished = stateMap.get(FINISHED_QUERY_STATE);
+        getLogger().debug("Loaded state with finishedQuery = {}", new Object[] 
{ isQueryFinished });
+
+        return "true".equals(isQueryFinished);
+    }
+
+    private String loadScrollId(StateManager stateManager) throws IOException {
+        final StateMap stateMap = stateManager.getState(Scope.LOCAL);
+
+        if (stateMap.getVersion() < 0) {
+            getLogger().debug("No previous state found");
+            return null;
+        }
+
+        final String scrollId = stateMap.get(SCROLL_ID_STATE);
+        getLogger().debug("Loaded state with scrollId {}", new Object[] { 
scrollId });
+
+        return scrollId;
+    }
+
+    private void finishQuery(StateManager stateManager) throws IOException {
+
+        Map<String, String> state = new HashMap<>(2);
+        state.put(FINISHED_QUERY_STATE, "true");
+
+        getLogger().debug("Saving state with finishedQuery = true");
+        stateManager.setState(state, Scope.LOCAL);
+    }
+
+    private void saveScrollId(StateManager stateManager, String scrollId) throws IOException {
+
+        Map<String, String> state = new HashMap<>(2);
+        state.put(SCROLL_ID_STATE, scrollId);
+
+        getLogger().debug("Saving state with scrollId of {}", new Object[] { 
scrollId });
+        stateManager.setState(state, Scope.LOCAL);
+    }
+
+    private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
+            String sort, String scrollId, int pageSize, String scroll) throws MalformedURLException {
+        if (StringUtils.isEmpty(baseUrl)) {
+            throw new MalformedURLException("Base URL cannot be null");
+        }
+        HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder();
+        if (!StringUtils.isEmpty(scrollId)) {
+            builder.addPathSegment("_search");
+            builder.addPathSegment("scroll");
+            builder.addQueryParameter(SCROLL_ID_QUERY_PARAM, scrollId);
+        } else {
+            builder.addPathSegment(index);
+            builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type);
+            builder.addPathSegment("_search");
+            builder.addQueryParameter(QUERY_QUERY_PARAM, query);
+            builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize));
+            if (!StringUtils.isEmpty(fields)) {
+                String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
+                builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
+            }
+            if (!StringUtils.isEmpty(sort)) {
+                String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
+                builder.addQueryParameter(SORT_QUERY_PARAM, trimmedFields);
+            }
+        }
+        builder.addQueryParameter(SCROLL_QUERY_PARAM, scroll);
+
+        return builder.build().url();
+    }
+}
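
The scroll logic alternates between two URL shapes: the first request is an ordinary search that opens a scroll context (the "scroll" parameter sets the context lifetime), and every later request hits _search/scroll with the scroll_id persisted in processor state. A minimal sketch under the same assumptions as before (hypothetical host and index, placeholder scroll_id, OkHttp 3 on the classpath):

    import okhttp3.HttpUrl;

    public class ScrollUrlSketch {
        public static void main(String[] args) {
            // First call: opens the scroll context and returns page 1 plus a _scroll_id.
            HttpUrl first = HttpUrl.parse("http://localhost:9200").newBuilder()
                    .addPathSegment("prod-accounting")   // hypothetical index
                    .addPathSegment("_all")
                    .addPathSegment("_search")
                    .addQueryParameter("q", "identifier:abc")
                    .addQueryParameter("size", "20")
                    .addQueryParameter("scroll", "1m")   // keep the context alive for one minute
                    .build();

            // Later calls: only the persisted scroll_id plus a fresh timeout.
            HttpUrl next = HttpUrl.parse("http://localhost:9200").newBuilder()
                    .addPathSegment("_search")
                    .addPathSegment("scroll")
                    .addQueryParameter("scroll_id", "DXF1ZXJ5...") // placeholder value
                    .addQueryParameter("scroll", "1m")
                    .build();

            System.out.println(first);
            System.out.println(next);
        }
    }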

http://git-wip-us.apache.org/repos/asf/nifi/blob/00412f6e/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UnretryableException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UnretryableException.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UnretryableException.java
new file mode 100644
index 0000000..bae83cf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UnretryableException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+/**
+ * Represents an unrecoverable error from Elasticsearch.
+ * @author jgresock
+ *
+ */
+public class UnretryableException extends RuntimeException {
+    private static final long serialVersionUID = -4528006567211380914L;
+
+    public UnretryableException() {
+        super();
+    }
+
+    public UnretryableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public UnretryableException(String message) {
+        super(message);
+    }
+
+    public UnretryableException(Throwable cause) {
+        super(cause);
+    }
+
+}
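
Both exception types extend RuntimeException, so getPage can throw them without a throws clause and onTrigger decides the routing. A condensed sketch of that routing intent (not the literal onTrigger code; in QueryElasticsearchHttp the UnretryableException case is actually handled by the generic Exception catch):

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;

    class ExceptionRoutingSketch {
        void fetchAndRoute(ProcessContext context, ProcessSession session,
                FlowFile flowFile, Runnable fetchPage) {
            try {
                fetchPage.run(); // issue the HTTP request and parse the page
            } catch (RetryableException e) {
                // 5xx: the server may recover, so route to retry and back off.
                session.transfer(flowFile, QueryElasticsearchHttp.REL_RETRY);
                context.yield();
            } catch (UnretryableException e) {
                // 1xx/3xx/4xx: retrying will not help, so route to failure.
                session.transfer(flowFile, QueryElasticsearchHttp.REL_FAILURE);
            }
        }
    }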

http://git-wip-us.apache.org/repos/asf/nifi/blob/00412f6e/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 782f87e..a6cd087 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -16,3 +16,5 @@ org.apache.nifi.processors.elasticsearch.FetchElasticsearch
 org.apache.nifi.processors.elasticsearch.PutElasticsearch
 org.apache.nifi.processors.elasticsearch.FetchElasticsearchHttp
 org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp
+org.apache.nifi.processors.elasticsearch.QueryElasticsearchHttp
+org.apache.nifi.processors.elasticsearch.ScrollElasticsearchHttp
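
The two new entries hook into Java's standard service-provider mechanism: NiFi discovers Processor implementations by reading these META-INF/services files through java.util.ServiceLoader when the NAR is loaded, roughly as in this sketch:

    import java.util.ServiceLoader;

    import org.apache.nifi.processor.Processor;

    public class DiscoverySketch {
        public static void main(String[] args) {
            // Lists every Processor named in a META-INF/services file on the classpath.
            for (Processor p : ServiceLoader.load(Processor.class)) {
                System.out.println(p.getClass().getName());
            }
        }
    }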

http://git-wip-us.apache.org/repos/asf/nifi/blob/00412f6e/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
new file mode 100644
index 0000000..15f2707
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Test;
+
+public class ITQueryElasticsearchHttp {
+
+    private TestRunner runner;
+
+    @After
+    public void teardown() {
+        runner = null;
+    }
+
+    @Test
+    public void testFetchElasticsearchOnTrigger() throws IOException {
+        runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
+                "http://localhost.internal:9200");
+
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "provenance");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.QUERY,
+                "identifier:2f79eba8839f5976cd0b1e16a0e7fe8d7dd0ceca");
+        runner.setProperty(QueryElasticsearchHttp.SORT, "timestamp:asc");
+        runner.setProperty(QueryElasticsearchHttp.FIELDS, "transit_uri,version");
+        runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "1");
+        runner.assertValid();
+
+        runner.setIncomingConnection(false);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_SUCCESS, 3);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(
+                QueryElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+    }
+
+    @Test
+    public void testFetchElasticsearchOnTrigger_IncomingFile() throws IOException {
+        runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
+                "http://localhost.internal:9200");
+
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "provenance");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.QUERY, "${query}");
+        runner.setProperty(QueryElasticsearchHttp.SORT, "timestamp:asc");
+        runner.setProperty(QueryElasticsearchHttp.FIELDS, "transit_uri,version");
+        runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "1");
+        runner.assertValid();
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("query", 
"identifier:2f79eba8839f5976cd0b1e16a0e7fe8d7dd0ceca");
+        runner.enqueue("".getBytes(), attributes);
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_SUCCESS, 3);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(
+                QueryElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00412f6e/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
new file mode 100644
index 0000000..aa2a1e0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Test;
+
+public class ITScrollElasticsearchHttp {
+
+    private TestRunner runner;
+
+    @After
+    public void teardown() {
+        runner = null;
+    }
+
+    @Test
+    public void testFetchElasticsearchOnTrigger() throws IOException {
+        runner = TestRunners.newTestRunner(ScrollElasticsearchHttp.class); // all docs are found
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
+                "http://ip-172-31-49-152.ec2.internal:9200";);
+
+        runner.setProperty(ScrollElasticsearchHttp.INDEX, "prod-accounting");
+        runner.assertNotValid();
+        runner.setProperty(ScrollElasticsearchHttp.TYPE, "provenance");
+        runner.assertNotValid();
+        runner.setProperty(ScrollElasticsearchHttp.QUERY,
+                "identifier:2f79eba8839f5976cd0b1e16a0e7fe8d7dd0ceca");
+        runner.setProperty(ScrollElasticsearchHttp.SORT, "timestamp:asc");
+        runner.setProperty(ScrollElasticsearchHttp.FIELDS, "transit_uri,version");
+        runner.setProperty(ScrollElasticsearchHttp.PAGE_SIZE, "1");
+        runner.assertValid();
+
+        runner.setIncomingConnection(false);
+        runner.run(4, true, true);
+
+        runner.assertAllFlowFilesTransferred(ScrollElasticsearchHttp.REL_SUCCESS, 3);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(
+                ScrollElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+    }
+}
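
Note: this test calls runner.run(4, true, true) while expecting only three
flow files; the reading here is that each onTrigger fetches a single page
(page size 1, three matching documents), and the fourth trigger consumes the
final, empty scroll page that ends the iteration. For reference, a rough
sketch of the two-step scroll exchange the processor is built around, per the
Elasticsearch-2.x-era scroll API (exact request shapes vary by ES version;
the host, query, and helper names below are illustrative):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import okhttp3.MediaType;
    import okhttp3.OkHttpClient;
    import okhttp3.Request;
    import okhttp3.RequestBody;
    import okhttp3.Response;

    public class ScrollSketch {
        public static void main(String[] args) throws Exception {
            OkHttpClient client = new OkHttpClient();
            ObjectMapper mapper = new ObjectMapper();
            // Step 1: the initial search opens a scroll context and returns page 1.
            Request first = new Request.Builder().url(
                    "http://localhost:9200/prod-accounting/provenance/_search"
                            + "?scroll=1m&size=1&q=identifier:abc&sort=timestamp:asc").build();
            JsonNode page = fetch(client, mapper, first);
            // Step 2: keep passing _scroll_id back until a page comes back with no hits.
            while (page.path("hits").path("hits").size() > 0) {
                Request next = new Request.Builder()
                        .url("http://localhost:9200/_search/scroll?scroll=1m")
                        .post(RequestBody.create(MediaType.parse("text/plain"),
                                page.path("_scroll_id").asText()))
                        .build();
                page = fetch(client, mapper, next);
            }
        }

        private static JsonNode fetch(OkHttpClient client, ObjectMapper mapper, Request req)
                throws Exception {
            try (Response resp = client.newCall(req).execute()) {
                return mapper.readTree(resp.body().string());
            }
        }
    }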

http://git-wip-us.apache.org/repos/asf/nifi/blob/00412f6e/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
new file mode 100644
index 0000000..b9ec1f9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
+
+import okhttp3.Call;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+
+public class TestQueryElasticsearchHttp {
+
+    private TestRunner runner;
+
+    @After
+    public void teardown() {
+        runner = null;
+    }
+
+    @Test
+    public void testQueryElasticsearchOnTrigger_withInput() throws IOException {
+        runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.QUERY,
+                "source:Twitter AND identifier:\"${identifier}\"");
+        runner.assertValid();
+        runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+        runner.assertValid();
+
+        runAndVerifySuccess(true);
+    }
+
+    @Test
+    public void testQueryElasticsearchOnTrigger_withInput_attributeTarget() throws IOException {
+        runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.QUERY,
+                "source:Twitter AND identifier:\"${identifier}\"");
+        runner.assertValid();
+        runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+        runner.assertValid();
+        runner.setProperty(QueryElasticsearchHttp.TARGET,
+                QueryElasticsearchHttp.TARGET_FLOW_FILE_ATTRIBUTES);
+
+        runAndVerifySuccess(false);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(
+                QueryElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        assertEquals("blah", new String(out.toByteArray()));
+        assertEquals("Twitter", out.getAttribute("es.result.source"));
+    }
+
+    @Test
+    public void testQueryElasticsearchOnTrigger_withNoInput() throws IOException {
+        runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.QUERY,
+                "source:Twitter AND identifier:\"${identifier}\"");
+        runner.assertValid();
+        runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+        runner.assertValid();
+
+        runner.setIncomingConnection(false);
+        runAndVerifySuccess(true);
+    }
+
+    private void runAndVerifySuccess(int expectedResults, boolean targetIsContent) {
+        runner.enqueue("blah".getBytes(), new HashMap<String, String>() {
+            {
+                put("identifier", "28039652140");
+            }
+        });
+
+        // Running once should page through all 3 docs
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_SUCCESS, expectedResults);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(
+                QueryElasticsearchHttp.REL_SUCCESS).get(0);
+        assertNotNull(out);
+        if (targetIsContent) {
+            out.assertAttributeEquals("filename", "abc-97b-ASVsZu_"
+                    + "vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3");
+        }
+    }
+
+    // By default, 3 files should go to Success
+    private void runAndVerifySuccess(boolean targetIsContent) {
+        runAndVerifySuccess(3, targetIsContent);
+    }
+
+    @Test
+    public void testQueryElasticsearchOnTriggerWithFields() throws IOException {
+        runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
+        runner.assertValid();
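+        // The FIELDS value below intentionally includes an empty entry and extra
+        // whitespace ("id,, userinfo.location"), presumably to exercise lenient
+        // parsing of the field list.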
+        runner.setProperty(QueryElasticsearchHttp.FIELDS, "id,, 
userinfo.location");
+        runner.assertValid();
+        runner.setProperty(QueryElasticsearchHttp.SORT, "timestamp:asc,identifier:desc");
+        runner.assertValid();
+
+        runAndVerifySuccess(true);
+    }
+
+    @Test
+    public void testQueryElasticsearchOnTriggerWithLimit() throws IOException {
+        runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+        runner.assertNotValid();
+        runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
+        runner.assertValid();
+        runner.setProperty(QueryElasticsearchHttp.FIELDS, "id,, 
userinfo.location");
+        runner.assertValid();
+        runner.setProperty(QueryElasticsearchHttp.SORT, "timestamp:asc,identifier:desc");
+        runner.assertValid();
+        runner.setProperty(QueryElasticsearchHttp.LIMIT, "2");
+
+        runAndVerifySuccess(2, true);
+    }
+
+    @Test
+    public void testQueryElasticsearchOnTriggerWithServerErrorRetry() throws IOException {
+        QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor();
+        processor.setStatus(500, "Server error");
+        runner = TestRunners.newTestRunner(processor); // simulate a server error response
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
+
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {
+            {
+                put("identifier", "28039652140");
+            }
+        });
+
+        runner.run(1, true, true);
+
+        // This test generates an HTTP 500 "Server error"
+        runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_RETRY, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(
+                QueryElasticsearchHttp.REL_RETRY).get(0);
+        assertNotNull(out);
+    }
+
+    @Test
+    public void testQueryElasticsearchOnTriggerWithServerFail() throws IOException {
+        QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor();
+        processor.setStatus(100, "Should fail");
+        runner = TestRunners.newTestRunner(processor); // simulate an unexpected status code
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
+
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {
+            {
+                put("identifier", "28039652140");
+            }
+        });
+
+        runner.run(1, true, true);
+
+        // This test generates an HTTP 100 "Should fail"
+        runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_FAILURE, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(
+                QueryElasticsearchHttp.REL_FAILURE).get(0);
+        assertNotNull(out);
+    }
+
+    @Test
+    public void testQueryElasticsearchOnTriggerWithIOException() throws IOException {
+        QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor();
+        processor.setExceptionToThrow(new IOException("Error reading from disk"));
+        runner = TestRunners.newTestRunner(processor); // simulate an I/O error during the request
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
+
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {
+            {
+                put("identifier", "28039652140");
+            }
+        });
+
+        runner.run(1, true, true);
+
+        // The mocked call throws an IOException, so the flow file should be routed to retry
+        runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_RETRY, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(
+                QueryElasticsearchHttp.REL_RETRY).get(0);
+        assertNotNull(out);
+    }
+
+    @Test
+    public void testQueryElasticsearchOnTriggerWithServerFailAfterSuccess() throws IOException {
+        QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor();
+        processor.setStatus(100, "Should fail", 2);
+        runner = TestRunners.newTestRunner(processor); // simulate failure on the second page
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
+
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {
+            {
+                put("identifier", "28039652140");
+            }
+        });
+
+        runner.run(1, true, true);
+
+        // This test generates an HTTP 100 "Should fail" on the second of the three pages
+        runner.assertTransferCount(QueryElasticsearchHttp.REL_SUCCESS, 2);
+        runner.assertTransferCount(QueryElasticsearchHttp.REL_FAILURE, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(
+                QueryElasticsearchHttp.REL_FAILURE).get(0);
+        assertNotNull(out);
+    }
+
+    @Test
+    public void testQueryElasticsearchOnTriggerWithServerFailNoIncomingFlowFile() throws IOException {
+        QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor();
+        processor.setStatus(100, "Should fail", 1);
+        runner = TestRunners.newTestRunner(processor); // simulate failure on the first page
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
+
+        runner.setIncomingConnection(false);
+        runner.run(1, true, true);
+
+        // This test generates an HTTP 100 with no incoming flow file, so nothing should be transferred
+        processor.getRelationships().forEach(relationship -> runner.assertTransferCount(relationship, 0));
+        runner.assertTransferCount(QueryElasticsearchHttp.REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testSetupSecureClient() throws Exception {
+        QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor();
+        runner = TestRunners.newTestRunner(processor);
+        SSLContextService sslService = mock(SSLContextService.class);
+        when(sslService.getIdentifier()).thenReturn("ssl-context");
+        runner.addControllerService("ssl-context", sslService);
+        runner.enableControllerService(sslService);
+        runner.setProperty(QueryElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
+        runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+        runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+        runner.setValidateExpressionUsage(true);
+        runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
+
+        // Allow time for the controller service to fully initialize
+        Thread.sleep(500);
+
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {
+            {
+                put("doc_id", "28039652140");
+            }
+        });
+        runner.run(1, true, true);
+    }
+
+    /**
+     * A test class that extends the processor in order to inject/mock behavior.
+     */
+    private static class QueryElasticsearchHttpTestProcessor extends QueryElasticsearchHttp {
+        Exception exceptionToThrow = null;
+        OkHttpClient client;
+        int goodStatusCode = 200;
+        String goodStatusMessage = "OK";
+
+        int badStatusCode;
+        String badStatusMessage;
+        int runNumber;
+
+        List<String> pages = Arrays.asList(getDoc("query-page1.json"), getDoc("query-page2.json"),
+                getDoc("query-page3.json"));
+
+        public void setExceptionToThrow(Exception exceptionToThrow) {
+            this.exceptionToThrow = exceptionToThrow;
+        }
+
+        /**
+         * Sets the status code and message for the first query
+         *
+         * @param code
+         *            The status code to return
+         * @param message
+         *            The status message
+         */
+        void setStatus(int code, String message) {
+            this.setStatus(code, message, 1);
+        }
+
+        /**
+         * Sets the status code and message for the runNumber-th query
+         *
+         * @param code
+         *            The status code to return
+         * @param message
+         *            The status message
+         * @param runNumber
+         *            The run number for which to set this status
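+         *
+         * For example, setStatus(100, "Should fail", 2) causes only the second
+         * page request to return the bad status; all other requests return the
+         * good status (200 "OK").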
+         */
+        void setStatus(int code, String message, int runNumber) {
+            badStatusCode = code;
+            badStatusMessage = message;
+            this.runNumber = runNumber;
+        }
+
+        @Override
+        protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
+            client = mock(OkHttpClient.class);
+
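+            // Stub one canned response per page: the runNumber-th request gets the
+            // bad status/message, every other request gets the good ones.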
+            OngoingStubbing<Call> stub = when(client.newCall(any(Request.class)));
+
+            for (int i = 0; i < pages.size(); i++) {
+                String page = pages.get(i);
+                if (runNumber == i + 1) {
+                    stub = mockReturnDocument(stub, page, badStatusCode, badStatusMessage);
+                } else {
+                    stub = mockReturnDocument(stub, page, goodStatusCode, goodStatusMessage);
+                }
+            }
+        }
+
+        private OngoingStubbing<Call> mockReturnDocument(OngoingStubbing<Call> stub,
+                final String document, int statusCode, String statusMessage) {
+            return stub.thenAnswer(new Answer<Call>() {
+
+                @Override
+                public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
+                    Request realRequest = (Request) invocationOnMock.getArguments()[0];
+                    Response mockResponse = new Response.Builder()
+                            .request(realRequest)
+                            .protocol(Protocol.HTTP_1_1)
+                            .code(statusCode)
+                            .message(statusMessage)
+                            .body(ResponseBody.create(MediaType.parse("application/json"), document))
+                            .build();
+                    final Call call = mock(Call.class);
+                    if (exceptionToThrow != null) {
+                        when(call.execute()).thenThrow(exceptionToThrow);
+                    } else {
+                        when(call.execute()).thenReturn(mockResponse);
+                    }
+                    return call;
+                }
+            });
+        }
+
+        protected OkHttpClient getClient() {
+            return client;
+        }
+    }
+
+    private static String getDoc(String filename) {
+        try {
+            return IOUtils.toString(QueryElasticsearchHttp.class.getClassLoader()
+                    .getResourceAsStream(filename));
+        } catch (IOException e) {
+            System.out.println("Error reading document " + filename);
+            return "";
+        }
+    }
+}
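
Note: the mock above works by chaining one stubbed answer per page onto a
single when(client.newCall(...)) stubbing, so consecutive calls return
query-page1.json through query-page3.json in order. A minimal, self-contained
illustration of that Mockito chaining pattern (the Supplier and the page
strings are illustrative, not from the NiFi codebase):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.util.function.Supplier;

    public class ChainedStubbingSketch {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) {
            Supplier<String> pager = mock(Supplier.class);
            // Each chained thenReturn supplies the result for the next invocation.
            when(pager.get()).thenReturn("page1").thenReturn("page2").thenReturn("page3");
            System.out.println(pager.get()); // page1
            System.out.println(pager.get()); // page2
            System.out.println(pager.get()); // page3
        }
    }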
