http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/WorkerPoolStrategies.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/WorkerPoolStrategies.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/WorkerPoolStrategies.java new file mode 100644 index 0000000..5f82b1c --- /dev/null +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/WorkerPoolStrategies.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.parallel; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Function; + +/** + * The strategy to use to construct the thread pool. + */ +public enum WorkerPoolStrategies { + /** + * Fixed thread pool + */ + FIXED(numThreads -> Executors.newFixedThreadPool(numThreads)), + /** + * Work stealing thread pool. + */ + WORK_STEALING(numThreads -> Executors.newWorkStealingPool(numThreads)) + ; + Function<Integer, ExecutorService> creator; + WorkerPoolStrategies(Function<Integer, ExecutorService> creator) { + this.creator = creator; + } + + public ExecutorService create(int numThreads) { + return creator.apply(numThreads); + } +}
http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java index ab3d462..63d39c5 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java @@ -21,6 +21,7 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.metron.common.configuration.enrichment.EnrichmentConfig; import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; @@ -28,6 +29,7 @@ import org.apache.metron.enrichment.lookup.EnrichmentLookup; import org.apache.metron.enrichment.lookup.handler.KeyWithContext; import org.apache.metron.hbase.TableProvider; import org.apache.metron.enrichment.converter.EnrichmentKey; +import org.json.simple.JSONObject; import sun.management.Sensor; import javax.annotation.Nullable; @@ -118,4 +120,18 @@ public class EnrichmentUtils { } } + public static JSONObject adjustKeys(JSONObject enrichedMessage, JSONObject enrichedField, String field, String prefix) { + if ( !enrichedField.isEmpty()) { + for (Object enrichedKey : enrichedField.keySet()) { + if(!StringUtils.isEmpty(prefix)) { + enrichedMessage.put(field + "." + enrichedKey, enrichedField.get(enrichedKey)); + } + else { + enrichedMessage.put(enrichedKey, enrichedField.get(enrichedKey)); + } + } + } + return enrichedMessage; + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java index 7898ccd..870d709 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java @@ -18,15 +18,142 @@ package org.apache.metron.enrichment.utils; import com.google.common.base.Joiner; +import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.common.configuration.enrichment.threatintel.RuleScore; +import org.apache.metron.common.configuration.enrichment.threatintel.ThreatScore; +import org.apache.metron.common.configuration.enrichment.threatintel.ThreatTriageConfig; +import org.apache.metron.common.utils.MessageUtils; +import org.apache.metron.stellar.common.utils.ConversionUtils; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver; +import org.apache.metron.threatintel.triage.ThreatTriageProcessor; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; public class ThreatIntelUtils { + public static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String KEY_PREFIX = "threatintels"; + /** + * The message key under which the overall threat triage score is stored. + */ + public static final String THREAT_TRIAGE_SCORE_KEY = "threat.triage.score"; + + /** + * The prefix of the message keys that record the threat triage rules that fired. + */ + public static final String THREAT_TRIAGE_RULES_KEY = "threat.triage.rules"; + + /** + * The portion of the message key used to record the 'name' field of a rule. + */ + public static final String THREAT_TRIAGE_RULE_NAME = "name"; + + /** + * The portion of the message key used to record the 'comment' field of a rule. + */ + public static final String THREAT_TRIAGE_RULE_COMMENT = "comment"; + + /** + * The portion of the message key used to record the 'score' field of a rule. + */ + public static final String THREAT_TRIAGE_RULE_SCORE = "score"; + + /** + * The portion of the message key used to record the 'reason' field of a rule. + */ + public static final String THREAT_TRIAGE_RULE_REASON = "reason"; + public static String getThreatIntelKey(String threatIntelName, String field) { return Joiner.on(".").join(new String[]{KEY_PREFIX, threatIntelName, field}); } +public static JSONObject triage(JSONObject ret, SensorEnrichmentConfig config, FunctionResolver functionResolver, Context stellarContext) { + LOG.trace("Received joined messages: {}", ret); + boolean isAlert = ret.containsKey("is_alert"); + if(!isAlert) { + for (Object key : ret.keySet()) { + if (key.toString().startsWith("threatintels") && !key.toString().endsWith(".ts")) { + isAlert = true; + break; + } + } + } + else { + Object isAlertObj = ret.get("is_alert"); + isAlert = ConversionUtils.convert(isAlertObj, Boolean.class); + if(!isAlert) { + ret.remove("is_alert"); + } + } + if(isAlert) { + ret.put("is_alert" , "true"); + String sourceType = MessageUtils.getSensorType(ret); + ThreatTriageConfig triageConfig = null; + if(config != null) { + triageConfig = config.getThreatIntel().getTriageConfig(); + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Found sensor enrichment config.", sourceType); + } + } + else { + LOG.debug("{}: Unable to find threat config.", sourceType ); + } + if(triageConfig != null) { + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Found threat triage config: {}", sourceType, triageConfig); + } + + if(LOG.isDebugEnabled() && (triageConfig.getRiskLevelRules() == null || triageConfig.getRiskLevelRules().isEmpty())) { + LOG.debug("{}: Empty rules!", sourceType); + } + + // triage the threat + ThreatTriageProcessor threatTriageProcessor = new ThreatTriageProcessor(config, functionResolver, stellarContext); + ThreatScore score = threatTriageProcessor.apply(ret); + + if(LOG.isDebugEnabled()) { + String rules = Joiner.on('\n').join(triageConfig.getRiskLevelRules()); + LOG.debug("Marked {} as triage level {} with rules {}", sourceType, score.getScore(), + rules); + } + + // attach the triage threat score to the message + if(score.getRuleScores().size() > 0) { + appendThreatScore(score, ret); + } + } + else { + LOG.debug("{}: Unable to find threat triage config!", sourceType); + } + } + + return ret; + } + + /** + * Appends the threat score to the telemetry message. + * @param threatScore The threat triage score + * @param message The telemetry message being triaged. + */ + private static void appendThreatScore(ThreatScore threatScore, JSONObject message) { + // append the overall threat score + message.put(THREAT_TRIAGE_SCORE_KEY, threatScore.getScore()); + + // append each of the rules - each rule is 'flat' + Joiner joiner = Joiner.on("."); + int i = 0; + for(RuleScore score: threatScore.getRuleScores()) { + message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_NAME), score.getRule().getName()); + message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_COMMENT), score.getRule().getComment()); + message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_SCORE), score.getRule().getScore()); + message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i++, THREAT_TRIAGE_RULE_REASON), score.getReason()); + } + } } http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java index 828f4e3..267ca62 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java @@ -17,13 +17,6 @@ */ package org.apache.metron.enrichment.integration; -import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULES_KEY; -import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULE_COMMENT; -import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULE_NAME; -import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULE_REASON; -import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULE_SCORE; -import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_SCORE_KEY; - import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Predicate; @@ -54,6 +47,7 @@ import org.apache.metron.enrichment.integration.components.ConfigUploadComponent import org.apache.metron.enrichment.lookup.LookupKV; import org.apache.metron.enrichment.lookup.accesstracker.PersistentBloomTrackerCreator; import org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions; +import org.apache.metron.enrichment.utils.ThreatIntelUtils; import org.apache.metron.hbase.mock.MockHBaseTableProvider; import org.apache.metron.hbase.mock.MockHTable; import org.apache.metron.integration.BaseIntegrationTest; @@ -89,13 +83,15 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { public static final String DEFAULT_DMACODE= "test dmaCode"; public static final String DEFAULT_LOCATION_POINT= Joiner.on(',').join(DEFAULT_LATITUDE,DEFAULT_LONGITUDE); - protected String fluxPath = "../metron-enrichment/src/main/flux/enrichment/remote.yaml"; protected String templatePath = "../metron-enrichment/src/main/config/enrichment.properties.j2"; protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed"; private final List<byte[]> inputMessages = getInputMessages(sampleParsedPath); private static File geoHdfsFile; + protected String fluxPath() { + return "../metron-enrichment/src/main/flux/enrichment/remote.yaml"; + } private static List<byte[]> getInputMessages(String path){ try{ @@ -190,7 +186,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { }}); FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder() - .withTopologyLocation(new File(fluxPath)) + .withTopologyLocation(new File(fluxPath())) .withTopologyName("test") .withTemplateLocation(new File(templatePath)) .withTopologyProperties(topologyProperties) @@ -247,8 +243,8 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { protected void validateErrors(List<Map<String, Object>> errors) { for(Map<String, Object> error : errors) { - Assert.assertEquals("java.lang.ArithmeticException: / by zero", error.get(Constants.ErrorFields.MESSAGE.getName())); - Assert.assertEquals("com.google.common.util.concurrent.UncheckedExecutionException: java.lang.ArithmeticException: / by zero", error.get(Constants.ErrorFields.EXCEPTION.getName())); + Assert.assertTrue(error.get(Constants.ErrorFields.MESSAGE.getName()).toString(), error.get(Constants.ErrorFields.MESSAGE.getName()).toString().contains("/ by zero") ); + Assert.assertTrue(error.get(Constants.ErrorFields.EXCEPTION.getName()).toString().contains("/ by zero")); Assert.assertEquals(Constants.ErrorType.ENRICHMENT_ERROR.getType(), error.get(Constants.ErrorFields.ERROR_TYPE.getName())); Assert.assertEquals("{\"error_test\":{},\"source.type\":\"test\"}", error.get(Constants.ErrorFields.RAW_MESSAGE.getName())); } @@ -399,17 +395,17 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { Assert.assertEquals(indexedDoc.getOrDefault("is_alert",""), "true"); // validate threat triage score - Assert.assertTrue(indexedDoc.containsKey(THREAT_TRIAGE_SCORE_KEY)); - Double score = (Double) indexedDoc.get(THREAT_TRIAGE_SCORE_KEY); + Assert.assertTrue(indexedDoc.containsKey(ThreatIntelUtils.THREAT_TRIAGE_SCORE_KEY)); + Double score = (Double) indexedDoc.get(ThreatIntelUtils.THREAT_TRIAGE_SCORE_KEY); Assert.assertEquals(score, 10d, 1e-7); // validate threat triage rules Joiner joiner = Joiner.on("."); Stream.of( - joiner.join(THREAT_TRIAGE_RULES_KEY, 0, THREAT_TRIAGE_RULE_NAME), - joiner.join(THREAT_TRIAGE_RULES_KEY, 0, THREAT_TRIAGE_RULE_COMMENT), - joiner.join(THREAT_TRIAGE_RULES_KEY, 0, THREAT_TRIAGE_RULE_REASON), - joiner.join(THREAT_TRIAGE_RULES_KEY, 0, THREAT_TRIAGE_RULE_SCORE)) + joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_NAME), + joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_COMMENT), + joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_REASON), + joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_SCORE)) .forEach(key -> Assert.assertTrue(String.format("Missing expected key: '%s'", key), indexedDoc.containsKey(key))); } @@ -471,11 +467,11 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { enriched = true; } if (ips.contains(indexedDoc.get(DST_IP))) { - Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION + boolean isEnriched = Predicates.and(HostEnrichments.LOCAL_LOCATION ,HostEnrichments.IMPORTANT ,HostEnrichments.PRINTER_TYPE - ).apply(new EvaluationPayload(indexedDoc, DST_IP)) - ); + ).apply(new EvaluationPayload(indexedDoc, DST_IP)); + Assert.assertTrue(isEnriched); enriched = true; } } @@ -492,11 +488,11 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { enriched = true; } if (ips.contains(indexedDoc.get(DST_IP))) { - Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION + boolean isEnriched = Predicates.and(HostEnrichments.LOCAL_LOCATION ,HostEnrichments.IMPORTANT ,HostEnrichments.WEBSERVER_TYPE - ).apply(new EvaluationPayload(indexedDoc, DST_IP)) - ); + ).apply(new EvaluationPayload(indexedDoc, DST_IP)); + Assert.assertTrue(isEnriched); enriched = true; } } http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java new file mode 100644 index 0000000..1f06733 --- /dev/null +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.integration; + +public class UnifiedEnrichmentIntegrationTest extends EnrichmentIntegrationTest { + @Override + public String fluxPath() { + return "../metron-enrichment/src/main/flux/enrichment/remote-unified.yaml"; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/1d95b831/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java new file mode 100644 index 0000000..c3a3109 --- /dev/null +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.parallel; + +import com.google.common.collect.ImmutableMap; +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.enrichment.adapters.stellar.StellarAdapter; +import org.apache.metron.enrichment.bolt.CacheKey; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.StellarFunctions; +import org.json.simple.JSONObject; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class ParallelEnricherTest { + /** + * { + "enrichment": { + "fieldMap": { + "stellar" : { + "config" : { + "numeric" : { + "map" : "{ 'blah' : 1}" + ,"one" : "MAP_GET('blah', map)" + ,"foo": "1 + 1" + } + ,"ALL_CAPS" : "TO_UPPER(source.type)" + } + } + } + ,"fieldToTypeMap": { } + }, + "threatIntel": { } +} + */ + @Multiline + public static String goodConfig; + + private static ParallelEnricher enricher; + private static Context stellarContext; + private static AtomicInteger numAccesses = new AtomicInteger(0); + @BeforeClass + public static void setup() { + ConcurrencyContext infrastructure = new ConcurrencyContext(); + infrastructure.initialize(5, 100, 10, null, null, false); + stellarContext = new Context.Builder() + .build(); + StellarFunctions.initialize(stellarContext); + StellarAdapter adapter = new StellarAdapter(){ + @Override + public void logAccess(CacheKey value) { + numAccesses.incrementAndGet(); + } + }.ofType("ENRICHMENT"); + adapter.initializeAdapter(new HashMap<>()); + enricher = new ParallelEnricher(ImmutableMap.of("stellar", adapter), infrastructure, false); + } + + @Test + public void testCacheHit() throws Exception { + numAccesses.set(0); + JSONObject message = new JSONObject() {{ + put(Constants.SENSOR_TYPE, "test"); + }}; + for(int i = 0;i < 10;++i) { + SensorEnrichmentConfig config = JSONUtils.INSTANCE.load(goodConfig, SensorEnrichmentConfig.class); + config.getConfiguration().putIfAbsent("stellarContext", stellarContext); + ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null); + } + //we only want 2 actual instances of the adapter.enrich being run due to the cache. + Assert.assertTrue(2 >= numAccesses.get()); + } + + @Test + public void testGoodConfig() throws Exception { + SensorEnrichmentConfig config = JSONUtils.INSTANCE.load(goodConfig, SensorEnrichmentConfig.class); + config.getConfiguration().putIfAbsent("stellarContext", stellarContext); + JSONObject message = new JSONObject() {{ + put(Constants.SENSOR_TYPE, "test"); + }}; + ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null); + JSONObject ret = result.getResult(); + Assert.assertEquals("Got the wrong result count: " + ret, 8, ret.size()); + Assert.assertEquals(1, ret.get("map.blah")); + Assert.assertEquals("test", ret.get("source.type")); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("foo")); + Assert.assertEquals("TEST", ret.get("ALL_CAPS")); + Assert.assertEquals(0, result.getEnrichmentErrors().size()); + } + + /** + * { + "enrichment": { + "fieldMap": { + "stellar" : { + "config" : { + "numeric" : [ + "map := { 'blah' : 1}" + ,"one := MAP_GET('blah', map)" + ,"foo := 1 + 1" + ] + ,"ALL_CAPS" : "TO_UPPER(source.type)" + ,"errors" : [ + "error := 1/0" + ] + } + } + } + ,"fieldToTypeMap": { } + }, + "threatIntel": { } +} + */ + @Multiline + public static String badConfig; + + @Test + public void testBadConfig() throws Exception { + SensorEnrichmentConfig config = JSONUtils.INSTANCE.load(badConfig, SensorEnrichmentConfig.class); + config.getConfiguration().putIfAbsent("stellarContext", stellarContext); + JSONObject message = new JSONObject() {{ + put(Constants.SENSOR_TYPE, "test"); + }}; + ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null); + JSONObject ret = result.getResult(); + Assert.assertEquals(ret + " is not what I expected", 8, ret.size()); + Assert.assertEquals(1, ret.get("map.blah")); + Assert.assertEquals("test", ret.get("source.type")); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("foo")); + Assert.assertEquals("TEST", ret.get("ALL_CAPS")); + Assert.assertEquals(1, result.getEnrichmentErrors().size()); + } +}