METRON-1448: Update SolrWriter to conform to new collection strategy this 
closes apache/incubator-metron#929


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/8cc8aab8
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/8cc8aab8
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/8cc8aab8

Branch: refs/heads/master
Commit: 8cc8aab8b7c5fb103c4375f47d47754492547bb1
Parents: 9b25084
Author: cstella <ceste...@gmail.com>
Authored: Fri Feb 9 09:42:12 2018 -0500
Committer: cstella <ceste...@gmail.com>
Committed: Fri Feb 9 09:42:12 2018 -0500

----------------------------------------------------------------------
 metron-platform/metron-solr/README.md           |  44 ++++
 .../metron/solr/writer/MetronSolrClient.java    |  34 +++
 .../apache/metron/solr/writer/SolrWriter.java   | 225 ++++++++++++++-----
 .../src/main/scripts/start_solr_topology.sh     |   2 +-
 .../SolrIndexingIntegrationTest.java            |   4 +-
 .../schema/SchemaValidationIntegrationTest.java |  31 ++-
 .../metron/solr/writer/SolrWriterTest.java      | 213 +++++++++++++++---
 7 files changed, 442 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/8cc8aab8/metron-platform/metron-solr/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/README.md 
b/metron-platform/metron-solr/README.md
index 0ec6972..164160e 100644
--- a/metron-platform/metron-solr/README.md
+++ b/metron-platform/metron-solr/README.md
@@ -26,6 +26,50 @@ limitations under the License.
 
 Metron ships with Solr 6.6.2 support. Solr Cloud can be used as the real-time 
portion of the datastore resulting from 
[metron-indexing](../metron-indexing/README.md).
 
+## Configuration
+
+### The Indexing Topology
+
+Solr is a viable option for the `random access topology` and, similar to the 
Elasticsearch Writer, can be configured
+via the global config.  The following settings are possible as part of the 
global config:
+* `solr.zookeeper`
+  * The zookeeper quorum associated with the SolrCloud instance.  This is a 
required field with no default.
+* `solr.commitPerBatch`
+  * This is a boolean which defines whether the writer commits every batch.  The default is `true`.
+  * _WARNING_: If you set this to `false`, then commits will happen based on the SolrClient's internal mechanism and
+    worker failure *may* result in data being acknowledged in Storm but not written in Solr.
+* `solr.commit.soft`
+  * This is a boolean which defines whether the writer makes a soft commit or a durable commit.  See [here](https://lucene.apache.org/solr/guide/6_6/near-real-time-searching.html#NearRealTimeSearching-AutoCommits).  The default is `false`.
+  * _WARNING_: If you set this to `true`, then commits will happen based on the SolrClient's internal mechanism and
+    worker failure *may* result in data being acknowledged in Storm but not written in Solr.
+* `solr.commit.waitSearcher`
+  * This is a boolean which defines whether the writer blocks the commit until the data is available to search.  See [here](https://lucene.apache.org/solr/guide/6_6/near-real-time-searching.html#NearRealTimeSearching-AutoCommits).  The default is `true`.
+  * _WARNING_: If you set this to `false`, then commits will happen based on the SolrClient's internal mechanism and
+    worker failure *may* result in data being acknowledged in Storm but not written in Solr.
+* `solr.commit.waitFlush`
+  * This is a boolean which defines whether the writer blocks the commit until the data is flushed.  See [here](https://lucene.apache.org/solr/guide/6_6/near-real-time-searching.html#NearRealTimeSearching-AutoCommits).  The default is `true`.
+  * _WARNING_: If you set this to `false`, then commits will happen based on the SolrClient's internal mechanism and
+    worker failure *may* result in data being acknowledged in Storm but not written in Solr.
+* `solr.collection`
+  * The default solr collection (if unspecified, the name is `metron`).  By 
default, sensors will write to a collection associated with the index name in 
the
+  indexing config for that sensor.  If that index name is the empty string, 
then the default collection will be used.
+* `solr.http.config`
+  * This is a map which allows users to configure the Solr client's HTTP 
client.
+  * Possible fields here are:
+    * `socketTimeout` : Socket timeout measured in ms; closes a socket if a read takes longer than x ms to complete
+    and throws `java.net.SocketTimeoutException: Read timed out`
+    * `connTimeout` : Connection timeout measured in ms; closes a socket if a connection cannot be established within x ms
+    and throws `java.net.SocketTimeoutException: Connection timed out`
+    * `maxConnectionsPerHost` : Maximum connections allowed per host
+    * `maxConnections` :  Maximum total connections allowed
+    * `retry` : Retry http requests on error
+    * `allowCompression` :  Allow compression (deflate,gzip) if server 
supports it
+    * `followRedirects` : Follow redirects
+    * `httpBasicAuthUser` : Basic auth username
+    * `httpBasicAuthPassword` : Basic auth password
+    * `solr.ssl.checkPeerName` : Check peer name
+
+
 ## Installing
 
 A script is provided in the installation for installing Solr Cloud in 
quick-start mode in the [full dev environment for 
CentOS](../../metron-deployment/development/centos6).
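
The collection fallback described for `solr.collection` in the README hunk above can be sketched in a few lines. This is an illustration only: `CollectionResolver` and its method are hypothetical names, not part of Metron, and the sketch assumes the sensor's index name has already been read from the indexing config.

```java
import java.util.Map;

// Hypothetical helper, not part of Metron: mirrors the fallback rule
// documented for solr.collection.
final class CollectionResolver {
  // Documented default when solr.collection is not set in the global config.
  static final String FALLBACK = "metron";

  private CollectionResolver() {}

  /**
   * Returns the sensor's index name, or the default collection from the
   * global config when that name is null or empty.
   */
  static String resolve(String sensorIndexName, Map<String, Object> globalConfig) {
    if (sensorIndexName != null && !sensorIndexName.isEmpty()) {
      return sensorIndexName;
    }
    Object configured = globalConfig.get("solr.collection");
    return configured == null ? FALLBACK : configured.toString();
  }
}
```

Under these assumptions, a sensor whose index name is `yaf` writes to the `yaf` collection, while a sensor with an empty index name writes to whatever `solr.collection` names, or to `metron` if that setting is absent.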

http://git-wip-us.apache.org/repos/asf/metron/blob/8cc8aab8/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/MetronSolrClient.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/MetronSolrClient.java
 
b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/MetronSolrClient.java
index d3ef36f..5c27cce 100644
--- 
a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/MetronSolrClient.java
+++ 
b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/MetronSolrClient.java
@@ -17,18 +17,22 @@
  */
 package org.apache.metron.solr.writer;
 
+import com.google.common.collect.Iterables;
 import org.apache.metron.solr.SolrConstants;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 public class MetronSolrClient extends CloudSolrClient {
 
@@ -40,6 +44,36 @@ public class MetronSolrClient extends CloudSolrClient {
     super(zkHost);
   }
 
+  public MetronSolrClient(String zkHost, Map<String, Object> solrHttpConfig) {
+    super(zkHost, HttpClientUtil.createClient(toSolrProps(solrHttpConfig)));
+  }
+
+  public static SolrParams toSolrProps(Map<String, Object> config) {
+    if(config == null || config.isEmpty()) {
+      return null;
+    }
+
+    ModifiableSolrParams ret = new ModifiableSolrParams();
+    for(Map.Entry<String, Object> kv : config.entrySet()) {
+      Object v = kv.getValue();
+      if(v instanceof Boolean) {
+        ret.set(kv.getKey(), (Boolean)v);
+      }
+      else if(v instanceof Integer) {
+        ret.set(kv.getKey(), (Integer)v);
+      }
+      else if(v instanceof Iterable) {
+        Iterable vals = (Iterable)v;
+        String[] strVals = new String[Iterables.size(vals)];
+        int i = 0;
+        for(Object o : (Iterable)v) {
+          strVals[i++] = o.toString();
+        }
+      }
+    }
+    return ret;
+  }
+
   public void createCollection(String name, int numShards, int 
replicationFactor) throws IOException, SolrServerException {
     if (!listCollections().contains(name)) {
       request(getCreateCollectionsRequest(name, numShards, replicationFactor));
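
In the `toSolrProps` hunk above, the `Iterable` branch fills `strVals` but never stores it in `ret`, and `String` values (for example `httpBasicAuthUser`) match none of the branches. The following is a minimal sketch of a conversion that keeps both kinds of value. It is not the committed code: `HttpConfigFlattener` is a hypothetical name, a plain `Map<String, String[]>` stands in for `ModifiableSolrParams` so the example depends only on the JDK, and it returns an empty map rather than `null` for a missing config.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the conversion; a plain map replaces
// ModifiableSolrParams so the sketch has no Solr dependency.
final class HttpConfigFlattener {
  private HttpConfigFlattener() {}

  static Map<String, String[]> flatten(Map<String, Object> config) {
    Map<String, String[]> ret = new LinkedHashMap<>();
    if (config == null) {
      return ret;
    }
    for (Map.Entry<String, Object> kv : config.entrySet()) {
      Object v = kv.getValue();
      if (v == null) {
        continue; // nothing to record for a missing value
      }
      if (v instanceof Iterable) {
        List<String> vals = new ArrayList<>();
        for (Object o : (Iterable<?>) v) {
          vals.add(String.valueOf(o));
        }
        // Store the collected values under the key.
        ret.put(kv.getKey(), vals.toArray(new String[0]));
      } else {
        // Booleans, integers and strings all reduce to one string value.
        ret.put(kv.getKey(), new String[] { v.toString() });
      }
    }
    return ret;
  }
}
```

With the real client, the equivalent step would presumably be a `ret.set(key, strVals)` call on the `ModifiableSolrParams` instance, plus a branch for `String` values.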

http://git-wip-us.apache.org/repos/asf/metron/blob/8cc8aab8/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
 
b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
index 923a8dd..e2659e9 100644
--- 
a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
+++ 
b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
@@ -17,10 +17,14 @@
  */
 package org.apache.metron.solr.writer;
 
+import com.google.common.base.Joiner;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.Constants;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.common.SolrException;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
-import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
@@ -33,101 +37,206 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.List;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.function.Supplier;
 
 public class SolrWriter implements BulkMessageWriter<JSONObject>, Serializable 
{
 
-  public static final String DEFAULT_COLLECTION = "metron";
 
-  private static final Logger LOG = LoggerFactory
-          .getLogger(SolrWriter.class);
+  public enum SolrProperties {
+    ZOOKEEPER_QUORUM("solr.zookeeper"),
+    COMMIT_PER_BATCH("solr.commitPerBatch", Optional.of(true)),
+    COMMIT_WAIT_SEARCHER("solr.commit.waitSearcher", Optional.of(true)),
+    COMMIT_WAIT_FLUSH("solr.commit.waitFlush", Optional.of(true)),
+    COMMIT_SOFT("solr.commit.soft", Optional.of(false)),
+    DEFAULT_COLLECTION("solr.collection", Optional.of("metron")),
+    HTTP_CONFIG("solr.http.config", Optional.of(new HashMap<>()))
+    ;
+    String name;
+    Optional<Object> defaultValue;
+
+    SolrProperties(String name) {
+      this(name, Optional.empty());
+    }
+    SolrProperties(String name, Optional<Object> defaultValue) {
+      this.name = name;
+      this.defaultValue = defaultValue;
+    }
 
-  private boolean shouldCommit = false;
-  private MetronSolrClient solr;
+    public <T> Optional<T> coerceOrDefault(Map<String, Object> globalConfig, 
Class<T> clazz) {
+      Object val = globalConfig.get(name);
+      if(val != null) {
+        T ret = null;
+        try {
+          ret = ConversionUtils.convert(val, clazz);
+        }
+        catch(ClassCastException cce) {
+          ret = null;
+        }
+        if(ret == null) {
+          //unable to convert value
+          LOG.warn("Unable to convert {} to {}, was {}", name, 
clazz.getName(), "" + val);
+          if(defaultValue.isPresent()) {
+            return 
Optional.ofNullable(ConversionUtils.convert(defaultValue.get(), clazz));
+          }
+          else {
+            return Optional.empty();
+          }
+        }
+        else {
+          return Optional.ofNullable(ret);
+        }
+      }
+      else {
+        if(defaultValue.isPresent()) {
+          return 
Optional.ofNullable(ConversionUtils.convert(defaultValue.get(), clazz));
+        }
+        else {
+          return Optional.empty();
+        }
+      }
+    }
+
+    public Supplier<IllegalArgumentException> errorOut(Map<String, Object> 
globalConfig) {
+      String message = "Unable to retrieve " + name + " from global config, 
value associated is " + globalConfig.get(name);
+      return () -> new IllegalArgumentException(message);
+    }
+
+    public <T> T coerceOrDefaultOrExcept(Map<String, Object> globalConfig, 
Class<T> clazz) {
+         return this.coerceOrDefault(globalConfig, 
clazz).orElseThrow(this.errorOut(globalConfig));
+    }
 
-  public SolrWriter withShouldCommit(boolean shouldCommit) {
-    this.shouldCommit = shouldCommit;
-    return this;
   }
 
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private Boolean shouldCommit;
+  private Boolean softCommit;
+  private Boolean waitSearcher;
+  private Boolean waitFlush;
+  private String zookeeperUrl;
+  private String defaultCollection;
+  private Map<String, Object> solrHttpConfig;
+
+  private MetronSolrClient solr;
+
   public SolrWriter withMetronSolrClient(MetronSolrClient solr) {
     this.solr = solr;
     return this;
   }
 
+  public void initializeFromGlobalConfig(Map<String, Object> 
globalConfiguration) {
+    zookeeperUrl = 
SolrProperties.ZOOKEEPER_QUORUM.coerceOrDefaultOrExcept(globalConfiguration, 
String.class);
+    defaultCollection = 
SolrProperties.DEFAULT_COLLECTION.coerceOrDefaultOrExcept(globalConfiguration, 
String.class);
+    solrHttpConfig = 
SolrProperties.HTTP_CONFIG.coerceOrDefaultOrExcept(globalConfiguration, 
Map.class);
+    shouldCommit = 
SolrProperties.COMMIT_PER_BATCH.coerceOrDefaultOrExcept(globalConfiguration, 
Boolean.class);
+    softCommit = 
SolrProperties.COMMIT_SOFT.coerceOrDefaultOrExcept(globalConfiguration, 
Boolean.class);
+    waitSearcher = 
SolrProperties.COMMIT_WAIT_SEARCHER.coerceOrDefaultOrExcept(globalConfiguration,
 Boolean.class);
+    waitFlush = 
SolrProperties.COMMIT_WAIT_FLUSH.coerceOrDefaultOrExcept(globalConfiguration, 
Boolean.class);
+  }
+
   @Override
   public void init(Map stormConf, TopologyContext topologyContext, 
WriterConfiguration configurations) throws IOException, SolrServerException {
     Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
-    if(solr == null) solr = new MetronSolrClient((String) 
globalConfiguration.get("solr.zookeeper"));
-    String collection = getCollection(configurations);
-    solr.createCollection(collection, (Integer) 
globalConfiguration.get("solr.numShards"), (Integer) 
globalConfiguration.get("solr.replicationFactor"));
-    solr.setDefaultCollection(collection);
+    initializeFromGlobalConfig(globalConfiguration);
+    LOG.info("Initializing SOLR writer: {}", zookeeperUrl);
+    LOG.info("Forcing commit per batch: {}", shouldCommit);
+    LOG.info("Soft commit: {}", softCommit);
+    LOG.info("Commit Wait Searcher: {}", waitSearcher);
+    LOG.info("Commit Wait Flush: {}", waitFlush);
+    LOG.info("Default Collection: {}", "" + defaultCollection );
+    if(solr == null) {
+      solr = new MetronSolrClient(zookeeperUrl, solrHttpConfig);
+    }
+    solr.setDefaultCollection(defaultCollection);
+
   }
 
-  @Override
-  public BulkWriterResponse write(String sourceType, WriterConfiguration 
configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws 
Exception {
+  public Collection<SolrInputDocument> toDocs(Iterable<JSONObject> messages) {
+    Collection<SolrInputDocument> ret = new ArrayList<>();
     for(JSONObject message: messages) {
       SolrInputDocument document = new SolrInputDocument();
-      document.addField("sensorType", sourceType);
-      for(Object key: message.keySet()) {
+      for (Object key : message.keySet()) {
         Object value = message.get(key);
-        if(value instanceof Iterable) {
-          for(Object v : (Iterable)value) {
-            document.addField(getFieldName(key, v), v);
+        if (value instanceof Iterable) {
+          for (Object v : (Iterable) value) {
+            document.addField("" + key, v);
           }
-        }
-        else {
-          document.addField(getFieldName(key, value), value);
+        } else {
+          document.addField("" + key, value);
         }
       }
-      if(!document.containsKey("id")) {
-        document.addField("id", getIdValue(message));
+      if (!document.containsKey(Constants.GUID)) {
+        document.addField(Constants.GUID, UUID.randomUUID().toString());
       }
-      UpdateResponse response = solr.add(document);
-    }
-    if (shouldCommit) {
-      solr.commit(getCollection(configurations));
+      ret.add(document);
     }
+    return ret;
+  }
 
-    // Solr commits the entire batch or throws an exception for it.  There's 
no way to get partial failures.
-    BulkWriterResponse response = new BulkWriterResponse();
-    response.addAllSuccesses(tuples);
-    return response;
+  protected String getCollection(String sourceType, WriterConfiguration 
configurations) {
+    String collection = configurations.getIndex(sourceType);
+    if(StringUtils.isEmpty(collection)) {
+      return solr.getDefaultCollection();
+    }
+    return collection;
   }
 
   @Override
-  public String getName() {
-    return "solr";
-  }
+  public BulkWriterResponse write(String sourceType, WriterConfiguration 
configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws 
Exception {
+    String collection = getCollection(sourceType, configurations);
+    BulkWriterResponse bulkResponse = new BulkWriterResponse();
+    Collection<SolrInputDocument> docs = toDocs(messages);
+    try {
+      Optional<SolrException> exceptionOptional = 
fromUpdateResponse(solr.add(collection, docs));
+      // Solr commits the entire batch or throws an exception for it.  There's 
no way to get partial failures.
+      if(exceptionOptional.isPresent()) {
+        bulkResponse.addAllErrors(exceptionOptional.get(), tuples);
+      }
+      else {
+        if (shouldCommit) {
+          exceptionOptional = fromUpdateResponse(solr.commit(collection, 
waitFlush, waitSearcher, softCommit));
+          if(exceptionOptional.isPresent()) {
+            bulkResponse.addAllErrors(exceptionOptional.get(), tuples);
+          }
+        }
+        if(!exceptionOptional.isPresent()) {
+          bulkResponse.addAllSuccesses(tuples);
+        }
+      }
+    }
+    catch(HttpSolrClient.RemoteSolrException sse) {
+      bulkResponse.addAllErrors(sse, tuples);
+    }
 
-  protected String getCollection(WriterConfiguration configurations) {
-    String collection = (String) 
configurations.getGlobalConfig().get("solr.collection");
-    return collection != null ? collection : DEFAULT_COLLECTION;
+    return bulkResponse;
   }
 
-  protected Object getIdValue(JSONObject message) {
-    return message.toJSONString().hashCode();
+  protected Optional<SolrException> fromUpdateResponse(UpdateResponse 
response) {
+    if(response != null && response.getStatus() > 0) {
+      String message = "Solr Update response: " + 
Joiner.on(",").join(response.getResponse());
+      return Optional.of(new 
SolrException(SolrException.ErrorCode.BAD_REQUEST, message));
+    }
+    return Optional.empty();
   }
 
-  protected String getFieldName(Object key, Object value) {
-    String field;
-    if (value instanceof Integer) {
-      field = key + "_i";
-    } else if (value instanceof Long) {
-      field = key + "_l";
-    } else if (value instanceof Float) {
-      field = key + "_f";
-    } else if (value instanceof Double) {
-      field = key + "_d";
-    } else {
-      field = key + "_s";
-    }
-    return field;
+  @Override
+  public String getName() {
+    return "solr";
   }
 
   @Override
   public void close() throws Exception {
-    solr.close();
+    if(solr != null) {
+      solr.close();
+    }
   }
 }
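
The `SolrProperties` enum in the hunk above resolves each setting in three steps: use the configured value if it converts to the requested type, otherwise use the default, otherwise throw. The sketch below shows that flow for boolean settings only. `BooleanSetting` is a hypothetical class, and the string parsing is an assumption standing in for `ConversionUtils.convert`, whose behaviour is not shown in this diff.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified analogue of SolrProperties for Boolean settings.
final class BooleanSetting {
  private final String name;
  private final Optional<Boolean> defaultValue;

  BooleanSetting(String name, Optional<Boolean> defaultValue) {
    this.name = name;
    this.defaultValue = defaultValue;
  }

  /** Configured value if it can be read as a boolean, else the default (possibly empty). */
  Optional<Boolean> coerceOrDefault(Map<String, Object> globalConfig) {
    Object val = globalConfig.get(name);
    if (val instanceof Boolean) {
      return Optional.of((Boolean) val);
    }
    if (val instanceof String) {
      // Assumed conversion rule: accept only "true" or "false", ignoring case.
      String s = ((String) val).trim();
      if (s.equalsIgnoreCase("true")) {
        return Optional.of(true);
      }
      if (s.equalsIgnoreCase("false")) {
        return Optional.of(false);
      }
    }
    // Absent or unconvertible: fall back to the default.
    return defaultValue;
  }

  /** Same as coerceOrDefault, but throws when neither a value nor a default exists. */
  boolean coerceOrDefaultOrExcept(Map<String, Object> globalConfig) {
    return coerceOrDefault(globalConfig).orElseThrow(() ->
        new IllegalArgumentException("Unable to retrieve " + name
            + " from global config, value associated is " + globalConfig.get(name)));
  }
}
```

A setting with no default, such as `solr.zookeeper` in the enum, is what makes the throwing variant useful: a missing required value fails at `init` time instead of surfacing later as a null.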

http://git-wip-us.apache.org/repos/asf/metron/blob/8cc8aab8/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh 
b/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh
index cae0c3c..614423e 100755
--- a/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh
+++ b/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh
@@ -19,4 +19,4 @@
 METRON_VERSION=${project.version}
 METRON_HOME=/usr/metron/$METRON_VERSION
 TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar
-storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote 
$METRON_HOME/flux/indexing/remote.yaml --filter 
$METRON_HOME/config/solr.properties
+storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote 
$METRON_HOME/flux/indexing/random_access/remote.yaml --filter 
$METRON_HOME/config/solr.properties

http://git-wip-us.apache.org/repos/asf/metron/blob/8cc8aab8/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
index 09e88a4..10239f1 100644
--- 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
+++ 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
@@ -39,7 +39,7 @@ import 
org.apache.metron.solr.integration.components.SolrComponent;
 
 public class SolrIndexingIntegrationTest extends IndexingIntegrationTest {
 
-  private String collection = "metron";
+  private String collection = "yaf";
   private FieldNameConverter fieldNameConverter = fieldName -> fieldName;
   @Override
   public FieldNameConverter getFieldNameConverter() {
@@ -49,7 +49,7 @@ public class SolrIndexingIntegrationTest extends 
IndexingIntegrationTest {
   @Override
   public InMemoryComponent getSearchComponent(final Properties 
topologyProperties) throws Exception {
     SolrComponent solrComponent = new SolrComponent.Builder()
-            .addCollection(collection, 
"../metron-solr/src/test/resources/solr/conf")
+            .addCollection(collection, 
"../metron-solr/src/main/config/schema/yaf")
             .withPostStartCallback(new Function<SolrComponent, Void>() {
               @Nullable
               @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/8cc8aab8/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java
 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java
index 075fdda..e655428 100644
--- 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java
+++ 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java
@@ -21,18 +21,22 @@ import com.google.common.collect.Iterables;
 import com.google.common.io.Files;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.writer.BulkWriterResponse;
 import org.apache.metron.solr.integration.components.SolrComponent;
 import org.apache.metron.solr.writer.SolrWriter;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
-
+import org.mockito.Mock;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.*;
 
+import static org.mockito.Mockito.mock;
+
 public class SchemaValidationIntegrationTest {
 
   public static Iterable<String> getData(String sensor) throws IOException {
@@ -45,15 +49,12 @@ public class SchemaValidationIntegrationTest {
   public static Map<String, Object> getGlobalConfig(String sensorType, 
SolrComponent component) {
     Map<String, Object> globalConfig = new HashMap<>();
     globalConfig.put("solr.zookeeper", component.getZookeeperUrl());
-    globalConfig.put("solr.collection", sensorType + "_doc");
-    globalConfig.put("solr.numShards", 1);
-    globalConfig.put("solr.replicationFactor", 1);
     return globalConfig;
   }
 
   public static SolrComponent createSolrComponent(String sensor) throws 
Exception {
     return new SolrComponent.Builder()
-            .addCollection(String.format("%s_doc", sensor), 
String.format("src/main/config/schema/%s", sensor))
+            .addCollection(String.format("%s", sensor), 
String.format("src/main/config/schema/%s", sensor))
             .build();
   }
 
@@ -94,9 +95,12 @@ public class SchemaValidationIntegrationTest {
       Map<String, Object> globalConfig = getGlobalConfig(sensorType, 
component);
 
       List<JSONObject> inputs = new ArrayList<>();
+      List<Tuple> tuples = new ArrayList<>();
       Map<String, Map<String, Object>> index = new HashMap<>();
       for (String message : getData(sensorType)) {
         if (message.trim().length() > 0) {
+          Tuple t = mock(Tuple.class);
+          tuples.add(t);
           Map<String, Object> m = JSONUtils.INSTANCE.load(message.trim(), 
JSONUtils.MAP_SUPPLIER);
           String guid = getGuid(m);
           index.put(guid, m);
@@ -105,18 +109,8 @@ public class SchemaValidationIntegrationTest {
       }
       Assert.assertTrue(inputs.size() > 0);
 
-      SolrWriter solrWriter = new SolrWriter() {
-        @Override
-        protected String getFieldName(Object key, Object value) {
-          return "" + key;
-        }
-
-        @Override
-        protected Object getIdValue(JSONObject message) {
-          return message.get("guid");
-        }
+      SolrWriter solrWriter = new SolrWriter();
 
-      };
       WriterConfiguration writerConfig = new WriterConfiguration() {
         @Override
         public int getBatchSize(String sensorName) {
@@ -165,8 +159,9 @@ public class SchemaValidationIntegrationTest {
 
       solrWriter.init(null, null, writerConfig);
 
-      solrWriter.write(sensorType, writerConfig, new ArrayList<>(), inputs);
-      for (Map<String, Object> m : component.getAllIndexedDocs(sensorType + 
"_doc")) {
+      BulkWriterResponse response = solrWriter.write(sensorType, writerConfig, 
tuples, inputs);
+      Assert.assertTrue(response.getErrors().isEmpty());
+      for (Map<String, Object> m : component.getAllIndexedDocs(sensorType)) {
         Map<String, Object> expected = index.get(getGuid(m));
         for (Map.Entry<String, Object> field : expected.entrySet()) {
           if (field.getValue() instanceof Collection && ((Collection) 
field.getValue()).size() == 0) {

http://git-wip-us.apache.org/repos/asf/metron/blob/8cc8aab8/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
index a56916f..685c5fd 100644
--- 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
+++ 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
@@ -17,6 +17,10 @@
  */
 package org.apache.metron.solr.writer;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.IndexingConfigurations;
 import 
org.apache.metron.common.configuration.writer.IndexingWriterConfiguration;
@@ -25,14 +29,18 @@ import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.SolrInputDocument;
 import org.hamcrest.Description;
 import org.json.simple.JSONObject;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.argThat;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -60,33 +68,37 @@ public class SolrWriterTest {
     }
   }
 
-  static class SolrInputDocumentMatcher extends 
ArgumentMatcher<SolrInputDocument> {
+  static class SolrInputDocumentMatcher extends 
ArgumentMatcher<Collection<SolrInputDocument>> {
 
-    private int expectedId;
-    private String expectedSourceType;
-    private int expectedInt;
-    private double expectedDouble;
 
-    public SolrInputDocumentMatcher(int expectedId, String expectedSourceType, 
int expectedInt, double expectedDouble) {
-      this.expectedId = expectedId;
-      this.expectedSourceType = expectedSourceType;
-      this.expectedInt = expectedInt;
-      this.expectedDouble = expectedDouble;
+    List<Map<String, Object>> expectedDocs;
+
+    public SolrInputDocumentMatcher(List<Map<String, Object>> expectedDocs) {
+      this.expectedDocs = expectedDocs;
     }
 
     @Override
     public boolean matches(Object o) {
-      SolrInputDocument solrInputDocument = (SolrInputDocument) o;
-      int actualId = (Integer) solrInputDocument.get("id").getValue();
-      String actualName = (String) 
solrInputDocument.get("sensorType").getValue();
-      int actualInt = (Integer) solrInputDocument.get("intField_i").getValue();
-      double actualDouble = (Double) 
solrInputDocument.get("doubleField_d").getValue();
-      return expectedId == actualId && expectedSourceType.equals(actualName) 
&& expectedInt == actualInt && expectedDouble == actualDouble;
+      List<SolrInputDocument> docs = (List<SolrInputDocument>)o;
+      int size = docs.size();
+      if(size != expectedDocs.size()) {
+        return false;
+      }
+      for(int i = 0; i < size;++i) {
+        SolrInputDocument doc = docs.get(i);
+        Map<String, Object> expectedDoc = expectedDocs.get(i);
+        for(Map.Entry<String, Object> expectedKv : expectedDoc.entrySet()) {
+          
if(!expectedKv.getValue().equals(doc.get(expectedKv.getKey()).getValue())) {
+            return false;
+          }
+        }
+      }
+      return true;
     }
 
     @Override
     public void describeTo(Description description) {
-      description.appendText(String.format("fields: [id=%d, doubleField_d=%f, 
name=%s, intField_i=%d]", expectedId, expectedDouble, expectedSourceType, 
expectedInt));
+      description.appendText(expectedDocs.toString());
     }
 
   }
@@ -95,9 +107,13 @@ public class SolrWriterTest {
   public void testWriter() throws Exception {
     IndexingConfigurations configurations = 
SampleUtil.getSampleIndexingConfigs();
     JSONObject message1 = new JSONObject();
+    message1.put(Constants.GUID, "guid-1");
+    message1.put(Constants.SENSOR_TYPE, "test");
     message1.put("intField", 100);
     message1.put("doubleField", 100.0);
     JSONObject message2 = new JSONObject();
+    message2.put(Constants.GUID, "guid-2");
+    message2.put(Constants.SENSOR_TYPE, "test");
     message2.put("intField", 200);
     message2.put("doubleField", 200.0);
     List<JSONObject> messages = new ArrayList<>();
@@ -108,33 +124,166 @@ public class SolrWriterTest {
     MetronSolrClient solr = Mockito.mock(MetronSolrClient.class);
     SolrWriter writer = new SolrWriter().withMetronSolrClient(solr);
     writer.init(null, null,new IndexingWriterConfiguration("solr", configurations));
-    verify(solr, times(1)).createCollection(collection, 1, 1);
     verify(solr, times(1)).setDefaultCollection(collection);
 
     collection = "metron2";
-    int numShards = 4;
-    int replicationFactor = 2;
     Map<String, Object> globalConfig = configurations.getGlobalConfig();
     globalConfig.put("solr.collection", collection);
-    globalConfig.put("solr.numShards", numShards);
-    globalConfig.put("solr.replicationFactor", replicationFactor);
     configurations.updateGlobalConfig(globalConfig);
     writer = new SolrWriter().withMetronSolrClient(solr);
     writer.init(null, null, new IndexingWriterConfiguration("solr", configurations));
-    verify(solr, times(1)).createCollection(collection, numShards, replicationFactor);
     verify(solr, times(1)).setDefaultCollection(collection);
 
     writer.write("test", new IndexingWriterConfiguration("solr", configurations), new ArrayList<>(), messages);
-    verify(solr, times(1)).add(argThat(new SolrInputDocumentMatcher(message1.toJSONString().hashCode(), "test", 100, 100.0)));
-    verify(solr, times(1)).add(argThat(new SolrInputDocumentMatcher(message2.toJSONString().hashCode(), "test", 200, 200.0)));
-    verify(solr, times(0)).commit(collection);
+    verify(solr, times(1)).add(eq("yaf"), argThat(new SolrInputDocumentMatcher(ImmutableList.of(message1, message2))));
+    verify(solr, times(1)).commit("yaf"
+                                 , (boolean)SolrWriter.SolrProperties.COMMIT_WAIT_FLUSH.defaultValue.get()
+                                 , (boolean)SolrWriter.SolrProperties.COMMIT_WAIT_SEARCHER.defaultValue.get()
+                                 , (boolean)SolrWriter.SolrProperties.COMMIT_SOFT.defaultValue.get()
+                                 );
 
-    writer = new SolrWriter().withMetronSolrClient(solr).withShouldCommit(true);
-    writer.init(null, null, new IndexingWriterConfiguration("solr", configurations));
-    writer.write("test", new IndexingWriterConfiguration("solr", configurations), new ArrayList<>(), messages);
-    verify(solr, times(2)).add(argThat(new SolrInputDocumentMatcher(message1.toJSONString().hashCode(), "test", 100, 100.0)));
-    verify(solr, times(2)).add(argThat(new SolrInputDocumentMatcher(message2.toJSONString().hashCode(), "test", 200, 200.0)));
-    verify(solr, times(1)).commit(collection);
+  }
+
+  @Test
+  public void configTest_zookeeperQuorumSpecified() throws Exception {
+    String expected = "test";
+    Assert.assertEquals(expected,
+            SolrWriter.SolrProperties.ZOOKEEPER_QUORUM.coerceOrDefaultOrExcept(
+                    ImmutableMap.of( SolrWriter.SolrProperties.ZOOKEEPER_QUORUM.name, expected)
+                    , String.class));
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void configTest_zookeeperQuorumUnpecified() throws Exception {
+    SolrWriter.SolrProperties.ZOOKEEPER_QUORUM.coerceOrDefaultOrExcept(
+                    new HashMap<>()
+                    , String.class);
+  }
 
+
+  @Test
+  public void configTest_commitPerBatchSpecified() throws Exception {
+    Object expected = false;
+    Assert.assertEquals(expected,
+            SolrWriter.SolrProperties.COMMIT_PER_BATCH.coerceOrDefaultOrExcept(
+                    ImmutableMap.of( SolrWriter.SolrProperties.COMMIT_PER_BATCH.name, false)
+                    , Boolean.class));
+  }
+
+  @Test
+  public void configTest_commitPerBatchUnpecified() throws Exception {
+    Assert.assertEquals(SolrWriter.SolrProperties.COMMIT_PER_BATCH.defaultValue.get(),
+    SolrWriter.SolrProperties.COMMIT_PER_BATCH.coerceOrDefaultOrExcept(
+                    new HashMap<>()
+                    , Boolean.class));
+    Assert.assertEquals(SolrWriter.SolrProperties.COMMIT_PER_BATCH.defaultValue.get(),
+    SolrWriter.SolrProperties.COMMIT_PER_BATCH.coerceOrDefaultOrExcept(
+                    ImmutableMap.of( SolrWriter.SolrProperties.COMMIT_PER_BATCH.name, new DummyClass())
+                    , Boolean.class));
+  }
+
+  @Test
+  public void configTest_commitSoftSpecified() throws Exception {
+    Object expected = true;
+    Assert.assertEquals(expected,
+            SolrWriter.SolrProperties.COMMIT_SOFT.coerceOrDefaultOrExcept(
+                    ImmutableMap.of( SolrWriter.SolrProperties.COMMIT_SOFT.name, expected)
+                    , Boolean.class));
+  }
+
+  @Test
+  public void configTest_commitSoftUnpecified() throws Exception {
+    Assert.assertEquals(SolrWriter.SolrProperties.COMMIT_SOFT.defaultValue.get(),
+    SolrWriter.SolrProperties.COMMIT_SOFT.coerceOrDefaultOrExcept(
+                    new HashMap<>()
+                    , Boolean.class));
+    Assert.assertEquals(SolrWriter.SolrProperties.COMMIT_SOFT.defaultValue.get(),
+    SolrWriter.SolrProperties.COMMIT_SOFT.coerceOrDefaultOrExcept(
+                    ImmutableMap.of( SolrWriter.SolrProperties.COMMIT_SOFT.name, new DummyClass())
+                    , Boolean.class));
+  }
+
+  @Test
+  public void configTest_commitWaitFlushSpecified() throws Exception {
+    Object expected = false;
+    Assert.assertEquals(expected,
+            SolrWriter.SolrProperties.COMMIT_WAIT_FLUSH.coerceOrDefaultOrExcept(
+                    ImmutableMap.of( SolrWriter.SolrProperties.COMMIT_WAIT_FLUSH.name, expected)
+                    , Boolean.class));
   }
+
+  @Test
+  public void configTest_commitWaitFlushUnspecified() throws Exception {
+    Assert.assertEquals(SolrWriter.SolrProperties.COMMIT_WAIT_FLUSH.defaultValue.get(),
+    SolrWriter.SolrProperties.COMMIT_WAIT_FLUSH.coerceOrDefaultOrExcept(
+                    new HashMap<>()
+                    , Boolean.class));
+    Assert.assertEquals(SolrWriter.SolrProperties.COMMIT_WAIT_FLUSH.defaultValue.get(),
+    SolrWriter.SolrProperties.COMMIT_WAIT_FLUSH.coerceOrDefaultOrExcept(
+                    ImmutableMap.of( SolrWriter.SolrProperties.COMMIT_WAIT_FLUSH.name, new DummyClass())
+                    , Boolean.class));
+  }
+
+  @Test
+  public void configTest_commitWaitSearcherSpecified() throws Exception {
+    Object expected = false;
+    Assert.assertEquals(expected,
+            SolrWriter.SolrProperties.COMMIT_WAIT_SEARCHER.coerceOrDefaultOrExcept(
+                    ImmutableMap.of( SolrWriter.SolrProperties.COMMIT_WAIT_SEARCHER.name, expected)
+                    , Boolean.class));
+  }
+
+  @Test
+  public void configTest_commitWaitSearcherUnspecified() throws Exception {
+    Assert.assertEquals(SolrWriter.SolrProperties.COMMIT_WAIT_SEARCHER.defaultValue.get(),
+    SolrWriter.SolrProperties.COMMIT_WAIT_SEARCHER.coerceOrDefaultOrExcept(
+                    new HashMap<>()
+                    , Boolean.class));
+    Assert.assertEquals(SolrWriter.SolrProperties.COMMIT_WAIT_SEARCHER.defaultValue.get(),
+    SolrWriter.SolrProperties.COMMIT_WAIT_SEARCHER.coerceOrDefaultOrExcept(
+                    ImmutableMap.of( SolrWriter.SolrProperties.COMMIT_WAIT_SEARCHER.name, new DummyClass())
+                    , Boolean.class));
+  }
+
+  @Test
+  public void configTest_defaultCollectionSpecified() throws Exception {
+    Object expected = "mycollection";
+    Assert.assertEquals(expected,
+            SolrWriter.SolrProperties.DEFAULT_COLLECTION.coerceOrDefaultOrExcept(
+                    ImmutableMap.of( SolrWriter.SolrProperties.DEFAULT_COLLECTION.name, expected)
+                    , String.class));
+  }
+
+  @Test
+  public void configTest_defaultCollectionUnspecified() throws Exception {
+    Assert.assertEquals(SolrWriter.SolrProperties.DEFAULT_COLLECTION.defaultValue.get(),
+    SolrWriter.SolrProperties.DEFAULT_COLLECTION.coerceOrDefaultOrExcept(
+                    new HashMap<>()
+                    , String.class));
+  }
+
+  @Test
+  public void configTest_httpConfigSpecified() throws Exception {
+    Object expected = new HashMap<String, Object>() {{
+      put("name", "metron");
+    }};
+    Assert.assertEquals(expected,
+            SolrWriter.SolrProperties.HTTP_CONFIG.coerceOrDefaultOrExcept(
+                    ImmutableMap.of( SolrWriter.SolrProperties.HTTP_CONFIG.name, expected)
+                    , Map.class));
+  }
+
+  @Test
+  public void configTest_httpConfigUnspecified() throws Exception {
+    Assert.assertEquals(SolrWriter.SolrProperties.HTTP_CONFIG.defaultValue.get(),
+    SolrWriter.SolrProperties.HTTP_CONFIG.coerceOrDefaultOrExcept(
+                    new HashMap<>()
+                    , Map.class));
+    Assert.assertEquals(SolrWriter.SolrProperties.HTTP_CONFIG.defaultValue.get(),
+    SolrWriter.SolrProperties.HTTP_CONFIG.coerceOrDefaultOrExcept(
+                    ImmutableMap.of( SolrWriter.SolrProperties.HTTP_CONFIG.name, new DummyClass())
+                    , Map.class));
+  }
+
+  public static class DummyClass {}
 }
