http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyDefinition.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyDefinition.java deleted file mode 100644 index 39eb0c9..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyDefinition.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.siddhi; - -import org.apache.eagle.alert.config.AbstractPolicyDefinition; - -/** - * siddhi policy definition has the following format - * { - "type":"SiddhiCEPEngine", - "expression" : "from every b1=HeapUsage[metric == 'eagle.metric.gc'] -> a1=FullGCEvent[eventName == 'full gc'] -> b2=HeapUsage[metric == b1.metric and host == b1.host and value >= b1.value * 0.8] within 100 sec select a1.eventName, b1.metric, b2.timestamp, 60 as timerange insert into GCMonitor; " - } - */ -public class SiddhiPolicyDefinition extends AbstractPolicyDefinition { - private String expression; - - public String getExpression() { - return expression; - } - public void setExpression(String expression) { - this.expression = expression; - } - - @Override - public String toString(){ - return expression; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluator.java deleted file mode 100644 index 87eaf48..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluator.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.siddhi; - -import java.lang.reflect.Field; -import java.util.*; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; - -import org.apache.eagle.alert.config.AbstractPolicyDefinition; -import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; -import org.apache.eagle.alert.common.AlertConstants; -import org.apache.eagle.alert.policy.PolicyManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.core.ExecutionPlanRuntime; -import org.wso2.siddhi.core.SiddhiManager; -import org.wso2.siddhi.core.query.output.callback.QueryCallback; -import org.wso2.siddhi.core.stream.input.InputHandler; -import org.wso2.siddhi.query.api.execution.query.Query; -import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute; - -import org.apache.eagle.alert.entity.AlertAPIEntity; -import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity; -import org.apache.eagle.alert.policy.PolicyEvaluator; -import org.apache.eagle.dataproc.core.JsonSerDeserUtils; -import org.apache.eagle.dataproc.core.ValuesArray; -import com.typesafe.config.Config; - -/** - * when policy is updated or deleted, SiddhiManager.shutdown should be invoked to release resources. 
- * during this time, synchronization is important - */ -public class SiddhiPolicyEvaluator implements PolicyEvaluator{ - private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyEvaluator.class); - public static final int DEFAULT_QUEUE_SIZE = 1000; - private final BlockingQueue<AlertAPIEntity> queue = new ArrayBlockingQueue<AlertAPIEntity>(DEFAULT_QUEUE_SIZE); - private volatile SiddhiRuntime siddhiRuntime; - private String[] sourceStreams; - private boolean needValidation; - private String policyId; - private Config config; - private final static String EXECUTION_PLAN_NAME = "query"; - - /** - * everything dependent on policyDef should be together and switched in runtime - */ - public static class SiddhiRuntime{ - QueryCallback callback; - Map<String, InputHandler> siddhiInputHandlers; - SiddhiManager siddhiManager; - SiddhiPolicyDefinition policyDef; - List<String> outputFields; - String executionPlanName; - } - - public SiddhiPolicyEvaluator(Config config, String policyName, AbstractPolicyDefinition policyDef, String[] sourceStreams){ - this(config, policyName, policyDef, sourceStreams, false); - } - - public SiddhiPolicyEvaluator(Config config, String policyId, AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean needValidation){ - this.config = config; - this.policyId = policyId; - this.needValidation = needValidation; - this.sourceStreams = sourceStreams; - init(policyDef); - } - - public void init(AbstractPolicyDefinition policyDef){ - siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition)policyDef); - } - - public static String addContextFieldIfNotExist(String expression) { - // select fieldA, fieldB --> select eagleAlertContext, fieldA, fieldB - int pos = expression.indexOf("select ") + 7; - int index = pos; - boolean isSelectStarPattern = true; - while(index < expression.length()) { - if (expression.charAt(index) == ' ') index++; - else if (expression.charAt(index) == '*') break; - else { - isSelectStarPattern = false; - 
break; - } - } - if (isSelectStarPattern) return expression; - StringBuilder sb = new StringBuilder(); - sb.append(expression.substring(0, pos)); - sb.append(SiddhiStreamMetadataUtils.EAGLE_ALERT_CONTEXT_FIELD + ","); - sb.append(expression.substring(pos, expression.length())); - return sb.toString(); - } - - private SiddhiRuntime createSiddhiRuntime(SiddhiPolicyDefinition policyDef){ - SiddhiManager siddhiManager = new SiddhiManager(); - Map<String, InputHandler> siddhiInputHandlers = new HashMap<String, InputHandler>(); - - StringBuilder sb = new StringBuilder(); - for(String sourceStream : sourceStreams){ - String streamDef = SiddhiStreamMetadataUtils.convertToStreamDef(sourceStream); - LOG.info("Siddhi stream definition : " + streamDef); - sb.append(streamDef); - } - - String expression = addContextFieldIfNotExist(policyDef.getExpression()); - String executionPlan = sb.toString() + " @info(name = '" + EXECUTION_PLAN_NAME + "') " + expression; - ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan); - - for(String sourceStream : sourceStreams){ - siddhiInputHandlers.put(sourceStream, executionPlanRuntime.getInputHandler(sourceStream)); - } - executionPlanRuntime.start(); - - QueryCallback callback = new SiddhiQueryCallbackImpl(config, this); - - LOG.info("Siddhi query: " + expression); - executionPlanRuntime.addCallback(EXECUTION_PLAN_NAME, callback); - - List<String> outputFields = new ArrayList<String>(); - try { - Field field = QueryCallback.class.getDeclaredField(EXECUTION_PLAN_NAME); - field.setAccessible(true); - Query query = (Query)field.get(callback); - List<OutputAttribute> list = query.getSelector().getSelectionList(); - for (OutputAttribute output : list) { - outputFields.add(output.getRename()); - } - } - catch (Exception ex) { - LOG.error("Got an Exception when initial outputFields ", ex); - } - SiddhiRuntime runtime = new SiddhiRuntime(); - runtime.siddhiInputHandlers = siddhiInputHandlers; - 
runtime.siddhiManager = siddhiManager; - runtime.callback = callback; - runtime.policyDef = policyDef; - runtime.outputFields = outputFields; - runtime.executionPlanName = executionPlanRuntime.getName(); - return runtime; - } - - /** - * 1. input has 3 fields, first is siddhi context, second is streamName, the last one is map of attribute name/value - * 2. runtime check for input data (This is very expensive, so we ignore for now) - * the size of input map should be equal to size of attributes which stream metadata defines - * the attribute names should be equal to attribute names which stream metadata defines - * the input field cannot be null - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public void evaluate(ValuesArray data) throws Exception { - if(LOG.isDebugEnabled()) LOG.debug("Siddhi policy evaluator consumers data :" + data); - Object siddhiAlertContext = data.get(0); - String streamName = (String)data.get(1); - SortedMap map = (SortedMap)data.get(2); - validateEventInRuntime(streamName, map); - synchronized(siddhiRuntime){ - //insert siddhiAlertContext into the first field - List<Object> input = new ArrayList<>(); - input.add(siddhiAlertContext); - putAttrsIntoInputStream(input, streamName, map); - siddhiRuntime.siddhiInputHandlers.get(streamName).send(input.toArray(new Object[0])); - } - } - - /** - * This is a heavy operation, we should avoid to use. - * - * This validation method will skip invalid fields in event which are not declared in stream schema otherwise it will cause exception for siddhi engine. 
- * - * @see <a href="https://issues.apache.org/jira/browse/EAGLE-49">https://issues.apache.org/jira/browse/EAGLE-49</a> - * - * @param sourceStream source steam id - * @param data input event - */ - private void validateEventInRuntime(String sourceStream, SortedMap data){ - if(!needValidation) - return; - SortedMap<String, AlertStreamSchemaEntity> map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(sourceStream); - if(!map.keySet().equals(data.keySet())){ - Set<Object> badKeys = new TreeSet<>(); - for(Object key:data.keySet()) if(!map.containsKey(key)) badKeys.add(key); - LOG.warn(String.format("Ignore invalid fields %s in event: %s from stream: %s, valid fields are: %s", badKeys.toString(),data.toString(), sourceStream,map.keySet().toString())); - for(Object key:badKeys) data.remove(key); - } - } - - private void putAttrsIntoInputStream(List<Object> input, String streamName, SortedMap map) { - if(!needValidation) { - input.addAll(map.values()); - return; - } - for (Object key : map.keySet()) { - Object value = map.get(key); - if (value == null) { - input.add(SiddhiStreamMetadataUtils.getAttrDefaultValue(streamName, (String)key)); - } - else input.add(value); - } - } - - @Override - public void onPolicyUpdate(AlertDefinitionAPIEntity newAlertDef) { - AbstractPolicyDefinition policyDef = null; - try { - policyDef = JsonSerDeserUtils.deserialize(newAlertDef.getPolicyDef(), - AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(newAlertDef.getTags().get(AlertConstants.POLICY_TYPE))); - } - catch (Exception ex) { - LOG.error("Initial policy def error, ", ex); - } - SiddhiRuntime previous = siddhiRuntime; - siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition)policyDef); - synchronized(previous){ - previous.siddhiManager.getExecutionPlanRuntime(previous.executionPlanName).shutdown(); - } - } - - @Override - public void onPolicyDelete(){ - synchronized(siddhiRuntime){ - LOG.info("Going to shutdown siddhi execution plan, 
planName: " + siddhiRuntime.executionPlanName); - siddhiRuntime.siddhiManager.getExecutionPlanRuntime(siddhiRuntime.executionPlanName).shutdown(); - LOG.info("Siddhi execution plan " + siddhiRuntime.executionPlanName + " is successfully shutdown "); - } - } - - @Override - public String toString(){ - // show the policyDef - return siddhiRuntime.policyDef.toString(); - } - - public String[] getStreamNames() { - return sourceStreams; - } - - public Map<String, String> getAdditionalContext() { - Map<String, String> context = new HashMap<String, String>(); - StringBuilder sourceStreams = new StringBuilder(); - for (String streamName : getStreamNames()) { - sourceStreams.append(streamName + ","); - } - if (sourceStreams.length() > 0) { - sourceStreams.deleteCharAt(sourceStreams.length() - 1); - } - context.put(AlertConstants.SOURCE_STREAMS, sourceStreams.toString()); - context.put(AlertConstants.POLICY_ID, policyId); - return context; - } - - public List<String> getOutputStreamAttrNameList() { - return siddhiRuntime.outputFields; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java deleted file mode 100644 index 168b04f..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.siddhi; - -import java.util.Arrays; -import java.util.List; - -import org.apache.eagle.alert.common.AlertConstants; -import org.apache.eagle.alert.policy.PolicyEvaluator; -import org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider; -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; - -public class SiddhiPolicyEvaluatorServiceProviderImpl implements PolicyEvaluatorServiceProvider { - @Override - public String getPolicyType() { - return AlertConstants.policyType.siddhiCEPEngine.name(); - } - - @Override - public Class<? 
extends PolicyEvaluator> getPolicyEvaluator() { - return SiddhiPolicyEvaluator.class; - } - - @Override - public List<Module> getBindingModules() { - Module module1 = new SimpleModule(AlertConstants.POLICY_DEFINITION).registerSubtypes(new NamedType(SiddhiPolicyDefinition.class, getPolicyType())); - return Arrays.asList(module1); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java deleted file mode 100644 index 44b9c77..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.siddhi; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.eagle.alert.entity.AlertAPIEntity; -import org.apache.eagle.executor.AlertExecutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.core.event.Event; -import org.wso2.siddhi.core.query.output.callback.QueryCallback; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; - -public class SiddhiQueryCallbackImpl extends QueryCallback{ - - private SiddhiPolicyEvaluator evaluator; - public static final Logger LOG = LoggerFactory.getLogger(SiddhiQueryCallbackImpl.class); - public static final ObjectMapper mapper = new ObjectMapper(); - public Config config; - - public SiddhiQueryCallbackImpl(Config config, SiddhiPolicyEvaluator evaluator) { - this.config = config; - this.evaluator = evaluator; - } - - public List<String> getOutputMessage(Event event) { - Object[] data = event.getData(); - List<String> rets = new ArrayList<String>(); - boolean isFirst = true; - for (Object object : data) { - // The first field is siddhiAlertContext, skip it - if (isFirst) { - isFirst = false; - continue; - } - String value = null; - if (object instanceof Double) { - value = String.valueOf((Double)object); - } - else if (object instanceof Integer) { - value = String.valueOf((Integer)object); - } - else if (object instanceof Long) { - value = String.valueOf((Long)object); - } - else if (object instanceof String) { - value = (String)object; - } - else if (object instanceof Boolean) { - value = String.valueOf((Boolean)object); - } - rets.add(value); - } - return rets; - } - - @Override - public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { - Object[] data = inEvents[0].getData(); - EagleAlertContext siddhiAlertContext = (EagleAlertContext)data[0]; - List<String> rets = getOutputMessage(inEvents[0]); - AlertAPIEntity alert = 
SiddhiAlertAPIEntityRendner.render(config, rets, siddhiAlertContext, timeStamp); - AlertExecutor alertExecutor = siddhiAlertContext.alertExecutor; - alertExecutor.onAlerts(siddhiAlertContext, Arrays.asList(alert)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java deleted file mode 100644 index 92394aa..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.siddhi; - -import java.util.Map; -import java.util.SortedMap; - -import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * convert metadata entities for a stream to stream definition for siddhi cep engine - * define stream HeapUsage (metric string, host string, value double, timestamp long) - */ -public class SiddhiStreamMetadataUtils { - private final static Logger LOG = LoggerFactory.getLogger(SiddhiStreamMetadataUtils.class); - - public final static String EAGLE_ALERT_CONTEXT_FIELD = "eagleAlertContext"; - - public static SortedMap<String, AlertStreamSchemaEntity> getAttrMap(String streamName) { - SortedMap<String, AlertStreamSchemaEntity> map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(streamName); - if(map == null || map.size() == 0){ - throw new IllegalStateException("alert stream schema should never be empty"); - } - return map; - } - - /** - * @see org.wso2.siddhi.query.api.definition.Attribute.Type - * make sure StreamMetadataManager.init is invoked before this method - * @param streamName - * @return - */ - public static String convertToStreamDef(String streamName){ - SortedMap<String, AlertStreamSchemaEntity> map = getAttrMap(streamName); - StringBuilder sb = new StringBuilder(); - sb.append(EAGLE_ALERT_CONTEXT_FIELD + " object,"); - for(Map.Entry<String, AlertStreamSchemaEntity> entry : map.entrySet()){ - appendAttributeNameType(sb, entry.getKey(), entry.getValue().getAttrType()); - } - if(sb.length() > 0){ - sb.deleteCharAt(sb.length()-1); - } - - String siddhiStreamDefFormat = "define stream " + streamName + "(" + "%s" + ");"; - return String.format(siddhiStreamDefFormat, sb.toString()); - } - - public static String convertToStreamDef(String streamName, Map<String, String> eventSchema){ - StringBuilder sb = new StringBuilder(); - sb.append("context" + " object,"); - for(Map.Entry<String, String> entry : 
eventSchema.entrySet()){ - appendAttributeNameType(sb, entry.getKey(), entry.getValue()); - } - if(sb.length() > 0){ - sb.deleteCharAt(sb.length()-1); - } - - String siddhiStreamDefFormat = "define stream " + streamName + "(" + "%s" + ");"; - return String.format(siddhiStreamDefFormat, sb.toString()); - } - - private static void appendAttributeNameType(StringBuilder sb, String attrName, String attrType){ - sb.append(attrName); - sb.append(" "); - if(attrType.equalsIgnoreCase(AttributeType.STRING.name())){ - sb.append("string"); - }else if(attrType.equalsIgnoreCase(AttributeType.INTEGER.name())){ - sb.append("int"); - }else if(attrType.equalsIgnoreCase(AttributeType.LONG.name())){ - sb.append("long"); - }else if(attrType.equalsIgnoreCase(AttributeType.BOOL.name())){ - sb.append("bool"); - }else if(attrType.equalsIgnoreCase(AttributeType.FLOAT.name())){ - sb.append("float"); - }else if(attrType.equalsIgnoreCase(AttributeType.DOUBLE.name())){ - sb.append("double"); - }else{ - LOG.warn("AttrType is not recognized, ignore : " + attrType); - } - sb.append(","); - } - - public static Object getAttrDefaultValue(String streamName, String attrName){ - SortedMap<String, AlertStreamSchemaEntity> map = getAttrMap(streamName); - AlertStreamSchemaEntity entity = map.get(attrName); - if (entity.getDefaultValue() != null) { - return entity.getDefaultValue(); - } - else { - String attrType = entity.getAttrType(); - if (attrType.equalsIgnoreCase(AttributeType.STRING.name())) { - return "NA"; - } else if (attrType.equalsIgnoreCase(AttributeType.INTEGER.name()) || attrType.equalsIgnoreCase(AttributeType.LONG.name())) { - return -1; - } else if (attrType.equalsIgnoreCase(AttributeType.BOOL.name())) { - return true; - } else { - LOG.warn("AttrType is not recognized: " + attrType + ", treat it as string"); - return "N/A"; - } - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/StreamMetadataManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/StreamMetadataManager.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/StreamMetadataManager.java deleted file mode 100644 index 618d245..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/StreamMetadataManager.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.alert.siddhi; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; - -import org.apache.eagle.alert.common.AlertConstants; -import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; -import com.typesafe.config.Config; -import org.apache.eagle.alert.dao.AlertStreamSchemaDAO; -import org.apache.eagle.common.config.EagleConfigConstants; -import org.apache.commons.collections.map.UnmodifiableMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * centralized memory where all stream metadata sit on, it is not mutable data - */ -public class StreamMetadataManager { - private static final Logger LOG = LoggerFactory.getLogger(StreamMetadataManager.class); - - private static StreamMetadataManager instance = new StreamMetadataManager(); - private Map<String, List<AlertStreamSchemaEntity>> map = new HashMap<String, List<AlertStreamSchemaEntity>>(); - private Map<String, SortedMap<String, AlertStreamSchemaEntity>> map2 = new HashMap<String, SortedMap<String, AlertStreamSchemaEntity>>(); - private volatile boolean initialized = false; - - private StreamMetadataManager(){ - } - - public static StreamMetadataManager getInstance(){ - return instance; - } - - private void internalInit(Config config, AlertStreamSchemaDAO dao){ - try{ - String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." 
+ EagleConfigConstants.DATA_SOURCE); - List<AlertStreamSchemaEntity> list = dao.findAlertStreamSchemaByDataSource(dataSource); - if(list == null) - return; - for (AlertStreamSchemaEntity entity : list) { - String streamName = entity.getTags().get(AlertConstants.STREAM_NAME); - if (map.get(streamName) == null) { - map.put(streamName, new ArrayList<AlertStreamSchemaEntity>()); - map2.put(streamName, new TreeMap<String, AlertStreamSchemaEntity>()); - } - map.get(streamName).add(entity); - map2.get(streamName).put(entity.getTags().get(AlertConstants.ATTR_NAME), entity); - } - }catch(Exception ex){ - LOG.error("Fail building metadata manger", ex); - throw new IllegalStateException(ex); - } - } - - /** - * singleton with init would be good for unit test as well, and it ensures that - * initialization happens only once before you use it. - * @param config - * @param dao - */ - public void init(Config config, AlertStreamSchemaDAO dao){ - if(!initialized){ - synchronized(this){ - if(!initialized){ - if(LOG.isDebugEnabled()) LOG.debug("Initializing ..."); - internalInit(config, dao); - initialized = true; - LOG.info("Successfully initialized"); - } - } - }else{ - LOG.info("Already initialized, skip"); - } - } - - // Only for unit test purpose - public void reset() { - synchronized (this) { - initialized = false; - map.clear(); - map2.clear(); - } - } - - private void ensureInitialized(){ - if(!initialized) - throw new IllegalStateException("StreamMetadataManager should be initialized before using it"); - } - - public List<AlertStreamSchemaEntity> getMetadataEntitiesForStream(String streamName){ - ensureInitialized(); - return getMetadataEntitiesForAllStreams().get(streamName); - } - - public Map<String, List<AlertStreamSchemaEntity>> getMetadataEntitiesForAllStreams(){ - ensureInitialized(); - return UnmodifiableMap.decorate(map); - } - - public SortedMap<String, AlertStreamSchemaEntity> getMetadataEntityMapForStream(String streamName){ - ensureInitialized(); - return 
getMetadataEntityMapForAllStreams().get(streamName); - } - - public Map<String, SortedMap<String, AlertStreamSchemaEntity>> getMetadataEntityMapForAllStreams(){ - ensureInitialized(); - return UnmodifiableMap.decorate(map2); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java deleted file mode 100644 index cf76134..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.eagle.alert.siddhi.extension;

import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.function.FunctionExecutor;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;

import java.util.Locale;

/**
 * Function executor that reports whether its first string argument contains its second
 * string argument, ignoring case. Its own error messages refer to it as
 * {@code str:containsIgnoreCase()}.
 *
 * Accepts exactly two STRING arguments and returns a BOOL.
 */
public class ContainsIgnoreCaseExtension extends FunctionExecutor {

    Attribute.Type returnType = Attribute.Type.BOOL;

    /**
     * Validates the call site: exactly two arguments, both of type STRING.
     *
     * @throws ExecutionPlanValidationException if the argument count or either argument type is wrong
     */
    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
        if (attributeExpressionExecutors.length != 2) {
            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:containsIgnoreCase() function, required 2, " +
                    "but found " + attributeExpressionExecutors.length);
        }
        if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:containsIgnoreCase() function, " +
                    "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[0].getReturnType().toString());
        }
        if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:containsIgnoreCase() function, " +
                    "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[1].getReturnType().toString());
        }
    }

    /**
     * Evaluates the function for one event.
     *
     * @param data two-element array: the string to search in, then the string to search for
     * @return {@code Boolean.TRUE} when the first string contains the second, ignoring case
     * @throws ExecutionPlanRuntimeException if either argument is null
     */
    @Override
    protected Object execute(Object[] data) {
        if (data[0] == null) {
            throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. First argument cannot be null");
        }
        if (data[1] == null) {
            throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. Second argument cannot be null");
        }
        String str1 = (String)data[0];
        String str2 = (String)data[1];
        // Fold case with a fixed locale: the no-arg toUpperCase() follows the JVM default locale,
        // so e.g. under a Turkish locale "i" would upper-case to a dotted capital and never match "I".
        return str1.toUpperCase(Locale.ROOT).contains(str2.toUpperCase(Locale.ROOT));
    }

    @Override
    protected Object execute(Object data) {
        return null; //Since the containsIgnoreCase function takes in 2 parameters, this method does not get called. Hence, not implemented.
    }

    @Override
    public void start() {
        //Nothing to start
    }

    @Override
    public void stop() {
        //Nothing to stop
    }

    @Override
    public Attribute.Type getReturnType() {
        return returnType;
    }

    /** This function keeps no state between events, so the snapshot is always empty. */
    @Override
    public Object[] currentState() {
        return new Object[]{};
    }

    /** Nothing to restore; see {@link #currentState()}. */
    @Override
    public void restoreState(Object[] state) {
    }
}
 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java deleted file mode 100644 index 0b6e7ec..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */

package org.apache.eagle.alert.siddhi.extension;

import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.function.FunctionExecutor;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Function executor comparing two strings for equality while ignoring case. Its own error
 * messages refer to it as {@code str:equalsIgnoreCase()}.
 *
 * Takes exactly two STRING arguments and yields a BOOL.
 */
public class EqualsIgnoreCaseExtension extends FunctionExecutor {

    Attribute.Type returnType = Attribute.Type.BOOL;

    /**
     * Checks the argument list at plan-build time: two arguments, each of type STRING.
     *
     * @throws ExecutionPlanValidationException when the count or one of the types does not fit
     */
    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
        final int argumentCount = attributeExpressionExecutors.length;
        if (argumentCount != 2) {
            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:equalsIgnoreCase() function, required 2, " +
                    "but found " + argumentCount);
        }
        requireStringArgument(attributeExpressionExecutors[0], "first");
        requireStringArgument(attributeExpressionExecutors[1], "second");
    }

    /** Rejects an argument whose declared return type is anything other than STRING. */
    private static void requireStringArgument(ExpressionExecutor argument, String ordinal) {
        final Attribute.Type actualType = argument.getReturnType();
        if (actualType == Attribute.Type.STRING) {
            return;
        }
        throw new ExecutionPlanValidationException("Invalid parameter type found for the " + ordinal
                + " argument of str:equalsIgnoreCase() function, "
                + "required " + Attribute.Type.STRING + ", but found " + actualType.toString());
    }

    /**
     * Compares the two supplied strings, ignoring case.
     *
     * @param data two-element array holding the strings to compare
     * @return boxed {@code true} when both strings are equal ignoring case
     * @throws ExecutionPlanRuntimeException when either element is null
     */
    @Override
    protected Object execute(Object[] data) {
        final Object left = data[0];
        if (left == null) {
            throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. First argument cannot be null");
        }
        final Object right = data[1];
        if (right == null) {
            throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. Second argument cannot be null");
        }
        return ((String) left).equalsIgnoreCase((String) right);
    }

    @Override
    protected Object execute(Object data) {
        // The function always receives two parameters, so the single-argument overload is never invoked.
        return null;
    }

    @Override
    public void start() {
        // no resources to acquire
    }

    @Override
    public void stop() {
        // no resources to release
    }

    @Override
    public Attribute.Type getReturnType() {
        return returnType;
    }

    /** The executor is stateless; an empty snapshot is returned. */
    @Override
    public Object[] currentState() {
        return new Object[]{};
    }

    /** Stateless executor: the snapshot is ignored. */
    @Override
    public void restoreState(Object[] state) {
    }
}
 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java deleted file mode 100644 index 0bf80de..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * 
Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */

package org.apache.eagle.alert.siddhi.extension;

import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.extension.string.RegexpFunctionExtension;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * regexpIgnoreCase(string, regex)
 * Returns whether the whole 'string' matches the regular expression 'regex', with the
 * pattern compiled case-insensitively.
 * Accept Type(s): (STRING,STRING)
 * Return Type(s): BOOLEAN
 */
public class RegexpIgnoreCaseFunctionExtension extends RegexpFunctionExtension {

    //state-variables
    boolean isRegexConstant = false;
    String regexConstant;
    Pattern patternConstant;

    /**
     * Validates that exactly two STRING arguments were supplied and, when the regex argument
     * is a constant expression, compiles it once up front.
     *
     * @throws ExecutionPlanValidationException on a wrong argument count or argument type
     */
    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
        final int argumentCount = attributeExpressionExecutors.length;
        if (argumentCount != 2) {
            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:regexpIgnoreCase() function, required 2, " +
                    "but found " + argumentCount);
        }

        final ExpressionExecutor sourceArgument = attributeExpressionExecutors[0];
        if (sourceArgument.getReturnType() != Attribute.Type.STRING) {
            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:regexpIgnoreCase() function, " +
                    "required " + Attribute.Type.STRING + ", but found " + sourceArgument.getReturnType().toString());
        }

        final ExpressionExecutor regexArgument = attributeExpressionExecutors[1];
        if (regexArgument.getReturnType() != Attribute.Type.STRING) {
            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:regexpIgnoreCase() function, " +
                    "required " + Attribute.Type.STRING + ", but found " + regexArgument.getReturnType().toString());
        }

        if (!(regexArgument instanceof ConstantExpressionExecutor)) {
            return;
        }
        // Constant regex: remember it and pre-compile so execute() can reuse the pattern.
        isRegexConstant = true;
        regexConstant = (String) ((ConstantExpressionExecutor) regexArgument).getValue();
        patternConstant = Pattern.compile(regexConstant, Pattern.CASE_INSENSITIVE);
    }

    /**
     * Matches the first argument against the regex, case-insensitively.
     *
     * @param data two-element array: the string to test, then the regular expression
     * @return boxed {@code true} when the entire string matches the expression
     * @throws ExecutionPlanRuntimeException when either element is null
     */
    @Override
    protected Object execute(Object[] data) {
        if (data[0] == null) {
            throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. First argument cannot be null");
        }
        if (data[1] == null) {
            throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. Second argument cannot be null");
        }

        final String input = (String) data[0];
        // Reuse the pre-compiled pattern for a constant regex; otherwise compile the per-event value.
        final Pattern effectivePattern = isRegexConstant
                ? patternConstant
                : Pattern.compile((String) data[1], Pattern.CASE_INSENSITIVE);
        return effectivePattern.matcher(input).matches();
    }
}
 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java deleted file mode 100644 index 44014da..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java +++ /dev/null @@ -1,395 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.executor; - -import com.codahale.metrics.MetricRegistry; -import com.sun.jersey.client.impl.CopyOnWriteHashMap; -import com.typesafe.config.Config; -import org.apache.commons.lang3.time.DateUtils; -import org.apache.eagle.alert.common.AlertConstants; -import org.apache.eagle.alert.config.AbstractPolicyDefinition; -import org.apache.eagle.alert.dao.AlertDefinitionDAO; -import org.apache.eagle.alert.dao.AlertStreamSchemaDAO; -import org.apache.eagle.alert.dao.AlertStreamSchemaDAOImpl; -import org.apache.eagle.alert.entity.AlertAPIEntity; -import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity; -import org.apache.eagle.alert.policy.*; -import org.apache.eagle.alert.siddhi.EagleAlertContext; -import org.apache.eagle.alert.siddhi.SiddhiAlertHandler; -import org.apache.eagle.alert.siddhi.StreamMetadataManager; -import org.apache.eagle.common.config.EagleConfigConstants; -import org.apache.eagle.dataproc.core.JsonSerDeserUtils; -import org.apache.eagle.dataproc.core.ValuesArray; -import org.apache.eagle.datastream.Collector; -import org.apache.eagle.datastream.JavaStormStreamExecutor2; -import org.apache.eagle.datastream.Tuple2; -import org.apache.eagle.metric.reportor.EagleCounterMetric; -import org.apache.eagle.metric.reportor.EagleMetricListener; -import org.apache.eagle.metric.reportor.EagleServiceReporterMetricListener; -import org.apache.eagle.metric.reportor.MetricKeyCodeDecoder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.management.ManagementFactory; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods, SiddhiAlertHandler, PolicyDistributionReportMethods { - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(AlertExecutor.class); - - private String alertExecutorId; 
- private volatile CopyOnWriteHashMap<String, PolicyEvaluator> policyEvaluators; - private PolicyPartitioner partitioner; - private int numPartitions; - private int partitionSeq; - private Config config; - private Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs; - private AlertDefinitionDAO alertDefinitionDao; - private String[] sourceStreams; - private static String EAGLE_EVENT_COUNT = "eagle.event.count"; - private static String EAGLE_POLICY_EVAL_COUNT = "eagle.policy.eval.count"; - private static String EAGLE_POLICY_EVAL_FAIL_COUNT = "eagle.policy.eval.fail.count"; - private static String EAGLE_ALERT_COUNT = "eagle.alert.count"; - private static String EAGLE_ALERT_FAIL_COUNT = "eagle.alert.fail.count"; - private static long MERITE_GRANULARITY = DateUtils.MILLIS_PER_MINUTE; - private Map<String, Map<String, String>> dimensionsMap; // cache it for performance - private Map<String, String> baseDimensions; - private MetricRegistry registry; - private EagleMetricListener listener; - - public AlertExecutor(String alertExecutorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq, - AlertDefinitionDAO alertDefinitionDao, String[] sourceStreams){ - this.alertExecutorId = alertExecutorId; - this.partitioner = partitioner; - this.numPartitions = numPartitions; - this.partitionSeq = partitionSeq; - this.alertDefinitionDao = alertDefinitionDao; - this.sourceStreams = sourceStreams; - } - - public String getAlertExecutorId(){ - return this.alertExecutorId; - } - - public int getNumPartitions() { - return this.numPartitions; - } - - public int getPartitionSeq(){ - return this.partitionSeq; - } - - public PolicyPartitioner getPolicyPartitioner() { - return this.partitioner; - } - - public Map<String, Map<String, AlertDefinitionAPIEntity>> getInitialAlertDefs() { - return this.initialAlertDefs; - } - - public AlertDefinitionDAO getAlertDefinitionDao() { - return alertDefinitionDao; - } - - public Map<String, PolicyEvaluator> 
getPolicyEvaluators(){ - return policyEvaluators; - } - - @Override - public void prepareConfig(Config config) { - this.config = config; - } - - private void initMetricReportor() { - String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST); - int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT); - - String username = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) ? - config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : null; - String password = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) ? - config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : null; - - //TODO: need to it replace it with batch flush listener - registry = new MetricRegistry(); - listener = new EagleServiceReporterMetricListener(host, port, username, password); - - baseDimensions = new HashMap<>(); - baseDimensions.put(AlertConstants.ALERT_EXECUTOR_ID, alertExecutorId); - baseDimensions.put(AlertConstants.PARTITIONSEQ, String.valueOf(partitionSeq)); - baseDimensions.put(AlertConstants.SOURCE, ManagementFactory.getRuntimeMXBean().getName()); - baseDimensions.put(EagleConfigConstants.DATA_SOURCE, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE)); - baseDimensions.put(EagleConfigConstants.SITE, config.getString(EagleConfigConstants.EAGLE_PROPS + "." 
+ EagleConfigConstants.SITE)); - dimensionsMap = new HashMap<>(); - } - - /** - * for unit test purpose only - * @param config - * @return - */ - public AlertStreamSchemaDAO getAlertStreamSchemaDAO(Config config){ - return new AlertStreamSchemaDAOImpl(config); - } - - @Override - public void init() { - // initialize StreamMetadataManager before it is used - StreamMetadataManager.getInstance().init(config, getAlertStreamSchemaDAO(config)); - // for each AlertDefinition, to create a PolicyEvaluator - Map<String, PolicyEvaluator> tmpPolicyEvaluators = new HashMap<String, PolicyEvaluator>(); - - String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE); - String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE); - try { - initialAlertDefs = alertDefinitionDao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource); - } - catch (Exception ex) { - LOG.error("fail to initialize initialAlertDefs: ", ex); - throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex); - } - if(initialAlertDefs == null || initialAlertDefs.isEmpty()){ - LOG.warn("No alert definitions was found for site: " + site + ", dataSource: " + dataSource); - } - else if (initialAlertDefs.get(alertExecutorId) != null) { - for(AlertDefinitionAPIEntity alertDef : initialAlertDefs.get(alertExecutorId).values()){ - int part = partitioner.partition(numPartitions, alertDef.getTags().get(AlertConstants.POLICY_TYPE), alertDef.getTags().get(AlertConstants.POLICY_ID)); - if (part == partitionSeq) { - tmpPolicyEvaluators.put(alertDef.getTags().get(AlertConstants.POLICY_ID), createPolicyEvaluator(alertDef)); - } - } - } - - policyEvaluators = new CopyOnWriteHashMap<>(); - // for efficiency, we don't put single policy evaluator - policyEvaluators.putAll(tmpPolicyEvaluators); - DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance(); - - policyLoader.init(initialAlertDefs, 
alertDefinitionDao, config); - String fullQualifiedAlertExecutorId = alertExecutorId + "_" + partitionSeq; - policyLoader.addPolicyChangeListener(fullQualifiedAlertExecutorId, this); - policyLoader.addPolicyDistributionReporter(fullQualifiedAlertExecutorId, this); - LOG.info("Alert Executor created, partitionSeq: " + partitionSeq + " , numPartitions: " + numPartitions); - LOG.info("All policy evaluators: " + policyEvaluators); - - initMetricReportor(); - } - - /** - * Create PolicyEvaluator instance according to policyType-mapped policy evaluator class - * - * @param alertDef alert definition - * @return PolicyEvaluator instance - */ - private PolicyEvaluator createPolicyEvaluator(AlertDefinitionAPIEntity alertDef){ - String policyType = alertDef.getTags().get(AlertConstants.POLICY_TYPE); - Class<? extends PolicyEvaluator> evalCls = PolicyManager.getInstance().getPolicyEvaluator(policyType); - if(evalCls == null){ - String msg = "No policy evaluator defined for policy type : " + policyType; - LOG.error(msg); - throw new IllegalStateException(msg); - } - - // check out whether strong incoming data validation is necessary - String needValidationConfigKey= AlertConstants.ALERT_EXECUTOR_CONFIGS + "." 
+ alertExecutorId + ".needValidation"; - - // Default: true - boolean needValidation = !config.hasPath(needValidationConfigKey) || config.getBoolean(needValidationConfigKey); - - AbstractPolicyDefinition policyDef = null; - try { - policyDef = JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(policyType)); - } catch (Exception ex) { - LOG.error("Fail initial alert policy def: "+alertDef.getPolicyDef(), ex); - } - PolicyEvaluator pe; - try{ - // Create evaluator instances - pe = evalCls.getConstructor(Config.class, String.class, AbstractPolicyDefinition.class, String[].class, boolean.class).newInstance(config, alertDef.getTags().get("policyId"), policyDef, sourceStreams, needValidation); - }catch(Exception ex){ - LOG.error("Fail creating new policyEvaluator", ex); - LOG.warn("Broken policy definition and stop running : " + alertDef.getPolicyDef()); - throw new IllegalStateException(ex); - } - return pe; - } - - /** - * verify both alertExecutor logic name and partition id - * @param alertDef alert definition - * - * @return whether accept the alert definition - */ - private boolean accept(AlertDefinitionAPIEntity alertDef){ - if(!alertDef.getTags().get("alertExecutorId").equals(alertExecutorId)) { - if(LOG.isDebugEnabled()){ - LOG.debug("alertDef does not belong to this alertExecutorId : " + alertExecutorId + ", alertDef : " + alertDef); - } - return false; - } - int targetPartitionSeq = partitioner.partition(numPartitions, alertDef.getTags().get(AlertConstants.POLICY_TYPE), alertDef.getTags().get(AlertConstants.POLICY_ID)); - if(targetPartitionSeq == partitionSeq) - return true; - return false; - } - - private long trim(long value, long granularity) { - return value / granularity * granularity; - } - - private void updateCounter(String name, Map<String, String> dimensions, double value) { - long current = System.currentTimeMillis(); - String metricName = 
MetricKeyCodeDecoder.codeMetricKey(name, dimensions); - if (registry.getMetrics().get(metricName) == null) { - EagleCounterMetric metric = new EagleCounterMetric(current, metricName, value, MERITE_GRANULARITY); - metric.registerListener(listener); - registry.register(metricName, metric); - } else { - EagleCounterMetric metric = (EagleCounterMetric) registry.getMetrics().get(metricName); - metric.update(value, current); - //TODO: need remove unused metric from registry - } - } - - private void updateCounter(String name, Map<String, String> dimensions) { - updateCounter(name, dimensions, 1.0); - } - - protected Map<String, String> getDimensions(String policyId) { - if (dimensionsMap.get(policyId) == null) { - Map<String, String> newDimensions = new HashMap<String, String>(baseDimensions); - newDimensions.put(AlertConstants.POLICY_ID, policyId); - dimensionsMap.put(policyId, newDimensions); - } - return dimensionsMap.get(policyId); - } - - /** - * within this single executor, execute all PolicyEvaluator sequentially - * the contract for input: - * 1. total # of fields for input is 3, which is fixed - * 2. the first field is key - * 3. the second field is stream name - * 4. 
the third field is value which is java SortedMap - */ - @Override - public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector){ - if(input.size() != 3) - throw new IllegalStateException("AlertExecutor always consumes exactly 3 fields: key, stream name and value(SortedMap)"); - if(LOG.isDebugEnabled()) LOG.debug("Msg is coming " + input.get(2)); - if(LOG.isDebugEnabled()) LOG.debug("Current policyEvaluators: " + policyEvaluators.keySet().toString()); - - updateCounter(EAGLE_EVENT_COUNT, baseDimensions); - try{ - synchronized(this.policyEvaluators) { - for(Entry<String, PolicyEvaluator> entry : policyEvaluators.entrySet()){ - String policyId = entry.getKey(); - PolicyEvaluator evaluator = entry.getValue(); - updateCounter(EAGLE_POLICY_EVAL_COUNT, getDimensions(policyId)); - try { - EagleAlertContext siddhiAlertContext = new EagleAlertContext(); - siddhiAlertContext.alertExecutor = this; - siddhiAlertContext.policyId = policyId; - siddhiAlertContext.evaluator = evaluator; - siddhiAlertContext.outputCollector = outputCollector; - evaluator.evaluate(new ValuesArray(siddhiAlertContext, input.get(1), input.get(2))); - } - catch (Exception ex) { - LOG.error("Got an exception, but continue to run " + input.get(2).toString(), ex); - updateCounter(EAGLE_POLICY_EVAL_FAIL_COUNT, getDimensions(policyId)); - } - } - } - } catch(Exception ex){ - LOG.error(alertExecutorId + ", partition " + partitionSeq + ", error fetching alerts, but continue to run", ex); - updateCounter(EAGLE_ALERT_FAIL_COUNT, baseDimensions); - } - } - - @Override - public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) { - if(LOG.isDebugEnabled()) LOG.debug(alertExecutorId + ", partition " + partitionSeq + " policy added : " + added + " policyEvaluators " + policyEvaluators); - for(AlertDefinitionAPIEntity alertDef : added.values()){ - if(!accept(alertDef)) - continue; - LOG.info(alertExecutorId + ", partition " + partitionSeq + " policy 
really added " + alertDef); - PolicyEvaluator newEvaluator = createPolicyEvaluator(alertDef); - if(newEvaluator != null){ - synchronized(this.policyEvaluators) { - policyEvaluators.put(alertDef.getTags().get(AlertConstants.POLICY_ID), newEvaluator); - } - } - } - } - - @Override - public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) { - if(LOG.isDebugEnabled()) LOG.debug(alertExecutorId + ", partition " + partitionSeq + " policy changed : " + changed); - for(AlertDefinitionAPIEntity alertDef : changed.values()){ - if(!accept(alertDef)) - continue; - LOG.info(alertExecutorId + ", partition " + partitionSeq + " policy really changed " + alertDef); - synchronized(this.policyEvaluators) { - PolicyEvaluator pe = policyEvaluators.get(alertDef.getTags().get(AlertConstants.POLICY_ID)); - pe.onPolicyUpdate(alertDef); - } - } - } - - @Override - public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) { - if(LOG.isDebugEnabled()) LOG.debug(alertExecutorId + ", partition " + partitionSeq + " policy deleted : " + deleted); - for(AlertDefinitionAPIEntity alertDef : deleted.values()){ - if(!accept(alertDef)) - continue; - LOG.info(alertExecutorId + ", partition " + partitionSeq + " policy really deleted " + alertDef); - String policyId = alertDef.getTags().get(AlertConstants.POLICY_ID); - synchronized(this.policyEvaluators) { - if (policyEvaluators.containsKey(policyId)) { - PolicyEvaluator pe = policyEvaluators.remove(alertDef.getTags().get(AlertConstants.POLICY_ID)); - pe.onPolicyDelete(); - } - } - } - } - - @Override - public void onAlerts(EagleAlertContext context, List<AlertAPIEntity> alerts) { - if(alerts != null && !alerts.isEmpty()){ - String policyId = context.policyId; - LOG.info(String.format("Detected %s alerts for policy %s",alerts.size(),policyId)); - Collector outputCollector = context.outputCollector; - PolicyEvaluator evaluator = context.evaluator; - updateCounter(EAGLE_ALERT_COUNT, getDimensions(policyId), 
alerts.size()); - for (AlertAPIEntity entity : alerts) { - synchronized(this) { - outputCollector.collect(new Tuple2(policyId, entity)); - } - if(LOG.isDebugEnabled()) LOG.debug("A new alert is triggered: "+alertExecutorId + ", partition " + partitionSeq + ", Got an alert with output context: " + entity.getAlertContext() + ", for policy " + evaluator); - } - } - } - - @Override - public void report() { - PolicyDistroStatsLogReporter appender = new PolicyDistroStatsLogReporter(); - appender.reportPolicyMembership(alertExecutorId + "_" + partitionSeq, policyEvaluators.keySet()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java deleted file mode 100644 index 547e39d..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.eagle.executor;

import org.apache.eagle.alert.common.AlertConstants;
import org.apache.eagle.alert.dao.AlertDefinitionDAO;
import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl;
import org.apache.eagle.alert.dao.AlertExecutorDAOImpl;
import org.apache.eagle.alert.entity.AlertExecutorEntity;
import org.apache.eagle.alert.policy.DefaultPolicyPartitioner;
import org.apache.eagle.alert.policy.PolicyPartitioner;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.service.client.EagleServiceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Create alert executors and provide callback for programmer to link alert executor to immediate parent executors
 *
 * <br/><br/>
 * Explanations for programId, alertExecutorId and policy<br/><br/>
 * - programId - distributed or single-process program for example one storm topology<br/>
 * - alertExecutorId - one process/thread which executes multiple policies<br/>
 * - policy - some rules to be evaluated<br/>
 *
 * <br/>
 *
 * Normally the mapping is like following:
 * <pre>
 * programId (1:N) alertExecutorId
 * alertExecutorId (1:N) policy
 * </pre>
 */
public class AlertExecutorCreationUtils {
    private final static Logger LOG = LoggerFactory.getLogger(AlertExecutorCreationUtils.class);

    /**
     * Looks up the stream names registered for {@code alertExecutorId} under the configured data
     * source and creates the executors for them.
     *
     * @param config          program configuration; must contain the data source property
     * @param alertExecutorId logical executor id
     * @return one executor per partition
     * @throws IllegalStateException if no stream is registered for the executor id
     */
    public static AlertExecutor[] createAlertExecutors(Config config, String alertExecutorId) throws Exception{
        // Read dataSource from configuration.
        String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
        LOG.info("Loading alerting definitions for dataSource: " + dataSource);

        // Get map from alertExecutorId to alert stream
        // (dataSource) => Map[alertExecutorId:String,streamName:List[String]]
        List<String> streamNames = new ArrayList<String>();
        AlertExecutorDAOImpl alertExecutorDAO = new AlertExecutorDAOImpl(new EagleServiceConnector(config));
        List<AlertExecutorEntity> alertExecutorEntities = alertExecutorDAO.findAlertExecutor(dataSource, alertExecutorId);
        for(AlertExecutorEntity entity : alertExecutorEntities){
            streamNames.add(entity.getTags().get(AlertConstants.STREAM_NAME));
        }

        if(streamNames.isEmpty()){
            throw new IllegalStateException("upstream names should not be empty for alert " + alertExecutorId);
        }
        return createAlertExecutors(config, new AlertDefinitionDAOImpl(new EagleServiceConnector(config)),
                streamNames, alertExecutorId);
    }

    /**
     * Build DAG Tasks based on persisted alert definition and schemas from eagle store.
     *
     * <h3>Require configuration:</h3>
     *
     * <ul>
     * <li>eagleProps.site: program site id.</li>
     * <li>eagleProps.dataSource: program data source.</li>
     * <li>alertExecutorConfigs: only configured executor will be built into execution tasks.</li>
     * </ul>
     *
     * <h3>Steps:</h3>
     *
     * <ol>
     * <li>(upstreamTasks) => Map[streamName:String,upstreamTask:Task]</li>
     * <li>(dataSource) => Map[alertExecutorId:String,streamName:List[String]]</li>
     * <li>(site,dataSource) => Map[alertExecutorId,Map[policyId,alertDefinition]]</li>
     * <li>(config["alertExecutorConfigs"]) => AlertExecutor(alertExecutorID, partitioner, numPartitions, partitionSeq, alertDefs, alertDefDAO, sourceStreams)[]</li>
     * </ol>
     *
     * When the executor has no entry under {@code alertExecutorConfigs}, a single partition and
     * {@link DefaultPolicyPartitioner} are used. A {@code parallelism} of 0 also means one partition.
     */
    public static AlertExecutor[] createAlertExecutors(Config config, AlertDefinitionDAO alertDefDAO,
            List<String> streamNames, String alertExecutorId) throws Exception{
        // Read `alertExecutorConfigs` from configuration and get config for this alertExecutorId
        int numPartitions =1;
        String partitionerCls = DefaultPolicyPartitioner.class.getCanonicalName();
        String alertExecutorConfigsKey = "alertExecutorConfigs";
        if(config.hasPath(alertExecutorConfigsKey)) {
            Map<String, ConfigValue> alertExecutorConfigs = config.getObject(alertExecutorConfigsKey);
            if(alertExecutorConfigs !=null && alertExecutorConfigs.containsKey(alertExecutorId)) {
                // unwrapped() is declared to return Object; the value is used as a plain map of settings here.
                @SuppressWarnings("unchecked")
                Map<String, Object> alertExecutorConfig = (Map<String, Object>) alertExecutorConfigs.get(alertExecutorId).unwrapped();
                int parts = 0;
                if(alertExecutorConfig.containsKey("parallelism")) parts = toInt(alertExecutorConfig.get("parallelism"));
                numPartitions = parts == 0 ? 1 : parts;
                if(alertExecutorConfig.containsKey("partitioner")) partitionerCls = (String) alertExecutorConfig.get("partitioner");
            }
        }

        return createAlertExecutors(alertDefDAO, streamNames, alertExecutorId, numPartitions, partitionerCls);
    }

    /**
     * Converts an unwrapped configuration value to an int. Accepts any Number (not only Integer)
     * and numeric text, because the unwrapped type depends on how the value was written in the
     * configuration; a null value counts as 0.
     */
    private static int toInt(Object raw) {
        if (raw == null) {
            return 0;
        }
        if (raw instanceof Number) {
            return ((Number) raw).intValue();
        }
        return Integer.parseInt(raw.toString().trim());
    }

    /**
     * Build alert executors and assign alert definitions between these executors by partitioner (alertExecutorConfigs["${alertExecutorId}"]["partitioner"])
     *
     * @param numPartitions  number of executors to create; must not be negative
     * @param partitionerCls fully qualified name of a {@link PolicyPartitioner} with a no-arg constructor
     * @throws IllegalArgumentException if {@code numPartitions} is negative
     */
    public static AlertExecutor[] createAlertExecutors(AlertDefinitionDAO alertDefDAO, List<String> sourceStreams,
            String alertExecutorID, int numPartitions, String partitionerCls) throws Exception{
        LOG.info("Creating alert executors with alertExecutorID: " + alertExecutorID + ", numPartitions: " + numPartitions + ", Partition class is: "+ partitionerCls);

        if (numPartitions < 0) {
            // Fail with a readable message instead of a NegativeArraySizeException below.
            throw new IllegalArgumentException("numPartitions must not be negative for alert executor " + alertExecutorID + ", but was " + numPartitions);
        }

        PolicyPartitioner partitioner = (PolicyPartitioner)Class.forName(partitionerCls).getDeclaredConstructor().newInstance();
        AlertExecutor[] alertExecutors = new AlertExecutor[numPartitions];
        String[] _sourceStreams = sourceStreams.toArray(new String[0]);

        for(int i = 0; i < numPartitions; i++){
            alertExecutors[i] = new AlertExecutor(alertExecutorID, partitioner, numPartitions, i, alertDefDAO,_sourceStreams);
        }
        return alertExecutors;
    }
}
 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider deleted file mode 100644 index dc4e0a1..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider +++ /dev/null @@ 
-1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.eagle.alert.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider new file mode 100644 index 0000000..eac2bfd --- /dev/null +++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext deleted file mode 100644 index 4d5e237..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext +++ /dev/null @@ -1,35 +0,0 @@ -# -# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -charAt=org.wso2.siddhi.extension.string.CharAtFunctionExtension -coalesce=org.wso2.siddhi.extension.string.CoalesceFunctionExtension -concat=org.wso2.siddhi.extension.string.ConcatFunctionExtension -length=org.wso2.siddhi.extension.string.LengthFunctionExtension -lower=org.wso2.siddhi.extension.string.LowerFunctionExtension -regexp=org.wso2.siddhi.extension.string.RegexpFunctionExtension -repeat=org.wso2.siddhi.extension.string.RepeatFunctionExtension -replaceAll=org.wso2.siddhi.extension.string.ReplaceAllFunctionExtension -replaceFirst=org.wso2.siddhi.extension.string.ReplaceFirstFunctionExtension -reverse=org.wso2.siddhi.extension.string.ReverseFunctionExtension -strcmp=org.wso2.siddhi.extension.string.StrcmpFunctionExtension -substr=org.wso2.siddhi.extension.string.SubstrFunctionExtension -trim=org.wso2.siddhi.extension.string.TrimFunctionExtension -upper=org.wso2.siddhi.extension.string.UpperFunctionExtension -hex=org.wso2.siddhi.extension.string.HexFunctionExtension -unhex=org.wso2.siddhi.extension.string.UnhexFunctionExtension -equalsIgnoreCase=org.apache.eagle.alert.siddhi.extension.EqualsIgnoreCaseExtension -containsIgnoreCase=org.apache.eagle.alert.siddhi.extension.ContainsIgnoreCaseExtension -regexpIgnoreCase=org.apache.eagle.alert.siddhi.extension.RegexpIgnoreCaseFunctionExtension http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java index 521790a..3d69887 100644 --- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java +++ 
b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java @@ -18,22 +18,23 @@ package org.apache.eagle.alert.cep; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.dao.AlertDefinitionDAO; -import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl; -import org.apache.eagle.alert.dao.AlertStreamSchemaDAO; -import org.apache.eagle.alert.dao.AlertStreamSchemaDAOImpl; +import junit.framework.Assert; import org.apache.eagle.alert.entity.AlertAPIEntity; import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity; import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; -import org.apache.eagle.alert.siddhi.EagleAlertContext; -import org.apache.eagle.alert.siddhi.SiddhiPolicyDefinition; -import org.apache.eagle.alert.siddhi.SiddhiPolicyEvaluator; -import org.apache.eagle.alert.siddhi.StreamMetadataManager; +import org.apache.eagle.alert.executor.AlertExecutor; +import org.apache.eagle.alert.siddhi.SiddhiAlertAPIEntityRender; import org.apache.eagle.dataproc.core.ValuesArray; import org.apache.eagle.datastream.Collector; import org.apache.eagle.datastream.Tuple2; -import org.apache.eagle.executor.AlertExecutor; -import junit.framework.Assert; +import org.apache.eagle.policy.PolicyEvaluationContext; +import org.apache.eagle.policy.dao.AlertDefinitionDAOImpl; +import org.apache.eagle.policy.dao.AlertStreamSchemaDAO; +import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl; +import org.apache.eagle.policy.dao.PolicyDefinitionDAO; +import org.apache.eagle.policy.siddhi.SiddhiPolicyDefinition; +import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator; +import org.apache.eagle.policy.siddhi.StreamMetadataManager; import org.apache.eagle.service.client.EagleServiceConnector; import org.junit.Test; @@ -92,26 +93,28 @@ public class TestSiddhiEvaluator { "select * " + "insert into outputStream ;"; policyDef.setExpression(expression); - SiddhiPolicyEvaluator 
evaluator = new SiddhiPolicyEvaluator(config, "testPolicy", policyDef, new String[]{"hdfsAuditLogEventStream"}); - EagleAlertContext context = new EagleAlertContext(); + SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity> evaluator = new SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity>(config, "testPolicy", policyDef, new String[]{"hdfsAuditLogEventStream"}); + PolicyEvaluationContext context = new PolicyEvaluationContext(); - AlertDefinitionDAO alertDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(null, null)) { + PolicyDefinitionDAO alertDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(null, null)) { @Override - public Map<String, Map<String, AlertDefinitionAPIEntity>> findActiveAlertDefsGroupbyAlertExecutorId(String site, String dataSource) throws Exception { + public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception { return null; } }; - context.alertExecutor = new AlertExecutor("alertExecutorId", null, 3, 1, alertDao, new String[]{"hdfsAuditLogEventStream"}) { + AlertExecutor alertExecutor = new AlertExecutor("alertExecutorId", null, 3, 1, alertDao, new String[]{"hdfsAuditLogEventStream"}) { @Override protected Map<String, String> getDimensions(String policyId) { return new HashMap<String, String>(); } }; - context.alertExecutor.prepareConfig(config); - context.alertExecutor.init(); + alertExecutor.prepareConfig(config); + alertExecutor.init(); + context.alertExecutor = alertExecutor; context.evaluator = evaluator; context.policyId = "testPolicy"; + context.resultRender = new SiddhiAlertAPIEntityRender(); context.outputCollector = new Collector<Tuple2<String, AlertAPIEntity>> () { @Override public void collect(Tuple2<String, AlertAPIEntity> stringAlertAPIEntityTuple2) { 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java index 70853b4..783abea 100644 --- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java +++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java @@ -18,8 +18,10 @@ package org.apache.eagle.alert.dao; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.eagle.policy.dao.AlertDefinitionDAOImpl; import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity; import org.apache.eagle.common.config.EagleConfigConstants; +import org.apache.eagle.policy.dao.PolicyDefinitionDAO; import org.apache.eagle.service.client.EagleServiceConnector; import org.junit.Assert; import org.junit.Test; @@ -52,9 +54,9 @@ public class TestAlertDefinitionDAOImpl { String site = "sandbox"; String dataSource = "UnitTest"; - AlertDefinitionDAO dao = new AlertDefinitionDAOImpl(new EagleServiceConnector(eagleServiceHost, eagleServicePort)) { + PolicyDefinitionDAO dao = new AlertDefinitionDAOImpl(new EagleServiceConnector(eagleServiceHost, eagleServicePort)) { @Override - public List<AlertDefinitionAPIEntity> findActiveAlertDefs(String site, String dataSource) throws Exception { + public List<AlertDefinitionAPIEntity> findActivePolicies(String site, String dataSource) throws Exception { List<AlertDefinitionAPIEntity> list = new ArrayList<AlertDefinitionAPIEntity>(); list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor1", 
"TestPolicyIDA", "TestPolicyTypeA")); list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor1", "TestPolicyIDB", "TestPolicyTypeB")); @@ -64,7 +66,7 @@ public class TestAlertDefinitionDAOImpl { } }; - Map<String, Map<String, AlertDefinitionAPIEntity>> retMap = dao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource); + Map<String, Map<String, AlertDefinitionAPIEntity>> retMap = dao.findActivePoliciesGroupbyExecutorId(site, dataSource); Assert.assertEquals(2, retMap.size()); Assert.assertEquals(2, retMap.get("TestExecutor1").size()); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java index b7f40fa..82aa89f 100644 --- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java +++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java @@ -24,9 +24,10 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import junit.framework.Assert; +import org.apache.eagle.policy.dao.AlertStreamSchemaDAO; import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; -import org.apache.eagle.alert.siddhi.SiddhiStreamMetadataUtils; -import org.apache.eagle.alert.siddhi.StreamMetadataManager; +import org.apache.eagle.policy.siddhi.SiddhiStreamMetadataUtils; +import org.apache.eagle.policy.siddhi.StreamMetadataManager; import org.junit.Test; public class TestSiddhiStreamMetadataUtils { @@ -43,7 +44,7 @@ public class TestSiddhiStreamMetadataUtils { } }); String 
siddhiStreamDef = SiddhiStreamMetadataUtils.convertToStreamDef("testStreamName"); - Assert.assertEquals("define stream " + "testStreamName" + "(eagleAlertContext object,attrName1 string,attrName2 long);", siddhiStreamDef); + Assert.assertEquals("define stream " + "testStreamName" + "(eagleAlertContext object, attrName1 string,attrName2 long);", siddhiStreamDef); StreamMetadataManager.getInstance().reset(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java index 80aacd6..91c5332 100644 --- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java +++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java @@ -18,8 +18,10 @@ package org.apache.eagle.alert.dao; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.eagle.policy.dao.AlertStreamSchemaDAO; +import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl; import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; -import org.apache.eagle.alert.siddhi.StreamMetadataManager; +import org.apache.eagle.policy.siddhi.StreamMetadataManager; import org.junit.Assert; import org.junit.Test;