[GitHub] metron pull request #940: METRON-1460: Create a complementary non-split-join...

2018-03-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/metron/pull/940


---


[GitHub] metron pull request #940: METRON-1460: Create a complementary non-split-join...

2018-03-06 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/940#discussion_r172711543
  
--- Diff: 
metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java
 ---
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
+import org.apache.metron.common.performance.PerformanceLogger;
+import org.apache.metron.common.utils.MessageUtils;
+import org.apache.metron.enrichment.bolt.CacheKey;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.enrichment.utils.EnrichmentUtils;
+import org.json.simple.JSONObject;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BinaryOperator;
+import java.util.function.Supplier;
+
+/**
+ * This is an independent component which will accept a message and a set of enrichment adapters as well as a config which defines
+ * how those enrichments should be performed and fully enrich the message.  The result will be the enriched message
+ * unified together and a list of errors which happened.
+ */
+public class ParallelEnricher {
+
+  private Map<String, EnrichmentAdapter<CacheKey>> enrichmentsByType = new HashMap<>();
+  private EnumMap<EnrichmentStrategies, CacheStats> cacheStats = new EnumMap<>(EnrichmentStrategies.class);
+
+  /**
+   * The result of an enrichment.
+   */
+  public static class EnrichmentResult {
+    private JSONObject result;
+    private List<Map.Entry<Object, Throwable>> enrichmentErrors;
+
+    public EnrichmentResult(JSONObject result, List<Map.Entry<Object, Throwable>> enrichmentErrors) {
+      this.result = result;
+      this.enrichmentErrors = enrichmentErrors;
+    }
+
+    /**
+     * The unified fully enriched result.
+     * @return
+     */
+    public JSONObject getResult() {
+      return result;
+    }
+
+    /**
+     * The errors that happened in the course of enriching.
+     * @return
+     */
+    public List<Map.Entry<Object, Throwable>> getEnrichmentErrors() {
+      return enrichmentErrors;
+    }
+  }
+
+  private ConcurrencyContext concurrencyContext;
+
+  /**
+   * Construct a parallel enricher with a set of enrichment adapters associated with their enrichment types.
+   * @param enrichmentsByType
+   */
+  public ParallelEnricher( Map<String, EnrichmentAdapter<CacheKey>> enrichmentsByType
+                         , ConcurrencyContext concurrencyContext
+                         , boolean logStats
+                         )
+  {
+    this.enrichmentsByType = enrichmentsByType;
+    this.concurrencyContext = concurrencyContext;
+    if(logStats) {
+      for(EnrichmentStrategies s : EnrichmentStrategies.values()) {
+        cacheStats.put(s, null);
+      }
+    }
+  }
+
+  /**
+   * Fully enriches a message.  Each enrichment is done in parallel via a threadpool.
+   * Each enrichment is fronted with a LRU cache.
+   *
+   * @param message the message to enrich
+   * @param strategy The enrichment strategy to use (e.g. enrichment or threat intel)
+   * @param config The sensor enrichment config
+   * @param perfLog The performance logger.  We log the performance for this 
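
The quoted Javadoc describes enriching a message in parallel on a thread pool and returning the unified message together with a list of errors. A minimal sketch of that fan-out/join pattern, using only the JDK, might look like the following. The class `ParallelEnrichSketch`, its `Result` holder, and the use of plain `Function` objects in place of enrichment adapters are all hypothetical stand-ins; this is not the PR's `ParallelEnricher`, and it omits the cache the Javadoc mentions.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class ParallelEnrichSketch {

  /** Hypothetical result holder: the merged message plus any per-enrichment errors. */
  public static class Result {
    public final Map<String, Object> enriched;
    public final List<Map.Entry<String, Throwable>> errors;

    Result(Map<String, Object> enriched, List<Map.Entry<String, Throwable>> errors) {
      this.enriched = enriched;
      this.errors = errors;
    }
  }

  public static Result enrich(
      Map<String, Object> message,
      Map<String, Function<Map<String, Object>, Map<String, Object>>> enrichments,
      ExecutorService pool) {
    // Fan out: one task per enrichment, each reading the same read-only snapshot.
    Map<String, Object> snapshot = Collections.unmodifiableMap(new HashMap<>(message));
    Map<String, CompletableFuture<Map<String, Object>>> futures = new HashMap<>();
    enrichments.forEach((name, fn) ->
        futures.put(name, CompletableFuture.supplyAsync(() -> fn.apply(snapshot), pool)));

    // Join: merge successful results into one message and record failures by name.
    Map<String, Object> enriched = new HashMap<>(message);
    List<Map.Entry<String, Throwable>> errors = new ArrayList<>();
    futures.forEach((name, future) -> {
      try {
        enriched.putAll(future.join());
      } catch (CompletionException e) {
        // join() wraps the task's exception; keep the original cause.
        errors.add(new AbstractMap.SimpleEntry<>(name, e.getCause()));
      }
    });
    return new Result(enriched, errors);
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    Map<String, Function<Map<String, Object>, Map<String, Object>>> enrichments = new HashMap<>();
    enrichments.put("geo", m -> Collections.<String, Object>singletonMap("geo.country", "US"));
    enrichments.put("broken", m -> { throw new IllegalStateException("lookup failed"); });

    Map<String, Object> message = new HashMap<>();
    message.put("ip_src_addr", "10.0.0.1");

    Result r = enrich(message, enrichments, pool);
    pool.shutdown();
    System.out.println(r.enriched + " errors=" + r.errors.size());
  }
}
```

In this sketch a failing enrichment does not abort the others: its error is collected and the remaining fields are still merged into the result.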

[GitHub] metron pull request #940: METRON-1460: Create a complementary non-split-join...

2018-03-06 Thread nickwallen
Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/940#discussion_r172694248
  
--- Diff: 
metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java
 ---
@@ -0,0 +1,281 @@

[GitHub] metron pull request #940: METRON-1460: Create a complementary non-split-join...

2018-03-06 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/940#discussion_r172656809
  
--- Diff: 
metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java
 ---
@@ -0,0 +1,281 @@

[GitHub] metron pull request #940: METRON-1460: Create a complementary non-split-join...

2018-03-06 Thread nickwallen
Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/940#discussion_r172595029
  
--- Diff: 
metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java
 ---
@@ -0,0 +1,281 @@

[GitHub] metron pull request #940: METRON-1460: Create a complementary non-split-join...

2018-03-05 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/940#discussion_r172383791
  
--- Diff: 
metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/Strategy.java
 ---
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+
+import java.util.Map;
+
+/**
+ * Enrichment strategy.  This interface provides a mechanism to interface with the enrichment config and any
+ * post processing steps that are needed to be done after-the-fact.
+ *
+ * The reasoning behind this is that the key difference between enrichments and threat intel is that they pull
+ * their configurations from different parts of the SensorEnrichmentConfig object and as a post-join step, they differ
+ * slightly.
+ *
+ */
+public interface Strategy {
+  Constants.ErrorType getErrorType();
--- End diff --

done


---


[GitHub] metron pull request #940: METRON-1460: Create a complementary non-split-join...

2018-03-05 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/940#discussion_r172383810
  
--- Diff: 
metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java
 ---
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.performance.PerformanceLogger;
+import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.common.utils.MessageUtils;
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
+import org.apache.metron.enrichment.configuration.Enrichment;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.enrichment.parallel.EnrichmentContext;
+import org.apache.metron.enrichment.parallel.EnrichmentStrategies;
+import org.apache.metron.enrichment.parallel.ParallelEnricher;
+import org.apache.metron.enrichment.parallel.WorkerPoolStrategy;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.StellarFunction;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This bolt is a unified enrichment/threat intel bolt.  In contrast to the split/enrich/join
+ * bolts above, this handles the entire enrichment lifecycle in one bolt using a threadpool to
+ * enrich in parallel.
+ *
+ * From an architectural perspective, this is a divergence from the polymorphism based strategy we have
+ * used in the split/join bolts.  Rather, this bolt is provided a strategy to use, either enrichment or threat intel,
+ * through composition.  This allows us to move most of the implementation into components independent
+ * from Storm.  This will greatly facilitate reuse.
+ */
+public class UnifiedEnrichmentBolt extends ConfiguredEnrichmentBolt {
+
+  public static class Perf {} // used for performance logging
+  private PerformanceLogger perfLog; // not static bc multiple bolts may exist in same worker
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public static final String STELLAR_CONTEXT_CONF = "stellarContext";
+
+  /**
+   * The number of threads in the threadpool.  One threadpool is created per process.
+   * This is a topology-level configuration
+   */
+  public static final String THREADPOOL_NUM_THREADS_TOPOLOGY_CONF = "metron.threadpool.size";
+  /**
+   * The type of threadpool to create. This is a topology-level configuration.
+   */
+  public static final String THREADPOOL_TYPE_TOPOLOGY_CONF = "metron.threadpool.type";
+
+  /**
+   * The enricher implementation to use.  This will do the parallel enrichment via a thread pool.
+   */
+  protected ParallelEnricher enricher;
+
+  /**
+   * The strategy to use for this enrichment bolt.  Practically speaking this is either
+   * enrichment or threat 
[GitHub] metron pull request #940: METRON-1460: Create a complementary non-split-join...

2018-03-05 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/940#discussion_r172383754
  
--- Diff: 
metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategies.java
 ---
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
+import org.apache.metron.enrichment.bolt.CacheKey;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+public enum EnrichmentStrategies implements Strategy {
--- End diff --

I decided that this is too onerous an abstraction and rethought it a bit.
Give it another look, please.


---


[GitHub] metron pull request #940: METRON-1460: Create a complementary non-split-join...

2018-03-05 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/940#discussion_r172377136
  
--- Diff: 
metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategies.java
 ---
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
+import org.apache.metron.enrichment.bolt.CacheKey;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+public enum EnrichmentStrategies implements Strategy {
--- End diff --

I might be answering a question that you're not asking, but this bit of 
awkwardness arises because we have merged the concepts of threat intel and 
enrichment, which really differ only in post-processing.  In contrast to the 
inheritance-based approach in the bolts, the approach presented here builds 
the abstraction through composition: all the interactions with the sensor 
enrichment config are localized in a strategy rather than bound to Storm, our 
distributed processing engine.  That is the rationale behind this approach, 
at least.
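
To make the composition argument concrete, here is a minimal sketch under stated assumptions. `SensorConfig`, `Strategy`, `Strategies`, and `Enricher` are hypothetical, simplified stand-ins for the PR's `SensorEnrichmentConfig`, `Strategy`, `EnrichmentStrategies`, and `ParallelEnricher`. The `is_alert` flag and the `threatintels.` prefix are illustrative choices for the post-processing step, not taken from the quoted diff.

```java
import java.util.HashMap;
import java.util.Map;

public class StrategyCompositionSketch {

  /** Hypothetical stand-in for the sensor enrichment config: one section per concern. */
  public static class SensorConfig {
    public final Map<String, Object> enrichment = new HashMap<>();
    public final Map<String, Object> threatIntel = new HashMap<>();
  }

  /** Everything that differs between the two flavors lives behind this interface. */
  public interface Strategy {
    Map<String, Object> section(SensorConfig config);
    Map<String, Object> postProcess(Map<String, Object> message);
  }

  public enum Strategies implements Strategy {
    ENRICHMENT {
      @Override public Map<String, Object> section(SensorConfig config) {
        return config.enrichment;
      }
      @Override public Map<String, Object> postProcess(Map<String, Object> message) {
        return message; // no extra step in this sketch
      }
    },
    THREAT_INTEL {
      @Override public Map<String, Object> section(SensorConfig config) {
        return config.threatIntel;
      }
      @Override public Map<String, Object> postProcess(Map<String, Object> message) {
        // Illustrative post-processing: flag the message if any threat intel field is present.
        boolean hit = message.keySet().stream().anyMatch(k -> k.startsWith("threatintels."));
        if (hit) {
          message.put("is_alert", "true");
        }
        return message;
      }
    }
  }

  /** Engine-independent component: it is handed a strategy instead of being subclassed. */
  public static class Enricher {
    private final Strategy strategy;

    public Enricher(Strategy strategy) {
      this.strategy = strategy;
    }

    public Map<String, Object> enrich(Map<String, Object> message, SensorConfig config) {
      Map<String, Object> out = new HashMap<>(message);
      out.putAll(strategy.section(config)); // stand-in for running the real adapters
      return strategy.postProcess(out);
    }
  }

  public static void main(String[] args) {
    SensorConfig config = new SensorConfig();
    config.enrichment.put("enrichments.geo.country", "US");
    config.threatIntel.put("threatintels.ip.blocklist", "hit");

    Map<String, Object> message = new HashMap<>();
    message.put("ip_src_addr", "10.0.0.1");

    System.out.println(new Enricher(Strategies.ENRICHMENT).enrich(message, config));
    System.out.println(new Enricher(Strategies.THREAT_INTEL).enrich(message, config));
  }
}
```

Nothing in `Enricher` refers to Storm, so in this sketch the same class could be driven from a bolt, a unit test, or another engine simply by passing in a different `Strategy`.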


---


[GitHub] metron pull request #940: METRON-1460: Create a complementary non-split-join...

2018-03-05 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/940#discussion_r172373203
  
--- Diff: 
metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java
 ---
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.common.configuration.ConfigurationType;
+import 
org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.performance.PerformanceLogger;
+import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.common.utils.MessageUtils;
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
+import org.apache.metron.enrichment.configuration.Enrichment;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.enrichment.parallel.EnrichmentContext;
+import org.apache.metron.enrichment.parallel.EnrichmentStrategies;
+import org.apache.metron.enrichment.parallel.ParallelEnricher;
+import org.apache.metron.enrichment.parallel.WorkerPoolStrategy;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.StellarFunction;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This bolt is a unified enrichment/threat intel bolt.  In contrast to 
the split/enrich/join
+ * bolts above, this handles the entire enrichment lifecycle in one bolt 
using a threadpool to
+ * enrich in parallel.
+ *
+ * From an architectural perspective, this is a divergence from the 
polymorphism based strategy we have
+ * used in the split/join bolts.  Rather, this bolt is provided a strategy 
to use, either enrichment or threat intel,
+ * through composition.  This allows us to move most of the implementation 
into components independent
+ * from Storm.  This will greater facilitate reuse.
+ */
+public class UnifiedEnrichmentBolt extends ConfiguredEnrichmentBolt {
+
+  public static class Perf {} // used for performance logging
+  private PerformanceLogger perfLog; // not static bc multiple bolts may exist in same worker
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public static final String STELLAR_CONTEXT_CONF = "stellarContext";
+
+  /**
+   * The number of threads in the threadpool.  One threadpool is created per process.
+   * This is a topology-level configuration
+   */
+  public static final String THREADPOOL_NUM_THREADS_TOPOLOGY_CONF = "metron.threadpool.size";
+  /**
+   * The type of threadpool to create. This is a topology-level configuration.
+   */
+  public static final String THREADPOOL_TYPE_TOPOLOGY_CONF = "metron.threadpool.type";
+
+  /**
+   * The enricher implementation to use.  This will do the parallel enrichment via a thread pool.
+   */
+  protected ParallelEnricher enricher;
+
+  /**
+   * The strategy to use for this enrichment bolt.  Practically speaking this is either
+   * enrichment or threat 

[GitHub] metron pull request #940: METRON-1460: Create a complementary non-split-join...

2018-03-05 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/940#discussion_r172369461
  
--- Diff: metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategies.java ---
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
+import org.apache.metron.enrichment.bolt.CacheKey;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+public enum EnrichmentStrategies implements Strategy {
--- End diff --

This is a strategy pattern using an enum.  The purpose of this class is to 
resolve the specific strategies possible.  It's broadly in line with other 
strategy patterns (e.g. Extractors). 
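
For readers unfamiliar with the idiom, here is a minimal, self-contained sketch of an enum-based strategy pattern. It is not the code from this PR: the `Strategy` methods shown (`errorType`, `postProcess`), the error-type strings, and the `flagged` field are hypothetical stand-ins chosen for illustration. The point is only that each enum constant is one concrete strategy, and a consumer receives whichever one it needs through composition.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EnumStrategySketch {

  /** Hypothetical stand-in for a strategy interface; not the PR's actual Strategy. */
  interface Strategy {
    String errorType();
    Map<String, Object> postProcess(Map<String, Object> message);
  }

  /** Each constant is one concrete strategy; the enum resolves them by name. */
  enum Strategies implements Strategy {
    ENRICHMENT("enrichment_error") {
      @Override
      public Map<String, Object> postProcess(Map<String, Object> message) {
        return message; // no extra post-join work in this sketch
      }
    },
    THREAT_INTEL("threat_intel_error") {
      @Override
      public Map<String, Object> postProcess(Map<String, Object> message) {
        // Copy the message and add an illustrative (hypothetical) field.
        Map<String, Object> out = new LinkedHashMap<>(message);
        out.put("flagged", Boolean.TRUE);
        return out;
      }
    };

    private final String errorType;

    Strategies(String errorType) {
      this.errorType = errorType;
    }

    @Override
    public String errorType() {
      return errorType;
    }
  }

  /** The consumer depends only on the interface and is handed a strategy. */
  static Map<String, Object> enrich(Strategy strategy, Map<String, Object> message) {
    return strategy.postProcess(message);
  }

  public static void main(String[] args) {
    // Resolve a strategy from a configuration-style string.
    Strategy strategy = Strategies.valueOf("THREAT_INTEL");
    Map<String, Object> message = new LinkedHashMap<>();
    message.put("ip_src_addr", "10.0.0.1");
    System.out.println(strategy.errorType() + " " + enrich(strategy, message));
  }
}
```

Under these assumptions, the enum acts as the single place where the available strategies are enumerated and looked up, while the consumer stays unaware of which one it was given.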


---


[GitHub] metron pull request #940: METRON-1460: Create a complementary non-split-join...

2018-03-05 Thread cestella
Github user cestella commented on a diff in the pull request:

https://github.com/apache/metron/pull/940#discussion_r172369480
  
--- Diff: metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/Strategy.java ---
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+
+import java.util.Map;
+
+/**
+ * Enrichment strategy.  This interface provides a mechanism to interface with the enrichment config and any
+ * post processing steps that are needed to be done after-the-fact.
+ *
+ * The reasoning behind this is that the key difference between enrichments and threat intel is that they pull
+ * their configurations from different parts of the SensorEnrichmentConfig object and as a post-join step, they differ
+ * slightly.
+ *
+ */
+public interface Strategy {
+  Constants.ErrorType getErrorType();
--- End diff --

Sure thing.


---


[GitHub] metron pull request #940: METRON-1460: Create a complementary non-split-join...

2018-03-05 Thread nickwallen
Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/940#discussion_r172359339
  
--- Diff: metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/Strategy.java ---
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+
+import java.util.Map;
+
+/**
+ * Enrichment strategy.  This interface provides a mechanism to interface with the enrichment config and any
+ * post processing steps that are needed to be done after-the-fact.
+ *
+ * The reasoning behind this is that the key difference between enrichments and threat intel is that they pull
+ * their configurations from different parts of the SensorEnrichmentConfig object and as a post-join step, they differ
+ * slightly.
+ *
+ */
+public interface Strategy {
+  Constants.ErrorType getErrorType();
--- End diff --

Can we javadoc each method?  This seems like an important interface.


---


[GitHub] metron pull request #940: METRON-1460: Create a complementary non-split-join...

2018-03-05 Thread nickwallen
Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/940#discussion_r172353404
  
--- Diff: metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java ---
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.performance.PerformanceLogger;
+import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.common.utils.MessageUtils;
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
+import org.apache.metron.enrichment.configuration.Enrichment;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.enrichment.parallel.EnrichmentContext;
+import org.apache.metron.enrichment.parallel.EnrichmentStrategies;
+import org.apache.metron.enrichment.parallel.ParallelEnricher;
+import org.apache.metron.enrichment.parallel.WorkerPoolStrategy;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.StellarFunction;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This bolt is a unified enrichment/threat intel bolt.  In contrast to the split/enrich/join
+ * bolts above, this handles the entire enrichment lifecycle in one bolt using a threadpool to
+ * enrich in parallel.
+ *
+ * From an architectural perspective, this is a divergence from the polymorphism based strategy we have
+ * used in the split/join bolts.  Rather, this bolt is provided a strategy to use, either enrichment or threat intel,
+ * through composition.  This allows us to move most of the implementation into components independent
+ * from Storm.  This will greater facilitate reuse.
+ */
+public class UnifiedEnrichmentBolt extends ConfiguredEnrichmentBolt {
+
+  public static class Perf {} // used for performance logging
+  private PerformanceLogger perfLog; // not static bc multiple bolts may exist in same worker
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public static final String STELLAR_CONTEXT_CONF = "stellarContext";
+
+  /**
+   * The number of threads in the threadpool.  One threadpool is created per process.
+   * This is a topology-level configuration
+   */
+  public static final String THREADPOOL_NUM_THREADS_TOPOLOGY_CONF = "metron.threadpool.size";
+  /**
+   * The type of threadpool to create. This is a topology-level configuration.
+   */
+  public static final String THREADPOOL_TYPE_TOPOLOGY_CONF = "metron.threadpool.type";
+
+  /**
+   * The enricher implementation to use.  This will do the parallel enrichment via a thread pool.
+   */
+  protected ParallelEnricher enricher;
+
+  /**
+   * The strategy to use for this enrichment bolt.  Practically speaking this is either
+   * enrichment or threat 

[GitHub] metron pull request #940: METRON-1460: Create a complementary non-split-join...

2018-03-05 Thread nickwallen
Github user nickwallen commented on a diff in the pull request:

https://github.com/apache/metron/pull/940#discussion_r172363362
  
--- Diff: metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategies.java ---
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.parallel;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
+import org.apache.metron.enrichment.bolt.CacheKey;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+public enum EnrichmentStrategies implements Strategy {
--- End diff --

I don't understand the purpose of this class.  Why have an 
`EnrichmentStrategy`, a `ThreatIntelStrategy`, and `EnrichmentStrategies`?


---