http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClientFactory.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClientFactory.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClientFactory.java
new file mode 100644
index 0000000..4e0b2fe
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClientFactory.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import javax.net.ssl.SSLContext;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.apache.metron.elasticsearch.config.ElasticsearchClientConfig;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils.HostnamePort;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main entry point to create the ES client. Builds an {@link ElasticsearchClient} from the
+ * settings stored under the "es.client.settings" key of the global config.
+ */
+public class ElasticsearchClientFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  // Key in the global config under which the ES client settings map is stored.
+  private static final String ES_SETTINGS_KEY = "es.client.settings";
+
+  /**
+   * Creates an Elasticsearch client from settings provided via the global config.
+   *
+   * <p>Timeouts, IO thread count, credentials and the SSL context are all taken from the
+   * client settings; the host list is resolved from the global config itself.
+   *
+   * @param globalConfig global configuration map; must not be null
+   * @return new client wrapping the low-level and high-level REST clients
+   */
+  public static ElasticsearchClient create(Map<String, Object> globalConfig) {
+    ElasticsearchClientConfig clientConfig =
+        new ElasticsearchClientConfig(getEsSettings(globalConfig));
+    String scheme = clientConfig.getConnectionScheme();
+    RestClientBuilder restBuilder = RestClient.builder(getHttpHosts(globalConfig, scheme));
+
+    // Modifies request config builder with connection and socket timeouts.
+    // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_timeouts.html
+    restBuilder.setRequestConfigCallback(requestConfig -> requestConfig
+        .setConnectTimeout(clientConfig.getConnectTimeoutMillis())
+        .setSocketTimeout(clientConfig.getSocketTimeoutMillis()));
+    restBuilder.setMaxRetryTimeoutMillis(clientConfig.getMaxRetryTimeoutMillis());
+
+    // Thread count, credentials and SSL are resolved when the callback is invoked.
+    restBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
+        .setDefaultIOReactorConfig(getIOReactorConfig(clientConfig))
+        .setDefaultCredentialsProvider(getCredentialsProvider(clientConfig))
+        .setSSLContext(getSSLContext(clientConfig)));
+
+    RestClient lowLevelClient = restBuilder.build();
+    RestHighLevelClient highLevelClient = new RestHighLevelClient(lowLevelClient);
+    return new ElasticsearchClient(lowLevelClient, highLevelClient);
+  }
+
+  /**
+   * Extracts the client settings map from the global config.
+   *
+   * @param globalConfig global configuration map
+   * @return the settings stored under "es.client.settings", or a new empty map when the key
+   *     is missing or mapped to null
+   */
+  // The global config is untyped; the settings entry is expected to be a String-keyed map.
+  @SuppressWarnings("unchecked")
+  private static Map<String, Object> getEsSettings(Map<String, Object> globalConfig) {
+    Object settings = globalConfig.get(ES_SETTINGS_KEY);
+    if (settings == null) {
+      // Covers both "key absent" and "key present with null value"; the latter would
+      // otherwise hand a null map to ElasticsearchClientConfig.
+      return new HashMap<>();
+    }
+    return (Map<String, Object>) settings;
+  }
+
+  /**
+   * Converts the hosts resolved from the global config into {@link HttpHost} instances.
+   *
+   * @param globalConfiguration global configuration map passed to ElasticsearchUtils.getIps
+   * @param scheme connection scheme applied to every host
+   * @return one HttpHost per resolved hostname/port, in the same order
+   */
+  private static HttpHost[] getHttpHosts(Map<String, Object> globalConfiguration,
+      String scheme) {
+    List<HostnamePort> hostnamePorts = ElasticsearchUtils.getIps(globalConfiguration);
+    return hostnamePorts.stream()
+        .map(hp -> new HttpHost(hp.hostname, hp.port, scheme))
+        .toArray(HttpHost[]::new);
+  }
+
+  /**
+   * Creates config with setting for num connection threads. Default is ES client default,
+   * which is 1 to num processors per the documentation.
+   * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_number_of_threads.html
+   *
+   * @param esClientConfig client settings
+   * @return custom reactor config when a thread count is configured, otherwise
+   *     {@link IOReactorConfig#DEFAULT}
+   */
+  private static IOReactorConfig getIOReactorConfig(ElasticsearchClientConfig esClientConfig) {
+    Optional<Integer> configuredThreads = esClientConfig.getNumClientConnectionThreads();
+    if (!configuredThreads.isPresent()) {
+      return IOReactorConfig.DEFAULT;
+    }
+    Integer numThreads = configuredThreads.get();
+    LOG.info("Setting number of client connection threads: {}", numThreads);
+    return IOReactorConfig.custom().setIoThreadCount(numThreads).build();
+  }
+
+  /**
+   * Builds a username/password credentials provider when credentials are configured.
+   *
+   * @param esClientConfig client settings
+   * @return provider registered for any auth scope, or null when no credentials are
+   *     configured
+   */
+  private static CredentialsProvider getCredentialsProvider(
+      ElasticsearchClientConfig esClientConfig) {
+    Optional<Entry<String, String>> credentials = esClientConfig.getCredentials();
+    if (!credentials.isPresent()) {
+      LOG.info(
+          "Elasticsearch client credentials not provided. Defaulting to non-authenticated client connection.");
+      return null;
+    }
+    LOG.info(
+        "Found auth credentials - setting up user/pass authenticated client connection for ES.");
+    Entry<String, String> userAndPassword = credentials.get();
+    CredentialsProvider provider = new BasicCredentialsProvider();
+    provider.setCredentials(AuthScope.ANY,
+        new UsernamePasswordCredentials(userAndPassword.getKey(), userAndPassword.getValue()));
+    return provider;
+  }
+
+  /**
+   * Setup connection encryption details (SSL) if applicable.
+   * If ssl.enabled=true, sets up SSL connection. If enabled, keystore.path is required. User
+   * can also optionally set keystore.password and keystore.type.
+   * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_encrypted_communication.html
+   *
+   * <p>Other guidance on the HTTP Component library and configuring SSL connections:
+   * http://www.robinhowlett.com/blog/2016/01/05/everything-you-ever-wanted-to-know-about-ssl-but-were-afraid-to-ask
+   *
+   * <p>JSSE docs -
+   * https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html
+   *
+   * <p>Additional guidance for configuring Elasticsearch for SSL can be found here -
+   * https://www.elastic.co/guide/en/x-pack/5.6/ssl-tls.html
+   *
+   * @param esClientConfig client settings
+   * @return SSL context trusting the configured keystore, or null when SSL is disabled
+   * @throws IllegalStateException if SSL is enabled without a keystore path, or the
+   *     keystore cannot be loaded
+   */
+  private static SSLContext getSSLContext(ElasticsearchClientConfig esClientConfig) {
+    if (!esClientConfig.isSSLEnabled()) {
+      return null;
+    }
+    LOG.info("Configuring client for SSL connection.");
+    Optional<Path> keyStorePath = esClientConfig.getKeyStorePath();
+    if (!keyStorePath.isPresent()) {
+      throw new IllegalStateException("KeyStore path must be provided for SSL connection.");
+    }
+    // A missing password is passed to the keystore loader as null.
+    char[] keyStorePass =
+        esClientConfig.getKeyStorePassword().map(String::toCharArray).orElse(null);
+    KeyStore trustStore =
+        getStore(esClientConfig.getKeyStoreType(), keyStorePath.get(), keyStorePass);
+    try {
+      SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(trustStore, null);
+      return sslBuilder.build();
+    } catch (NoSuchAlgorithmException | KeyStoreException | KeyManagementException e) {
+      throw new IllegalStateException("Unable to load truststore.", e);
+    }
+  }
+
+  /**
+   * Loads a keystore of the given type from a file.
+   *
+   * @param type keystore type passed to {@link KeyStore#getInstance(String)}
+   * @param path location of the keystore file
+   * @param pass keystore password, may be null
+   * @return the loaded keystore
+   * @throws IllegalStateException if the type is unavailable or the file cannot be loaded
+   */
+  private static KeyStore getStore(String type, Path path, char[] pass) {
+    final KeyStore store;
+    try {
+      store = KeyStore.getInstance(type);
+    } catch (KeyStoreException e) {
+      throw new IllegalStateException("Unable to get keystore type '" + type + "'", e);
+    }
+    try (InputStream keyStoreStream = Files.newInputStream(path)) {
+      store.load(keyStoreStream, pass);
+    } catch (IOException | NoSuchAlgorithmException | CertificateException e) {
+      throw new IllegalStateException("Unable to load keystore from path '" + path + "'", e);
+    }
+    return store;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientConfig.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientConfig.java
new file mode 100644
index 0000000..2ca4763
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientConfig.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.config;
+
+import static java.lang.String.format;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.collections4.map.AbstractMapDecorator;
+import org.apache.commons.lang.StringUtils;
+import org.apache.metron.common.utils.HDFSUtils;
+
+/**
+ * Access configuration options for the ES client. Wraps the settings map and exposes each
+ * {@link ElasticsearchClientOptions} entry through a typed accessor with its default.
+ */
+public class ElasticsearchClientConfig extends AbstractMapDecorator<String, Object> {
+
+  private static final Integer THIRTY_SECONDS_IN_MILLIS = 30_000;
+  private static final Integer ONE_SECOND_IN_MILLIS = 1_000;
+  private static final String DEFAULT_KEYSTORE_TYPE = "JKS";
+
+  /**
+   * Initialize config from provided settings Map.
+   *
+   * @param settings Map of config options from which to initialize.
+   */
+  public ElasticsearchClientConfig(Map<String, Object> settings) {
+    super(settings);
+  }
+
+  /**
+   * @return Connection timeout as specified by user, or default 1s as defined by the ES
+   *     client.
+   */
+  public Integer getConnectTimeoutMillis() {
+    return ElasticsearchClientOptions.CONNECTION_TIMEOUT_MILLIS
+        .getOrDefault(this, Integer.class, ONE_SECOND_IN_MILLIS);
+  }
+
+  /**
+   * @return socket timeout specified by user, or default 30s as defined by the ES client.
+   */
+  public Integer getSocketTimeoutMillis() {
+    return ElasticsearchClientOptions.SOCKET_TIMEOUT_MILLIS
+        .getOrDefault(this, Integer.class, THIRTY_SECONDS_IN_MILLIS);
+  }
+
+  /**
+   * @return max retry timeout specified by user, or default 30s as defined by the ES client.
+   */
+  public Integer getMaxRetryTimeoutMillis() {
+    return ElasticsearchClientOptions.MAX_RETRY_TIMEOUT_MILLIS
+        .getOrDefault(this, Integer.class, THIRTY_SECONDS_IN_MILLIS);
+  }
+
+  /**
+   * Elasticsearch X-Pack credentials. Only resolved when a password file is configured; the
+   * password is then read from the first line of that HDFS file.
+   *
+   * @return Username, password; empty when no password file is configured
+   * @throws IllegalArgumentException if a password file is configured but the username is
+   *     missing or empty, or the password file contains no lines
+   * @throws IllegalStateException if the password file cannot be read
+   */
+  public Optional<Map.Entry<String, String>> getCredentials() {
+    if (!ElasticsearchClientOptions.XPACK_PASSWORD_FILE.containsOption(this)) {
+      return Optional.empty();
+    }
+    String user = ElasticsearchClientOptions.XPACK_USERNAME.containsOption(this)
+        ? ElasticsearchClientOptions.XPACK_USERNAME.get(this, String.class)
+        : null;
+    if (StringUtils.isEmpty(user)) {
+      throw new IllegalArgumentException(
+          "X-pack username is required when password supplied and cannot be empty");
+    }
+    String password = getPasswordFromFile(
+        ElasticsearchClientOptions.XPACK_PASSWORD_FILE.get(this, String.class));
+    if (password == null) {
+      return Optional.empty();
+    }
+    return Optional.of(new AbstractMap.SimpleImmutableEntry<String, String>(user, password));
+  }
+
+  /**
+   * Expects single password on first line.
+   *
+   * @param hdfsPath HDFS location of the password file
+   * @return the first line of the file
+   * @throws IllegalArgumentException if the file has no lines
+   */
+  private static String getPasswordFromFile(String hdfsPath) {
+    List<String> lines = readLines(hdfsPath);
+    if (lines.isEmpty()) {
+      throw new IllegalArgumentException(format("No password found in file '%s'", hdfsPath));
+    }
+    return lines.get(0);
+  }
+
+  /**
+   * Read all lines from HDFS file.
+   *
+   * @param hdfsPath path to file
+   * @return lines
+   * @throws IllegalStateException wrapping the IOException if the file cannot be read
+   */
+  private static List<String> readLines(String hdfsPath) {
+    try {
+      return HDFSUtils.readFile(hdfsPath);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          format("Unable to read XPack password file from HDFS location '%s'", hdfsPath), e);
+    }
+  }
+
+  /**
+   * Determines if SSL is enabled from user-supplied config ssl.enabled.
+   *
+   * @return configured value, or false when the option is not set
+   */
+  public boolean isSSLEnabled() {
+    return ElasticsearchClientOptions.SSL_ENABLED.getOrDefault(this, Boolean.class, false);
+  }
+
+  /**
+   * http by default, https if ssl is enabled.
+   *
+   * @return "https" when SSL is enabled, otherwise "http"
+   */
+  public String getConnectionScheme() {
+    return isSSLEnabled() ? "https" : "http";
+  }
+
+  /**
+   * @return Number of threads to use for client connection; empty when the option is not
+   *     set or resolves to null.
+   */
+  public Optional<Integer> getNumClientConnectionThreads() {
+    if (!ElasticsearchClientOptions.NUM_CLIENT_CONNECTION_THREADS.containsOption(this)) {
+      return Optional.empty();
+    }
+    // ofNullable: the key may be present with a null value, which Optional.of would reject.
+    return Optional.ofNullable(
+        ElasticsearchClientOptions.NUM_CLIENT_CONNECTION_THREADS.get(this, Integer.class));
+  }
+
+  /**
+   * @return User-defined keystore type. Defaults to "JKS" if not defined or empty.
+   */
+  public String getKeyStoreType() {
+    if (ElasticsearchClientOptions.KEYSTORE_TYPE.containsOption(this)) {
+      String type = ElasticsearchClientOptions.KEYSTORE_TYPE.get(this, String.class);
+      if (StringUtils.isNotEmpty(type)) {
+        return type;
+      }
+    }
+    return DEFAULT_KEYSTORE_TYPE;
+  }
+
+  /**
+   * Reads keystore password from the HDFS file defined by setting "keystore.password.file",
+   * if it exists.
+   *
+   * @return password if it exists and is non-empty, empty optional otherwise.
+   * @throws IllegalArgumentException if the password file contains no lines
+   * @throws IllegalStateException if the password file cannot be read
+   */
+  public Optional<String> getKeyStorePassword() {
+    if (!ElasticsearchClientOptions.KEYSTORE_PASSWORD_FILE.containsOption(this)) {
+      return Optional.empty();
+    }
+    String password = getPasswordFromFile(
+        ElasticsearchClientOptions.KEYSTORE_PASSWORD_FILE.get(this, String.class));
+    return StringUtils.isNotEmpty(password) ? Optional.of(password) : Optional.empty();
+  }
+
+  /**
+   * @return keystore path; empty when the option is not set or is null/empty.
+   */
+  public Optional<Path> getKeyStorePath() {
+    if (!ElasticsearchClientOptions.KEYSTORE_PATH.containsOption(this)) {
+      return Optional.empty();
+    }
+    String path = ElasticsearchClientOptions.KEYSTORE_PATH.get(this, String.class);
+    if (StringUtils.isEmpty(path)) {
+      // Treat a null or empty value like a missing option instead of failing in Paths.get.
+      return Optional.empty();
+    }
+    return Optional.of(Paths.get(path));
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientOptions.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientOptions.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientOptions.java
new file mode 100644
index 0000000..c92a34f
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientOptions.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.config;
+
+import org.apache.metron.common.configuration.ConfigOption;
+
+public enum ElasticsearchClientOptions implements ConfigOption {
+  CONNECTION_TIMEOUT_MILLIS("connection.timeout.millis"),
+  SOCKET_TIMEOUT_MILLIS("socket.timeout.millis"),
+  MAX_RETRY_TIMEOUT_MILLIS("max.retry.timeout.millis"),
+  NUM_CLIENT_CONNECTION_THREADS("num.client.connection.threads"),
+  // authentication
+  XPACK_USERNAME("xpack.username"),
+  XPACK_PASSWORD_FILE("xpack.password.file"),
+  // security/encryption
+  SSL_ENABLED("ssl.enabled"),
+  KEYSTORE_TYPE("keystore.type"),
+  KEYSTORE_PATH("keystore.path"),
+  KEYSTORE_PASSWORD_FILE("keystore.password.file");
+
+  private final String key;
+
+  ElasticsearchClientOptions(String key) {
+    this.key = key;
+  }
+
+  @Override
+  public String getKey() {
+    return key;
+  }
+
+  /**
+   * Convenience method for printing all options as their key representation.
+   */
+  public static void printOptions() {
+    String newLine = "";
+    for (ElasticsearchClientOptions opt : ElasticsearchClientOptions.values()) 
{
+      System.out.print(newLine);
+      System.out.print(opt.getKey());
+      newLine = System.lineSeparator();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
index 64a641f..cb44694 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
@@ -18,33 +18,23 @@
 
 package org.apache.metron.elasticsearch.dao;
 
-import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
-import org.apache.metron.elasticsearch.utils.FieldMapping;
-import org.apache.metron.elasticsearch.utils.FieldProperties;
-import org.apache.metron.indexing.dao.ColumnMetadataDao;
-import org.apache.metron.indexing.dao.search.FieldType;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.client.AdminClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static 
org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
-import static 
org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.utils.FieldMapping;
+import org.apache.metron.elasticsearch.utils.FieldProperties;
+import org.apache.metron.indexing.dao.ColumnMetadataDao;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Responsible for retrieving column-level metadata for Elasticsearch search 
indices.
@@ -68,16 +58,13 @@ public class ElasticsearchColumnMetadataDao implements 
ColumnMetadataDao {
     elasticsearchTypeMap = Collections.unmodifiableMap(fieldTypeMap);
   }
 
-  /**
-   * An Elasticsearch administrative client.
-   */
-  private transient ElasticsearchClient adminClient;
+  private transient ElasticsearchClient esClient;
 
   /**
-   * @param adminClient The Elasticsearch admin client.
+   * @param esClient The Elasticsearch client.
    */
-  public ElasticsearchColumnMetadataDao(ElasticsearchClient adminClient) {
-    this.adminClient = adminClient;
+  public ElasticsearchColumnMetadataDao(ElasticsearchClient esClient) {
+    this.esClient = esClient;
   }
 
   @SuppressWarnings("unchecked")
@@ -90,7 +77,7 @@ public class ElasticsearchColumnMetadataDao implements 
ColumnMetadataDao {
     String[] latestIndices = getLatestIndices(indices);
     if (latestIndices.length > 0) {
 
-     Map<String, FieldMapping>  mappings = 
adminClient.getMappings(latestIndices);
+     Map<String, FieldMapping>  mappings = 
esClient.getMappingByIndex(latestIndices);
 
       // for each index
       for (Map.Entry<String, FieldMapping> kv : mappings.entrySet()) {
@@ -166,7 +153,7 @@ public class ElasticsearchColumnMetadataDao implements 
ColumnMetadataDao {
     LOG.debug("Getting latest indices; indices={}", includeIndices);
     Map<String, String> latestIndices = new HashMap<>();
 
-    String[] indices = adminClient.getIndices();
+    String[] indices = esClient.getIndices();
 
     for (String index : indices) {
       int prefixEnd = index.indexOf(INDEX_NAME_DELIMITER);

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index fa04610..210e1ce 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -22,9 +22,8 @@ import java.lang.invoke.MethodHandles;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-
-import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
-import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
 import org.apache.metron.indexing.dao.RetrieveLatestDao;
@@ -40,8 +39,6 @@ import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
 import org.apache.metron.indexing.dao.update.PatchRequest;
 import org.apache.metron.indexing.dao.update.ReplaceRequest;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -99,8 +96,7 @@ public class ElasticsearchDao implements IndexDao {
   @Override
   public synchronized void init(AccessConfig config) {
     if (this.client == null) {
-      this.client = ElasticsearchUtils
-          .getClient(config.getGlobalConfigSupplier().get());
+      this.client = 
ElasticsearchClientFactory.create(config.getGlobalConfigSupplier().get());
       this.accessConfig = config;
       this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client);
       this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client);

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
index 64d9200..c63532e 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
@@ -20,20 +20,16 @@ package org.apache.metron.elasticsearch.dao;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
-import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.ShardSearchFailure;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.rest.RestStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 
 /**

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
index ff1189c..0c91007 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
@@ -29,14 +29,11 @@ import java.util.List;
 import java.util.Optional;
 import java.util.function.Function;
 
-import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.indexing.dao.RetrieveLatestDao;
 import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.update.Document;
 import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
index 32cefe0..0b87e56 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
@@ -19,26 +19,19 @@ package org.apache.metron.elasticsearch.dao;
 
 import static 
org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
 
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Function;
 
-import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.search.FieldType;
-import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.Group;
 import org.apache.metron.indexing.dao.search.GroupOrder;
 import org.apache.metron.indexing.dao.search.GroupOrderType;
@@ -52,17 +45,10 @@ import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.indexing.dao.search.SortField;
 import org.apache.metron.indexing.dao.search.SortOrder;
-import org.apache.metron.indexing.dao.update.Document;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.index.mapper.LegacyIpFieldMapper;
-import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.QueryStringQueryBuilder;
 import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.Aggregations;

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
index 75300ea..c769b2f 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
@@ -29,7 +29,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.search.AlertComment;
@@ -37,13 +37,10 @@ import 
org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
 import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.indexing.dao.update.UpdateDao;
 import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import 
org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.transport.TransportClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchClient.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchClient.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchClient.java
deleted file mode 100644
index 669ac10..0000000
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchClient.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.elasticsearch.utils;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpEntity;
-import org.apache.http.entity.BasicHttpEntity;
-import org.apache.http.entity.StringEntity;
-import org.apache.metron.common.utils.JSONUtils;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Request;
-import org.elasticsearch.client.Response;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ElasticsearchClient implements AutoCloseable{
-  private RestClient lowLevelClient;
-  private RestHighLevelClient highLevelClient;
-
-  public ElasticsearchClient(RestClient lowLevelClient, RestHighLevelClient 
highLevelClient) {
-    this.lowLevelClient = lowLevelClient;
-    this.highLevelClient = highLevelClient;
-  }
-
-  public RestClient getLowLevelClient() {
-    return lowLevelClient;
-  }
-
-  public RestHighLevelClient getHighLevelClient() {
-    return highLevelClient;
-  }
-
-  @Override
-  public void close() throws IOException {
-    if(lowLevelClient != null) {
-      lowLevelClient.close();
-    }
-  }
-
-  public void putMapping(String index, String type, String source) throws 
IOException {
-    HttpEntity entity = new StringEntity(source);
-    Response response = lowLevelClient.performRequest("PUT"
-            , "/" + index + "/_mapping/" + type
-            , Collections.emptyMap()
-            , entity
-    );
-
-    if(response.getStatusLine().getStatusCode() != 200) {
-      String responseStr = IOUtils.toString(response.getEntity().getContent());
-      throw new IllegalStateException("Got a " + 
response.getStatusLine().getStatusCode() + " due to " + responseStr);
-    }
-    /**
-     * ((ElasticsearchDao) 
esDao).getClient().admin().indices().preparePutMapping(INDEX)
-            .setType("test_doc")
-            .setSource(nestedAlertMapping)
-            .get();
-     */
-  }
-
-  public String[] getIndices() throws IOException {
-    Response response = lowLevelClient.performRequest("GET", "/_cat/indices");
-    if(response.getStatusLine().getStatusCode() == 200) {
-      String responseStr = IOUtils.toString(response.getEntity().getContent());
-      List<String> indices = new ArrayList<>();
-      for(String line : Splitter.on("\n").split(responseStr)) {
-        Iterable<String> splits = Splitter.on(" 
").split(line.replaceAll("\\s+", " ").trim());
-        if(Iterables.size(splits) > 3) {
-          String index = Iterables.get(splits, 2, "");
-          if(!StringUtils.isEmpty(index)) {
-            indices.add(index.trim());
-          }
-        }
-      }
-      String[] ret = new String[indices.size()];
-      ret=indices.toArray(ret);
-      return ret;
-    }
-    return null;
-  }
-
-  private Map<String, Object> getInnerMap(Map<String, Object> outerMap, 
String... keys) {
-    Map<String, Object> ret = outerMap;
-    if(keys.length == 0) {
-      return outerMap;
-    }
-    for(String key : keys) {
-      ret = (Map<String, Object>)ret.get(key);
-      if(ret == null) {
-        return ret;
-      }
-    }
-    return ret;
-  }
-
-  public Map<String, FieldMapping> getMappings(String[] indices) throws 
IOException {
-    Map<String, FieldMapping> ret = new HashMap<>();
-    String indicesCsv = Joiner.on(",").join(indices);
-    Response response = lowLevelClient.performRequest("GET", "/" + indicesCsv 
+ "/_mapping");
-    if(response.getStatusLine().getStatusCode() == 200) {
-      String responseStr = IOUtils.toString(response.getEntity().getContent());
-      Map<String, Object> indexToMapping = 
JSONUtils.INSTANCE.load(responseStr, JSONUtils.MAP_SUPPLIER);
-      for(Map.Entry<String, Object> index2Mapping : indexToMapping.entrySet()) 
{
-        String index = index2Mapping.getKey();
-        Map<String, Object> mappings = getInnerMap((Map<String, 
Object>)index2Mapping.getValue(), "mappings");
-        if(mappings.size() > 0) {
-          Map.Entry<String, Object> docMap = 
Iterables.getFirst(mappings.entrySet(), null);
-          if(docMap != null) {
-            Map<String, Object> fieldPropertiesMap = getInnerMap((Map<String, 
Object>)docMap.getValue(), "properties");
-            if(fieldPropertiesMap != null) {
-              FieldMapping mapping = new FieldMapping();
-              for (Map.Entry<String, Object> field2PropsKV : 
fieldPropertiesMap.entrySet()) {
-                if(field2PropsKV.getValue() != null) {
-                  FieldProperties props = new FieldProperties((Map<String, 
Object>) field2PropsKV.getValue());
-                  mapping.put(field2PropsKV.getKey(), props);
-                }
-              }
-              ret.put(index, mapping);
-            }
-          }
-        }
-      }
-    }
-    return ret;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
index 838f8c7..47cbd98 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
@@ -17,68 +17,35 @@
  */
 package org.apache.metron.elasticsearch.utils;
 
-import static java.lang.String.format;
-
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.text.SimpleDateFormat;
-import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.utils.HDFSUtils;
-import org.apache.metron.common.utils.ReflectionUtils;
-import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
-import org.apache.metron.netty.utils.NettyRuntimeWrapper;
-import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.codehaus.jackson.map.ObjectMapper;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.search.query.QuerySearchRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ElasticsearchUtils {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private static final String ES_CLIENT_CLASS_DEFAULT = 
"org.elasticsearch.transport.client.PreBuiltTransportClient";
-  private static final String PWD_FILE_CONFIG_KEY = "es.xpack.password.file";
-  private static final String USERNAME_CONFIG_KEY = "es.xpack.username";
-  private static final String TRANSPORT_CLIENT_USER_KEY = 
"xpack.security.user";
-
-
   private static ThreadLocal<Map<String, SimpleDateFormat>> DATE_FORMAT_CACHE
           = ThreadLocal.withInitial(() -> new HashMap<>());
 
@@ -92,10 +59,6 @@ public class ElasticsearchUtils {
    */
   public static final String INDEX_NAME_DELIMITER = "_index";
 
-  public static SimpleDateFormat getIndexFormat(WriterConfiguration 
configurations) {
-    return getIndexFormat(configurations.getGlobalConfig());
-  }
-
   public static SimpleDateFormat getIndexFormat(Map<String, Object> 
globalConfig) {
     String format = (String) globalConfig.get("es.date.format");
     return DATE_FORMAT_CACHE.get().computeIfAbsent(format, 
SimpleDateFormat::new);
@@ -116,176 +79,16 @@ public class ElasticsearchUtils {
     return indexName;
   }
 
-  /**
-   * Extracts the base index name from a full index name.
-   *
-   * For example, given an index named 'bro_index_2017.01.01.01', the base
-   * index name is 'bro'.
-   *
-   * @param indexName The full index name including delimiter and date postfix.
-   * @return The base index name.
-   */
-  public static String getBaseIndexName(String indexName) {
-
-    String[] parts = indexName.split(INDEX_NAME_DELIMITER);
-    if(parts.length < 1 || StringUtils.isEmpty(parts[0])) {
-      String msg = format("Unexpected index name; index=%s, delimiter=%s", 
indexName, INDEX_NAME_DELIMITER);
-      throw new IllegalStateException(msg);
-    }
-
-    return parts[0];
-  }
-
-  /**
-   * Instantiates an Elasticsearch client based on es.client.class, if set. 
Defaults to
-   * org.elasticsearch.transport.client.PreBuiltTransportClient.
-   *
-   * @param globalConfiguration Metron global config
-   * @return
-   */
-  public static ElasticsearchClient getClient(Map<String, Object> 
globalConfiguration) {
-    Map<String, String> esSettings = getEsSettings(globalConfiguration);
-    Optional<Map.Entry<String, String>> credentials = 
getCredentials(esSettings);
-    Set<String> customESSettings = new HashSet<>();
-
-
-    RestClientBuilder builder = null;
-    List<HostnamePort> hps = getIps(globalConfiguration);
-    {
-      HttpHost[] posts = new HttpHost[hps.size()];
-      int i = 0;
-      for (HostnamePort hp : hps) {
-        posts[i++] = new HttpHost(hp.hostname, hp.port);
-      }
-      builder = RestClient.builder(posts);
-    }
-    if(credentials.isPresent()) {
-      final CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
-      credentialsProvider.setCredentials(AuthScope.ANY,
-              new UsernamePasswordCredentials(credentials.get().getKey(), 
credentials.get().getValue()));
-      builder = builder.setHttpClientConfigCallback(
-              httpAsyncClientBuilder -> 
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
-      );
-    }
-    RestClient lowLevelClient = builder.build();
-    RestHighLevelClient client = new RestHighLevelClient(lowLevelClient);
-    return new ElasticsearchClient(lowLevelClient, client);
-
-    /*customESSettings.addAll(Arrays.asList("es.client.class", 
USERNAME_CONFIG_KEY, PWD_FILE_CONFIG_KEY));
-    Settings.Builder settingsBuilder = Settings.builder();
-    for (Map.Entry<String, String> entry : esSettings.entrySet()) {
-      String key = entry.getKey();
-      String value = entry.getValue();
-      if (!customESSettings.contains(key)) {
-        settingsBuilder.put(key, value);
-      }
-    }
-    settingsBuilder.put("cluster.name", 
globalConfiguration.get("es.clustername"));
-    settingsBuilder.put("client.transport.ping_timeout", 
esSettings.getOrDefault("client.transport.ping_timeout","500s"));
-    setXPackSecurityOrNone(settingsBuilder, esSettings);
-
-    try {
-      LOG.info("Number of available processors in Netty: {}", 
NettyRuntimeWrapper.availableProcessors());
-      // Netty sets available processors statically and if an attempt is made 
to set it more than
-      // once an IllegalStateException is thrown by 
NettyRuntime.setAvailableProcessors(NettyRuntime.java:87)
-      // 
https://discuss.elastic.co/t/getting-availableprocessors-is-already-set-to-1-rejecting-1-illegalstateexception-exception/103082
-      // 
https://discuss.elastic.co/t/elasticsearch-5-4-1-availableprocessors-is-already-set/88036
-      System.setProperty("es.set.netty.runtime.available.processors", "false");
-      TransportClient client = createTransportClient(settingsBuilder.build(), 
esSettings);
-      for (HostnamePort hp : getIps(globalConfiguration)) {
-        client.addTransportAddress(
-                new 
InetSocketTransportAddress(InetAddress.getByName(hp.hostname), hp.port)
-        );
-      }
-      return client;
-    } catch (UnknownHostException exception) {
-      throw new RuntimeException(exception);
-    }*/
-  }
-
-  private static Map<String, String> getEsSettings(Map<String, Object> config) 
{
-    return ConversionUtils
-        .convertMap((Map<String, Object>) 
config.getOrDefault("es.client.settings", new HashMap<String, Object>()),
-            String.class);
-  }
-
-  private static Optional<Map.Entry<String, String>> 
getCredentials(Map<String, String> esSettings) {
-    Optional<Map.Entry<String, String>> ret = Optional.empty();
-    if (esSettings.containsKey(PWD_FILE_CONFIG_KEY)) {
-
-      if (!esSettings.containsKey(USERNAME_CONFIG_KEY) || 
StringUtils.isEmpty(esSettings.get(USERNAME_CONFIG_KEY))) {
-        throw new IllegalArgumentException("X-pack username is required and 
cannot be empty");
-      }
-      String user = esSettings.get(USERNAME_CONFIG_KEY);
-      String password = 
esSettings.containsKey(PWD_FILE_CONFIG_KEY)?esSettings.get(getPasswordFromFile(esSettings.get(PWD_FILE_CONFIG_KEY))):null;
-      if(user != null && password != null) {
-        return Optional.of(new AbstractMap.SimpleImmutableEntry<String, 
String>(user, password));
-      }
-    }
-    return ret;
-  }
-
-  /*
-   * Append Xpack security settings (if any)
-   */
-  private static void setXPackSecurityOrNone(Settings.Builder settingsBuilder, 
Map<String, String> esSettings) {
-
-    if (esSettings.containsKey(PWD_FILE_CONFIG_KEY)) {
-
-      if (!esSettings.containsKey(USERNAME_CONFIG_KEY) || 
StringUtils.isEmpty(esSettings.get(USERNAME_CONFIG_KEY))) {
-        throw new IllegalArgumentException("X-pack username is required and 
cannot be empty");
-      }
-
-      settingsBuilder.put(
-         TRANSPORT_CLIENT_USER_KEY,
-         esSettings.get(USERNAME_CONFIG_KEY) + ":" + 
getPasswordFromFile(esSettings.get(PWD_FILE_CONFIG_KEY))
-      );
-    }
-  }
-
-  /*
-   * Single password on first line
-   */
-  private static String getPasswordFromFile(String hdfsPath) {
-    List<String> lines = null;
-    try {
-      lines = HDFSUtils.readFile(hdfsPath);
-    } catch (IOException e) {
-      throw new IllegalArgumentException(
-          format("Unable to read XPack password file from HDFS location '%s'", 
hdfsPath), e);
-    }
-    if (lines.size() == 0) {
-      throw new IllegalArgumentException(format("No password found in file 
'%s'", hdfsPath));
-    }
-    return lines.get(0);
-  }
-
-  /**
-   * Constructs ES transport client from the provided ES settings additional 
es config
-   *
-   * @param settings client settings
-   * @param esSettings client type to instantiate
-   * @return client with provided settings
-   */
-  private static TransportClient createTransportClient(Settings settings,
-      Map<String, String> esSettings) {
-    String esClientClassName = (String) esSettings
-        .getOrDefault("es.client.class", ES_CLIENT_CLASS_DEFAULT);
-    return ReflectionUtils
-        .createInstance(esClientClassName, new Class[]{Settings.class, 
Class[].class},
-            new Object[]{settings, new Class[0]});
-  }
-
   public static class HostnamePort {
-    String hostname;
-    Integer port;
+    public String hostname;
+    public Integer port;
     public HostnamePort(String hostname, Integer port) {
       this.hostname = hostname;
       this.port = port;
     }
   }
 
-  protected static List<HostnamePort> getIps(Map<String, Object> 
globalConfiguration) {
+  public static List<HostnamePort> getIps(Map<String, Object> 
globalConfiguration) {
     Object ipObj = globalConfiguration.get("es.ip");
     Object portObj = globalConfiguration.get("es.port");
     if(ipObj == null) {

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java
index 101e288..15bcb4c 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java
@@ -21,6 +21,9 @@ import 
org.apache.commons.collections4.map.AbstractMapDecorator;
 
 import java.util.HashMap;
 
+/**
+ * Typedef that maps Elasticsearch field names to their field properties.
+ */
 public class FieldMapping extends AbstractMapDecorator<String, 
FieldProperties>{
   public FieldMapping() {
     super(new HashMap<String, FieldProperties>());

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java
index 82aca42..d116b40 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java
@@ -22,6 +22,9 @@ import 
org.apache.commons.collections4.map.AbstractMapDecorator;
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * Typedef that maps the property names of an Elasticsearch field to their values.
+ */
 public class FieldProperties extends AbstractMapDecorator<String, Object> {
   public FieldProperties() {
     super(new HashMap<>());

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index 20f387f..fbdd4fe 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -23,18 +23,15 @@ import org.apache.metron.common.field.FieldNameConverter;
 import org.apache.metron.common.field.FieldNameConverters;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
-import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.transport.TransportClient;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,7 +66,7 @@ public class ElasticsearchWriter implements 
BulkMessageWriter<JSONObject>, Seria
   public void init(Map stormConf, TopologyContext topologyContext, 
WriterConfiguration configurations) {
 
     Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
-    client = ElasticsearchUtils.getClient(globalConfiguration);
+    client = ElasticsearchClientFactory.create(globalConfiguration);
     dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
   }
 
@@ -81,7 +78,6 @@ public class ElasticsearchWriter implements 
BulkMessageWriter<JSONObject>, Seria
 
     final String indexPostfix = dateFormat.format(new Date());
     BulkRequest bulkRequest = new BulkRequest();
-    //BulkRequestBuilder bulkRequest = client.prepareBulk();
     for(JSONObject message: messages) {
 
       JSONObject esDoc = new JSONObject();

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
index e2a675f..c9389c0 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
@@ -18,18 +18,10 @@
 
 package org.apache.metron.elasticsearch.dao;
 
-import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.FieldMapping;
-import org.elasticsearch.action.ActionFuture;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
-import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
-import org.elasticsearch.client.AdminClient;
-import org.elasticsearch.client.IndicesAdminClient;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -42,7 +34,6 @@ import java.util.Map;
 import static org.junit.Assert.assertArrayEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests the ElasticsearchColumnMetadata class.
@@ -72,7 +63,7 @@ public class ElasticsearchColumnMetadataDaoTest {
       }
 
       @Override
-      public Map<String, FieldMapping> getMappings(String[] indices) throws 
IOException {
+      public Map<String, FieldMapping> getMappingByIndex(String[] indices) 
throws IOException {
         return mappings;
       }
     };

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
index 2855bbc..6dc01a4 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
@@ -30,7 +30,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.search.FieldType;
@@ -41,7 +41,6 @@ import org.apache.metron.indexing.dao.search.SortField;
 import org.apache.metron.indexing.dao.search.SortOrder;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
index 8cf39dd..7a84588 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
@@ -18,20 +18,17 @@
 
 package org.apache.metron.elasticsearch.dao;
 
-import org.apache.metron.elasticsearch.utils.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
-import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchShardTarget;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java
index 3b48a60..3b7f132 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java
@@ -18,30 +18,32 @@
 
 package org.apache.metron.elasticsearch.dao;
 
+import static org.mockito.Mockito.mock;
+
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.UpdateDaoTest;
 import org.apache.metron.indexing.dao.update.UpdateDao;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.junit.Before;
 
-import static org.mockito.Mockito.mock;
-
 /**
  * This class returns the ElasticsearchUpdateDao implementation to be used in 
UpdateDaoTest.  UpdateDaoTest contains a
  * common set of tests that all Dao implementations must pass.
  */
 public class ElasticsearchUpdateDaoTest extends UpdateDaoTest {
 
-  private TransportClient client;
   private AccessConfig accessConfig;
   private ElasticsearchRetrieveLatestDao retrieveLatestDao;
   private ElasticsearchUpdateDao updateDao;
 
   @Before
   public void setup() {
-    client = mock(TransportClient.class);
     accessConfig = new AccessConfig();
     retrieveLatestDao = mock(ElasticsearchRetrieveLatestDao.class);
+    RestHighLevelClient highLevel = mock(RestHighLevelClient.class);
+    ElasticsearchClient client = new 
ElasticsearchClient(mock(RestClient.class), highLevel);
     updateDao = new ElasticsearchUpdateDao(client, accessConfig, 
retrieveLatestDao);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/8bf3b6ec/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
index 61dd0f6..d03da0e 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
@@ -18,15 +18,24 @@
 package org.apache.metron.elasticsearch.integration;
 
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
 import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.adrianwalker.multilinestring.Multiline;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
 import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
 import 
org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
 import org.apache.metron.indexing.dao.AccessConfig;
@@ -39,11 +48,13 @@ import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.integration.InMemoryComponent;
-import org.apache.metron.integration.utils.TestUtils;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
@@ -56,159 +67,40 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
 
   private static String indexDir = "target/elasticsearch_search";
   private static String dateFormat = "yyyy.MM.dd.HH";
-  private static final int MAX_RETRIES = 10;
-  private static final int SLEEP_MS = 500;
+  private static String broTemplatePath = 
"../../metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template";
+  private static String snortTemplatePath = 
"../../metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template";
+  protected static final String BRO_INDEX = "bro_index_2017.01.01.01";
+  protected static final String SNORT_INDEX = "snort_index_2017.01.01.02";
+  protected static Map<String, Object> globalConfig;
+  protected static RestClient lowLevelClient;
+  protected static RestHighLevelClient highLevelClient;
   protected static IndexDao dao;
 
-  /**
-   * {
-   * "bro_doc": {
-   *   "properties": {
-   *     "source:type": {
-   *        "type": "text",
-   *        "fielddata" : "true"
-   *     },
-   *     "guid" : {
-   *        "type" : "keyword"
-   *     },
-   *     "ip_src_addr": {
-   *        "type": "ip"
-   *     },
-   *     "ip_src_port": {
-   *        "type": "integer"
-   *     },
-   *     "long_field": {
-   *        "type": "long"
-   *     },
-   *     "timestamp": {
-   *        "type": "date",
-   *        "format": "epoch_millis"
-   *      },
-   *     "latitude" : {
-   *        "type": "float"
-   *      },
-   *     "score": {
-   *        "type": "double"
-   *     },
-   *     "is_alert": {
-   *        "type": "boolean"
-   *     },
-   *     "location_point": {
-   *        "type": "geo_point"
-   *     },
-   *     "bro_field": {
-   *        "type": "text",
-   *        "fielddata" : "true"
-   *     },
-   *     "ttl": {
-   *        "type": "text",
-   *        "fielddata" : "true"
-   *     },
-   *     "alert": {
-   *         "type": "nested"
-   *     }
-   *   }
-   *  }
-   * }
-   */
-  @Multiline
-  private static String broTypeMappings;
-
-  /**
-   * {
-   *  "snort_doc": {
-   *     "properties": {
-   *        "source:type": {
-   *          "type": "text",
-   *          "fielddata" : "true"
-   *        },
-   *        "guid" : {
-   *          "type" : "keyword"
-   *        },
-   *        "ip_src_addr": {
-   *          "type": "ip"
-   *        },
-   *        "ip_src_port": {
-   *          "type": "integer"
-   *        },
-   *        "long_field": {
-   *          "type": "long"
-   *        },
-   *        "timestamp": {
-   *          "type": "date",
-   *          "format": "epoch_millis"
-   *        },
-   *        "latitude" : {
-   *          "type": "float"
-   *        },
-   *        "score": {
-   *          "type": "double"
-   *        },
-   *        "is_alert": {
-   *          "type": "boolean"
-   *        },
-   *        "location_point": {
-   *          "type": "geo_point"
-   *        },
-   *        "snort_field": {
-   *          "type": "integer"
-   *        },
-   *        "ttl": {
-   *          "type": "integer"
-   *        },
-   *        "alert": {
-   *           "type": "nested"
-   *        },
-   *        "threat:triage:score": {
-   *           "type": "float"
-   *        }
-   *      }
-   *    }
-   * }
-   */
-  @Multiline
-  private static String snortTypeMappings;
-
-  /**
-   * {
-   * "bro_doc_default": {
-   *   "dynamic_templates": [{
-   *     "strings": {
-   *       "match_mapping_type": "string",
-   *       "mapping": {
-   *         "type": "text"
-   *       }
-   *     }
-   *   }]
-   *  }
-   * }
-   */
-  @Multiline
-  private static String broDefaultStringMappings;
-
   @BeforeClass
   public static void setup() throws Exception {
     indexComponent = startIndex();
-    dao = createDao();
+    globalConfig = new HashMap<String, Object>() {{
+      put("es.clustername", "metron");
+      put("es.port", "9200");
+      put("es.ip", "localhost");
+      put("es.date.format", dateFormat);
+    }};
+    ElasticsearchClient esClient = 
ElasticsearchClientFactory.create(globalConfig);
+    lowLevelClient = esClient.getLowLevelClient();
+    highLevelClient = esClient.getHighLevelClient();
+    dao = createDao(globalConfig);
     // The data is all static for searches, so we can set it up beforehand, 
and it's faster
     loadTestData();
   }
 
-  protected static IndexDao createDao() {
-    AccessConfig config = new AccessConfig();
-    config.setMaxSearchResults(100);
-    config.setMaxSearchGroups(100);
-    config.setGlobalConfigSupplier( () ->
-            new HashMap<String, Object>() {{
-              put("es.clustername", "metron");
-              put("es.port", "9200");
-              put("es.ip", "localhost");
-              put("es.date.format", dateFormat);
-            }}
-    );
+  protected static IndexDao createDao(Map<String, Object> globalConfig) {
+    AccessConfig accessConfig = new AccessConfig();
+    accessConfig.setMaxSearchResults(100);
+    accessConfig.setMaxSearchGroups(100);
+    accessConfig.setGlobalConfigSupplier(() -> globalConfig);
 
     IndexDao dao = new ElasticsearchDao();
-    dao.init(config);
+    dao.init(accessConfig);
     return dao;
   }
 
@@ -221,42 +113,80 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
     return es;
   }
 
-  protected static void loadTestData() throws ParseException {
-    ElasticSearchComponent es = (ElasticSearchComponent) indexComponent;
-    es.getClient().admin().indices().prepareCreate("bro_index_2017.01.01.01")
-        .addMapping("bro_doc", broTypeMappings)
-        .addMapping("bro_doc_default", broDefaultStringMappings).get();
-    es.getClient().admin().indices().prepareCreate("snort_index_2017.01.01.02")
-        .addMapping("snort_doc", snortTypeMappings).get();
+  protected static void loadTestData() throws ParseException, IOException {
+    // add bro template
+    JSONObject broTemplate = JSONUtils.INSTANCE.load(new 
File(broTemplatePath), JSONObject.class);
+    addTestFieldMappings(broTemplate, "bro_doc");
+    String broTemplateJson = JSONUtils.INSTANCE.toJSON(broTemplate, true);
+    HttpEntity broEntity = new NStringEntity(broTemplateJson, 
ContentType.APPLICATION_JSON);
+    Response response = lowLevelClient.performRequest("PUT", 
"/_template/bro_template", Collections.emptyMap(), broEntity);
+    assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
+    // add snort template
+    JSONObject snortTemplate = JSONUtils.INSTANCE.load(new 
File(snortTemplatePath), JSONObject.class);
+    addTestFieldMappings(snortTemplate, "snort_doc");
+    String snortTemplateJson = JSONUtils.INSTANCE.toJSON(snortTemplate, true);
+    HttpEntity snortEntity = new NStringEntity(snortTemplateJson, 
ContentType.APPLICATION_JSON);
+    response = lowLevelClient.performRequest("PUT", 
"/_template/snort_template", Collections.emptyMap(), snortEntity);
+    assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
+    // create bro index
+    response = lowLevelClient.performRequest("PUT", BRO_INDEX);
+    assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
+    // create snort index
+    response = lowLevelClient.performRequest("PUT", SNORT_INDEX);
+    assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
 
-    BulkRequestBuilder bulkRequest = es.getClient().prepareBulk()
-        .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
-    JSONArray broArray = (JSONArray) new JSONParser().parse(broData);
-    for (Object o : broArray) {
-      JSONObject jsonObject = (JSONObject) o;
-      IndexRequestBuilder indexRequestBuilder = es.getClient()
-          .prepareIndex("bro_index_2017.01.01.01", "bro_doc");
-      indexRequestBuilder = indexRequestBuilder.setId((String) 
jsonObject.get("guid"));
-      indexRequestBuilder = 
indexRequestBuilder.setSource(jsonObject.toJSONString());
-      indexRequestBuilder = indexRequestBuilder
-          .setTimestamp(jsonObject.get("timestamp").toString());
-      bulkRequest.add(indexRequestBuilder);
-    }
-    JSONArray snortArray = (JSONArray) new JSONParser().parse(snortData);
-    for (Object o : snortArray) {
-      JSONObject jsonObject = (JSONObject) o;
-      IndexRequestBuilder indexRequestBuilder = es.getClient()
-          .prepareIndex("snort_index_2017.01.01.02", "snort_doc");
-      indexRequestBuilder = indexRequestBuilder.setId((String) 
jsonObject.get("guid"));
-      indexRequestBuilder = 
indexRequestBuilder.setSource(jsonObject.toJSONString());
-      indexRequestBuilder = indexRequestBuilder
-          .setTimestamp(jsonObject.get("timestamp").toString());
-      bulkRequest.add(indexRequestBuilder);
+    JSONArray broRecords = (JSONArray) new JSONParser().parse(broData);
+
+    BulkRequest bulkRequest = new BulkRequest();
+    for (Object o : broRecords) {
+      JSONObject json = (JSONObject) o;
+      IndexRequest indexRequest = new IndexRequest(BRO_INDEX, "bro_doc", 
(String) json.get("guid"));
+      indexRequest.source(json);
+      indexRequest.timestamp(json.get("timestamp").toString());
+      bulkRequest.add(indexRequest);
     }
-    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
-    if (bulkResponse.hasFailures()) {
-      throw new RuntimeException("Failed to index test data");
+    bulkRequest.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
+    BulkResponse bulkResponse = highLevelClient.bulk(bulkRequest);
+    assertFalse(bulkResponse.hasFailures());
+    assertThat(bulkResponse.status().getStatus(), equalTo(200));
+
+    JSONArray snortRecords = (JSONArray) new JSONParser().parse(snortData);
+
+    bulkRequest = new BulkRequest();
+    for (Object o : snortRecords) {
+      JSONObject json = (JSONObject) o;
+      IndexRequest indexRequest = new IndexRequest(SNORT_INDEX, "snort_doc", 
(String) json.get("guid"));
+      indexRequest.source(json);
+      indexRequest.timestamp(json.get("timestamp").toString());
+      bulkRequest.add(indexRequest);
     }
+    bulkRequest.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
+    bulkResponse = highLevelClient.bulk(bulkRequest);
+    assertFalse(bulkResponse.hasFailures());
+    assertThat(bulkResponse.status().getStatus(), equalTo(200));
+  }
+
+  /**
+   * Add test fields to a template with defined types in case they are not 
defined in the sensor template shipped with Metron.
+   * This is useful for testing certain cases, for example faceting on fields 
of various types.
+   * Template follows this pattern:
+   * { "mappings" : { "xxx_doc" : { "properties" : { ... }}}}
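+   * @param template - the index template JSON; it is modified in place by 
adding the test field mappings to the document type's "properties".
+   * @param docType - the document type key under "mappings" (for example 
"bro_doc") whose "properties" receive the test fields.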
+   */
+  private static void addTestFieldMappings(JSONObject template, String 
docType) {
+    Map mappings = (Map) template.get("mappings");
+    Map docTypeJSON = (Map) mappings.get(docType);
+    Map properties = (Map) docTypeJSON.get("properties");
+    Map<String, String> longType = new HashMap<>();
+    longType.put("type", "long");
+    properties.put("long_field", longType);
+    Map<String, String> floatType = new HashMap<>();
+    floatType.put("type", "float");
+    properties.put("latitude", floatType);
+    Map<String, String> doubleType = new HashMap<>();
+    doubleType.put("type", "double");
+    properties.put("score", doubleType);
   }
 
   @Test
@@ -267,20 +197,17 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
     dao.search(request);
   }
 
-
-
   @Override
   public void returns_column_metadata_for_specified_indices() throws Exception 
{
     // getColumnMetadata with only bro
     {
-      //TODO: It shouldn't require an assertEventually() here as it should be 
synchronous.
-      // Before merging, please figure out why.
-      TestUtils.assertEventually(() -> Assert.assertEquals(13, 
dao.getColumnMetadata(Collections.singletonList("bro")).size()));
+      Assert.assertEquals(262, 
dao.getColumnMetadata(Collections.singletonList("bro")).size());
       Map<String, FieldType> fieldTypes = 
dao.getColumnMetadata(Collections.singletonList("bro"));
-      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field"));
-      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("ttl"));
+      Assert.assertEquals(262, fieldTypes.size());
+      Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("method"));
+      Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("ttl"));
       Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid"));
-      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type"));
+      Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("source:type"));
       Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
       Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
       Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
@@ -288,21 +215,18 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
       Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
       Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
       Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
-      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
-      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field"));
-      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("ttl"));
-      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert"));
+      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("location_point"));
+      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("metron_alert"));
     }
     // getColumnMetadata with only snort
     {
-      //TODO: It shouldn't require an assertEventually() here as it should be 
synchronous.
-      // Before merging, please figure out why.
-      TestUtils.assertEventually(() -> Assert.assertEquals(14, 
dao.getColumnMetadata(Collections.singletonList("snort")).size()));
+      Assert.assertEquals(32, 
dao.getColumnMetadata(Collections.singletonList("snort")).size());
       Map<String, FieldType> fieldTypes = 
dao.getColumnMetadata(Collections.singletonList("snort"));
-      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field"));
+      Assert.assertEquals(32, fieldTypes.size());
+      Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("sig_generator"));
       Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ttl"));
       Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid"));
-      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type"));
+      Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("source:type"));
       Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
       Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
       Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
@@ -310,34 +234,41 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
       Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
       Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
       Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
-      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
+      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("location_point"));
       Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ttl"));
-      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert"));
+      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("metron_alert"));
     }
   }
 
   @Override
   public void returns_column_data_for_multiple_indices() throws Exception {
-    //TODO: It shouldn't require an assertEventually() here as it should be 
synchronous.
-    // Before merging, please figure out why.
-    TestUtils.assertEventually(() -> Assert.assertEquals(15, 
dao.getColumnMetadata(Arrays.asList("bro", "snort")).size()));
+    Assert.assertEquals(277, dao.getColumnMetadata(Arrays.asList("bro", 
"snort")).size());
     Map<String, FieldType> fieldTypes = 
dao.getColumnMetadata(Arrays.asList("bro", "snort"));
+    Assert.assertEquals(277, fieldTypes.size());
+
+    // Ensure internal Metron fields are properly defined
     Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid"));
-    Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type"));
+    Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("source:type"));
+    Assert.assertEquals(FieldType.FLOAT, 
fieldTypes.get("threat:triage:score"));
+    Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("alert_status"));
+    Assert.assertEquals(FieldType.OTHER, fieldTypes.get("metron_alert"));
+
     Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
     Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
     Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
     Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
     Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
     Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
+    Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("suppress_for"));
     Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
-    Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
-    Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field"));
-    Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field"));
-    //NOTE: This is because the field is in both bro and snort and they have 
different types.
+
+    // Ensure a field defined only in bro is included
+    Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("method"));
+    // Ensure a field defined only in snort is included
+    Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("sig_generator"));
+    // Ensure fields defined in both bro and snort with different types are 
reported as OTHER
     Assert.assertEquals(FieldType.OTHER, fieldTypes.get("ttl"));
-    Assert.assertEquals(FieldType.FLOAT, 
fieldTypes.get("threat:triage:score"));
-    Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert"));
+    Assert.assertEquals(FieldType.OTHER, fieldTypes.get("msg"));
   }
 
   @Test
@@ -372,9 +303,9 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
   @Override
   protected String getIndexName(String sensorType) {
     if ("bro".equals(sensorType)) {
-      return "bro_index_2017.01.01.01";
+      return BRO_INDEX;
     } else {
-      return "snort_index_2017.01.01.02";
+      return SNORT_INDEX;
     }
   }
 }

Reply via email to