http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyDefinition.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyDefinition.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyDefinition.java
deleted file mode 100644
index 39eb0c9..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyDefinition.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import org.apache.eagle.alert.config.AbstractPolicyDefinition;
-
-/**
- * siddhi policy definition has the following format
- * {
-        "type":"SiddhiCEPEngine",
-               "expression" : "from every b1=HeapUsage[metric == 
'eagle.metric.gc'] -> a1=FullGCEvent[eventName == 'full gc'] -> 
b2=HeapUsage[metric == b1.metric and host == b1.host and value >= b1.value * 
0.8] within 100 sec select a1.eventName, b1.metric, b2.timestamp, 60 as 
timerange insert into GCMonitor; "
-       }
- */
-public class SiddhiPolicyDefinition extends AbstractPolicyDefinition {
-       private String expression;
-
-       public String getExpression() {
-               return expression;
-       }
-       public void setExpression(String expression) {
-               this.expression = expression;
-       }
-       
-       @Override
-       public String toString(){
-               return expression;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluator.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluator.java
deleted file mode 100644
index 87eaf48..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluator.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import java.lang.reflect.Field;
-import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.eagle.alert.config.AbstractPolicyDefinition;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.alert.common.AlertConstants;
-import org.apache.eagle.alert.policy.PolicyManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.query.api.execution.query.Query;
-import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
-
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.alert.policy.PolicyEvaluator;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import com.typesafe.config.Config;
-
-/**
- * when policy is updated or deleted, SiddhiManager.shutdown should be invoked 
to release resources.
- * during this time, synchronization is important
- */
-public class SiddhiPolicyEvaluator implements PolicyEvaluator{
-       private final static Logger LOG = 
LoggerFactory.getLogger(SiddhiPolicyEvaluator.class); 
-       public static final int DEFAULT_QUEUE_SIZE = 1000;
-       private final BlockingQueue<AlertAPIEntity> queue = new 
ArrayBlockingQueue<AlertAPIEntity>(DEFAULT_QUEUE_SIZE);
-       private volatile SiddhiRuntime siddhiRuntime;
-       private String[] sourceStreams;
-       private boolean needValidation;
-       private String policyId;
-       private Config config;
-       private final static String EXECUTION_PLAN_NAME = "query";
-       
-       /**
-        * everything dependent on policyDef should be together and switched in 
runtime
-        */
-       public static class SiddhiRuntime{
-               QueryCallback callback;
-               Map<String, InputHandler> siddhiInputHandlers;
-               SiddhiManager siddhiManager;
-               SiddhiPolicyDefinition policyDef;
-               List<String> outputFields;
-               String executionPlanName;
-       }
-       
-       public SiddhiPolicyEvaluator(Config config, String policyName, 
AbstractPolicyDefinition policyDef, String[] sourceStreams){
-               this(config, policyName, policyDef, sourceStreams, false);
-       }
-       
-       public SiddhiPolicyEvaluator(Config config, String policyId, 
AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean 
needValidation){
-               this.config = config;
-               this.policyId = policyId;
-               this.needValidation = needValidation;
-               this.sourceStreams = sourceStreams; 
-               init(policyDef);
-       }
-       
-       public void init(AbstractPolicyDefinition policyDef){                   
-               siddhiRuntime = 
createSiddhiRuntime((SiddhiPolicyDefinition)policyDef);
-       }
-
-       public static String addContextFieldIfNotExist(String expression) {     
        
-               // select fieldA, fieldB --> select eagleAlertContext, fieldA, 
fieldB
-               int pos = expression.indexOf("select ") + 7;
-               int index = pos;
-               boolean isSelectStarPattern = true;
-               while(index < expression.length()) {
-                       if (expression.charAt(index) == ' ') index++;
-                       else if (expression.charAt(index) == '*') break;
-                       else {
-                               isSelectStarPattern = false;
-                               break;
-                       }
-               }
-               if (isSelectStarPattern) return expression;
-               StringBuilder sb = new StringBuilder();
-               sb.append(expression.substring(0, pos));
-               sb.append(SiddhiStreamMetadataUtils.EAGLE_ALERT_CONTEXT_FIELD + 
",");
-               sb.append(expression.substring(pos, expression.length()));
-               return sb.toString();
-       }
-
-       private SiddhiRuntime createSiddhiRuntime(SiddhiPolicyDefinition 
policyDef){
-               SiddhiManager siddhiManager = new SiddhiManager();
-               Map<String, InputHandler> siddhiInputHandlers = new 
HashMap<String, InputHandler>();
-
-               StringBuilder sb = new StringBuilder();         
-               for(String sourceStream : sourceStreams){
-                       String streamDef = 
SiddhiStreamMetadataUtils.convertToStreamDef(sourceStream);
-                       LOG.info("Siddhi stream definition : " + streamDef);
-                       sb.append(streamDef);
-               }
-               
-               String expression = 
addContextFieldIfNotExist(policyDef.getExpression());
-               String executionPlan = sb.toString() + " @info(name = '" + 
EXECUTION_PLAN_NAME + "') " +  expression;
-               ExecutionPlanRuntime executionPlanRuntime = 
siddhiManager.createExecutionPlanRuntime(executionPlan);
-               
-               for(String sourceStream : sourceStreams){                       
-                       siddhiInputHandlers.put(sourceStream, 
executionPlanRuntime.getInputHandler(sourceStream));
-               }
-               executionPlanRuntime.start();
-
-               QueryCallback callback = new SiddhiQueryCallbackImpl(config, 
this);             
-
-               LOG.info("Siddhi query: " + expression);
-               executionPlanRuntime.addCallback(EXECUTION_PLAN_NAME, callback);
-
-               List<String> outputFields = new ArrayList<String>();
-               try {
-               Field field = 
QueryCallback.class.getDeclaredField(EXECUTION_PLAN_NAME);
-               field.setAccessible(true);
-               Query query = (Query)field.get(callback);
-               List<OutputAttribute> list = 
query.getSelector().getSelectionList();
-               for (OutputAttribute output : list) {                   
-                       outputFields.add(output.getRename());
-               }
-               }
-               catch (Exception ex) {
-                       LOG.error("Got an Exception when initial outputFields 
", ex);
-               }
-               SiddhiRuntime runtime = new SiddhiRuntime();
-               runtime.siddhiInputHandlers = siddhiInputHandlers;
-               runtime.siddhiManager = siddhiManager;
-               runtime.callback = callback;
-               runtime.policyDef = policyDef;
-               runtime.outputFields = outputFields;
-               runtime.executionPlanName = executionPlanRuntime.getName();
-               return runtime;
-       }
-       
-       /**
-        * 1. input has 3 fields, first is siddhi context, second is 
streamName, the last one is map of attribute name/value
-        * 2. runtime check for input data (This is very expensive, so we 
ignore for now)
-        *     the size of input map should be equal to size of attributes 
which stream metadata defines
-        *     the attribute names should be equal to attribute names which 
stream metadata defines
-        *     the input field cannot be null
-        */
-       @SuppressWarnings({ "rawtypes", "unchecked" })
-       @Override
-       public void evaluate(ValuesArray data) throws Exception {
-               if(LOG.isDebugEnabled()) LOG.debug("Siddhi policy evaluator 
consumers data :" + data);
-        Object siddhiAlertContext = data.get(0);
-               String streamName = (String)data.get(1);
-               SortedMap map = (SortedMap)data.get(2);
-               validateEventInRuntime(streamName, map);
-               synchronized(siddhiRuntime){
-                       //insert siddhiAlertContext into the first field
-                       List<Object> input = new ArrayList<>();
-                       input.add(siddhiAlertContext);
-                       putAttrsIntoInputStream(input, streamName, map);
-                       
siddhiRuntime.siddhiInputHandlers.get(streamName).send(input.toArray(new 
Object[0]));
-               }
-       }
-
-       /**
-        * This is a heavy operation, we should avoid to use.
-     *
-     * This validation method will skip invalid fields in event which are not 
declared in stream schema otherwise it will cause exception for siddhi engine.
-     *
-     * @see <a 
href="https://issues.apache.org/jira/browse/EAGLE-49";>https://issues.apache.org/jira/browse/EAGLE-49</a>
-     *
-        * @param sourceStream source steam id
-        * @param data input event
-        */
-       private void validateEventInRuntime(String sourceStream, SortedMap 
data){
-               if(!needValidation)
-                       return;
-               SortedMap<String, AlertStreamSchemaEntity> map = 
StreamMetadataManager.getInstance().getMetadataEntityMapForStream(sourceStream);
-        if(!map.keySet().equals(data.keySet())){
-            Set<Object> badKeys = new TreeSet<>();
-            for(Object key:data.keySet()) if(!map.containsKey(key)) 
badKeys.add(key);
-            LOG.warn(String.format("Ignore invalid fields %s in event: %s from 
stream: %s, valid fields are: %s", badKeys.toString(),data.toString(), 
sourceStream,map.keySet().toString()));
-            for(Object key:badKeys) data.remove(key);
-        }
-       }
-
-       private void putAttrsIntoInputStream(List<Object> input, String 
streamName, SortedMap map) {
-               if(!needValidation) {
-                       input.addAll(map.values());
-                       return;
-               }
-               for (Object key : map.keySet()) {
-                       Object value = map.get(key);
-                       if (value == null) {
-                               
input.add(SiddhiStreamMetadataUtils.getAttrDefaultValue(streamName, 
(String)key));
-                       }
-                       else input.add(value);
-               }
-       }
-
-       @Override
-       public void onPolicyUpdate(AlertDefinitionAPIEntity newAlertDef) {
-               AbstractPolicyDefinition policyDef = null;
-               try {
-                       policyDef = 
JsonSerDeserUtils.deserialize(newAlertDef.getPolicyDef(), 
-                                       AbstractPolicyDefinition.class, 
PolicyManager.getInstance().getPolicyModules(newAlertDef.getTags().get(AlertConstants.POLICY_TYPE)));
-               }
-               catch (Exception ex) {
-                       LOG.error("Initial policy def error, ", ex);
-               }
-               SiddhiRuntime previous = siddhiRuntime;
-               siddhiRuntime = 
createSiddhiRuntime((SiddhiPolicyDefinition)policyDef);
-               synchronized(previous){
-                       
previous.siddhiManager.getExecutionPlanRuntime(previous.executionPlanName).shutdown();
-               }
-       }
-       
-       @Override
-       public void onPolicyDelete(){
-               synchronized(siddhiRuntime){
-                       LOG.info("Going to shutdown siddhi execution plan, 
planName: " + siddhiRuntime.executionPlanName);
-                       
siddhiRuntime.siddhiManager.getExecutionPlanRuntime(siddhiRuntime.executionPlanName).shutdown();
-                       LOG.info("Siddhi execution plan " + 
siddhiRuntime.executionPlanName + " is successfully shutdown ");
-               }
-       }
-       
-       @Override
-       public String toString(){
-               // show the policyDef
-               return siddhiRuntime.policyDef.toString();
-       }
-       
-       public String[] getStreamNames() {
-               return sourceStreams;
-       }
-
-       public Map<String, String> getAdditionalContext() {
-               Map<String, String> context = new HashMap<String, String>();
-               StringBuilder sourceStreams = new StringBuilder();
-               for (String streamName : getStreamNames()) {
-                       sourceStreams.append(streamName + ",");
-               }
-               if (sourceStreams.length() > 0) {
-                       sourceStreams.deleteCharAt(sourceStreams.length() - 1);
-               }
-               context.put(AlertConstants.SOURCE_STREAMS, 
sourceStreams.toString());
-               context.put(AlertConstants.POLICY_ID, policyId);
-               return context;
-       }
-
-       public List<String> getOutputStreamAttrNameList() {
-               return siddhiRuntime.outputFields;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
deleted file mode 100644
index 168b04f..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.eagle.alert.common.AlertConstants;
-import org.apache.eagle.alert.policy.PolicyEvaluator;
-import org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-
-public class SiddhiPolicyEvaluatorServiceProviderImpl implements 
PolicyEvaluatorServiceProvider {
-       @Override
-       public String getPolicyType() {
-               return AlertConstants.policyType.siddhiCEPEngine.name();
-       }
-
-       @Override
-       public Class<? extends PolicyEvaluator> getPolicyEvaluator() {
-               return SiddhiPolicyEvaluator.class;
-       }
-
-       @Override
-       public List<Module> getBindingModules() {
-               Module module1 = new 
SimpleModule(AlertConstants.POLICY_DEFINITION).registerSubtypes(new 
NamedType(SiddhiPolicyDefinition.class, getPolicyType()));
-               return Arrays.asList(module1);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java
deleted file mode 100644
index 44b9c77..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.executor.AlertExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-
-public class SiddhiQueryCallbackImpl extends QueryCallback{
-
-       private SiddhiPolicyEvaluator evaluator;
-       public static final Logger LOG = 
LoggerFactory.getLogger(SiddhiQueryCallbackImpl.class);
-       public static final ObjectMapper mapper = new ObjectMapper();   
-       public Config config;
-       
-       public SiddhiQueryCallbackImpl(Config config, SiddhiPolicyEvaluator 
evaluator) {
-               this.config = config;           
-               this.evaluator = evaluator;
-       }
-       
-       public List<String> getOutputMessage(Event event) {
-               Object[] data = event.getData();
-               List<String> rets = new ArrayList<String>();
-               boolean isFirst = true;
-               for (Object object : data) {
-                       // The first field is siddhiAlertContext, skip it
-                       if (isFirst) {
-                               isFirst = false;
-                               continue;
-                       }
-                       String value = null;
-                       if (object instanceof Double) {
-                               value = String.valueOf((Double)object);
-                       }
-                       else if (object instanceof Integer) {
-                               value = String.valueOf((Integer)object);
-                       }
-                       else if (object instanceof Long) {
-                               value = String.valueOf((Long)object);
-                       }
-                       else if (object instanceof String) {
-                               value = (String)object;
-                       }
-                       else if (object instanceof Boolean) {
-                               value = String.valueOf((Boolean)object);
-                       }
-                       rets.add(value);
-               }
-               return rets;
-       }
-       
-       @Override
-       public void receive(long timeStamp, Event[] inEvents, Event[] 
removeEvents) {
-               Object[] data = inEvents[0].getData();
-               EagleAlertContext siddhiAlertContext = 
(EagleAlertContext)data[0];
-               List<String> rets = getOutputMessage(inEvents[0]);
-               AlertAPIEntity alert = 
SiddhiAlertAPIEntityRendner.render(config, rets, siddhiAlertContext, timeStamp);
-               AlertExecutor alertExecutor = siddhiAlertContext.alertExecutor;
-               alertExecutor.onAlerts(siddhiAlertContext, 
Arrays.asList(alert));
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java
deleted file mode 100644
index 92394aa..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import java.util.Map;
-import java.util.SortedMap;
-
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * convert metadata entities for a stream to stream definition for siddhi cep 
engine
- * define stream HeapUsage (metric string, host string, value double, 
timestamp long)
- */
-public class SiddhiStreamMetadataUtils {
-       private final static Logger LOG = 
LoggerFactory.getLogger(SiddhiStreamMetadataUtils.class);
-       
-       public final static String EAGLE_ALERT_CONTEXT_FIELD = 
"eagleAlertContext";
-
-       public static SortedMap<String, AlertStreamSchemaEntity> 
getAttrMap(String streamName) {
-               SortedMap<String, AlertStreamSchemaEntity> map = 
StreamMetadataManager.getInstance().getMetadataEntityMapForStream(streamName);
-               if(map == null || map.size() == 0){
-                       throw new IllegalStateException("alert stream schema 
should never be empty");
-               }
-               return map;
-       }
-
-       /**
-     * @see org.wso2.siddhi.query.api.definition.Attribute.Type
-     * make sure StreamMetadataManager.init is invoked before this method
-        * @param streamName
-        * @return
-        */
-       public static String convertToStreamDef(String streamName){
-               SortedMap<String, AlertStreamSchemaEntity> map = 
getAttrMap(streamName);
-               StringBuilder sb = new StringBuilder();         
-               sb.append(EAGLE_ALERT_CONTEXT_FIELD + " object,");
-               for(Map.Entry<String, AlertStreamSchemaEntity> entry : 
map.entrySet()){
-            appendAttributeNameType(sb, entry.getKey(), 
entry.getValue().getAttrType());
-               }
-               if(sb.length() > 0){
-                       sb.deleteCharAt(sb.length()-1);
-               }
-               
-               String siddhiStreamDefFormat = "define stream " + streamName + 
"(" + "%s" + ");";
-               return String.format(siddhiStreamDefFormat, sb.toString());
-       }
-
-    public static String convertToStreamDef(String streamName, Map<String, 
String> eventSchema){
-        StringBuilder sb = new StringBuilder();
-        sb.append("context" + " object,");
-        for(Map.Entry<String, String> entry : eventSchema.entrySet()){
-            appendAttributeNameType(sb, entry.getKey(), entry.getValue());
-        }
-        if(sb.length() > 0){
-            sb.deleteCharAt(sb.length()-1);
-        }
-
-        String siddhiStreamDefFormat = "define stream " + streamName + "(" + 
"%s" + ");";
-        return String.format(siddhiStreamDefFormat, sb.toString());
-    }
-
-    private static void appendAttributeNameType(StringBuilder sb, String 
attrName, String attrType){
-        sb.append(attrName);
-        sb.append(" ");
-        if(attrType.equalsIgnoreCase(AttributeType.STRING.name())){
-            sb.append("string");
-        }else if(attrType.equalsIgnoreCase(AttributeType.INTEGER.name())){
-            sb.append("int");
-        }else if(attrType.equalsIgnoreCase(AttributeType.LONG.name())){
-            sb.append("long");
-        }else if(attrType.equalsIgnoreCase(AttributeType.BOOL.name())){
-            sb.append("bool");
-        }else if(attrType.equalsIgnoreCase(AttributeType.FLOAT.name())){
-            sb.append("float");
-        }else if(attrType.equalsIgnoreCase(AttributeType.DOUBLE.name())){
-            sb.append("double");
-        }else{
-            LOG.warn("AttrType is not recognized, ignore : " + attrType);
-        }
-        sb.append(",");
-    }
-
-       public static Object getAttrDefaultValue(String streamName, String 
attrName){
-               SortedMap<String, AlertStreamSchemaEntity> map = 
getAttrMap(streamName);
-               AlertStreamSchemaEntity entity = map.get(attrName);
-               if (entity.getDefaultValue() != null) {
-                       return entity.getDefaultValue();
-               }
-               else {
-                       String attrType = entity.getAttrType();
-                       if 
(attrType.equalsIgnoreCase(AttributeType.STRING.name())) {
-                               return "NA";
-                       } else if 
(attrType.equalsIgnoreCase(AttributeType.INTEGER.name()) || 
attrType.equalsIgnoreCase(AttributeType.LONG.name())) {
-                               return -1;
-                       } else if 
(attrType.equalsIgnoreCase(AttributeType.BOOL.name())) {
-                               return true;
-                       } else {
-                               LOG.warn("AttrType is not recognized: " + 
attrType + ", treat it as string");
-                               return "N/A";
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/StreamMetadataManager.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/StreamMetadataManager.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/StreamMetadataManager.java
deleted file mode 100644
index 618d245..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/StreamMetadataManager.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.apache.eagle.alert.common.AlertConstants;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.dao.AlertStreamSchemaDAO;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.commons.collections.map.UnmodifiableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * centralized memory where all stream metadata sit on, it is not mutable data
- */
-public class StreamMetadataManager {
-       private static final Logger LOG = 
LoggerFactory.getLogger(StreamMetadataManager.class);
-       
-       private static StreamMetadataManager instance = new 
StreamMetadataManager();
-       private Map<String, List<AlertStreamSchemaEntity>> map = new 
HashMap<String, List<AlertStreamSchemaEntity>>();
-       private Map<String, SortedMap<String, AlertStreamSchemaEntity>> map2 = 
new HashMap<String, SortedMap<String, AlertStreamSchemaEntity>>();
-       private volatile boolean initialized = false;
-       
-       private StreamMetadataManager(){
-       }
-       
-       public static StreamMetadataManager getInstance(){
-               return instance;
-       }
-       
-       private void internalInit(Config config, AlertStreamSchemaDAO dao){
-               try{
-                       String dataSource = 
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.DATA_SOURCE);
-                       List<AlertStreamSchemaEntity> list = 
dao.findAlertStreamSchemaByDataSource(dataSource);
-                       if(list == null)
-                               return;
-                       for (AlertStreamSchemaEntity entity : list) {
-                               String streamName = 
entity.getTags().get(AlertConstants.STREAM_NAME);
-                               if (map.get(streamName) == null) {
-                                       map.put(streamName, new 
ArrayList<AlertStreamSchemaEntity>());
-                                       map2.put(streamName, new 
TreeMap<String, AlertStreamSchemaEntity>());
-                               }
-                               map.get(streamName).add(entity);
-                               
map2.get(streamName).put(entity.getTags().get(AlertConstants.ATTR_NAME), 
entity);
-                       }
-               }catch(Exception ex){
-                       LOG.error("Fail building metadata manger", ex);
-                       throw new IllegalStateException(ex);
-               }
-       }
-       
-       /**
-        * singleton with init would be good for unit test as well, and it 
ensures that
-        * initialization happens only once before you use it.  
-        * @param config
-        * @param dao
-        */
-       public void init(Config config, AlertStreamSchemaDAO dao){
-               if(!initialized){
-                       synchronized(this){
-                               if(!initialized){
-                    if(LOG.isDebugEnabled()) LOG.debug("Initializing ...");
-                                       internalInit(config, dao);
-                                       initialized = true;
-                    LOG.info("Successfully initialized");
-                               }
-                       }
-               }else{
-            LOG.info("Already initialized, skip");
-        }
-       }
-
-       // Only for unit test purpose
-       public void reset() {
-               synchronized (this) {
-                       initialized = false;
-                       map.clear();
-                       map2.clear();
-               }
-       }
-
-       private void ensureInitialized(){
-               if(!initialized)
-                       throw new IllegalStateException("StreamMetadataManager 
should be initialized before using it");
-       }
-       
-       public List<AlertStreamSchemaEntity> 
getMetadataEntitiesForStream(String streamName){
-               ensureInitialized();
-               return getMetadataEntitiesForAllStreams().get(streamName);
-       }
-       
-       public Map<String, List<AlertStreamSchemaEntity>> 
getMetadataEntitiesForAllStreams(){
-               ensureInitialized();
-               return UnmodifiableMap.decorate(map);
-       }
-       
-       public SortedMap<String, AlertStreamSchemaEntity> 
getMetadataEntityMapForStream(String streamName){
-               ensureInitialized();
-               return getMetadataEntityMapForAllStreams().get(streamName);
-       }
-       
-       public Map<String, SortedMap<String, AlertStreamSchemaEntity>> 
getMetadataEntityMapForAllStreams(){
-               ensureInitialized();
-               return UnmodifiableMap.decorate(map2);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java
deleted file mode 100644
index cf76134..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.siddhi.extension;
-
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.executor.function.FunctionExecutor;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-
-public class ContainsIgnoreCaseExtension extends FunctionExecutor {
-
-    Attribute.Type returnType = Attribute.Type.BOOL;
-
-    @Override
-    protected void init(ExpressionExecutor[] attributeExpressionExecutors, 
ExecutionPlanContext executionPlanContext) {
-        if (attributeExpressionExecutors.length != 2) {
-            throw new ExecutionPlanValidationException("Invalid no of 
arguments passed to str:containsIgnoreCase() function, required 2, " +
-                    "but found " + attributeExpressionExecutors.length);
-        }
-        if (attributeExpressionExecutors[0].getReturnType() != 
Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type 
found for the first argument of str:containsIgnoreCase() function, " +
-                    "required "+ Attribute.Type.STRING+", but found 
"+attributeExpressionExecutors[0].getReturnType().toString());
-        }
-        if (attributeExpressionExecutors[1].getReturnType() != 
Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type 
found for the second argument of str:containsIgnoreCase() function, " +
-                    "required "+ Attribute.Type.STRING+", but found 
"+attributeExpressionExecutors[1].getReturnType().toString());
-        }
-    }
-
-    @Override
-    protected Object execute(Object[] data) {
-        if (data[0] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to 
str:containsIgnoreCase() function. First argument cannot be null");
-        }
-        if (data[1] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to 
str:containsIgnoreCase() function. Second argument cannot be null");
-        }
-        String str1 = (String)data[0];
-        String str2 = (String)data[1];
-        return str1.toUpperCase().contains(str2.toUpperCase());
-    }
-
-    @Override
-    protected Object execute(Object data) {
-        return null; //Since the containsIgnoreCase function takes in 2 
parameters, this method does not get called. Hence, not implemented.
-    }
-
-    @Override
-    public void start() {
-        //Nothing to start
-    }
-
-    @Override
-    public void stop() {
-        //Nothing to stop
-    }
-
-    @Override
-    public Attribute.Type getReturnType() {
-        return returnType;
-    }
-
-    @Override
-    public Object[] currentState() {
-        return new Object[]{};
-    }
-
-    @Override
-    public void restoreState(Object[] state) {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java
deleted file mode 100644
index 0b6e7ec..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.siddhi.extension;
-
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
-import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.executor.function.FunctionExecutor;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class EqualsIgnoreCaseExtension extends FunctionExecutor {
-
-    Attribute.Type returnType = Attribute.Type.BOOL;
-
-    @Override
-    protected void init(ExpressionExecutor[] attributeExpressionExecutors, 
ExecutionPlanContext executionPlanContext) {
-        if (attributeExpressionExecutors.length != 2) {
-            throw new ExecutionPlanValidationException("Invalid no of 
arguments passed to str:equalsIgnoreCase() function, required 2, " +
-                    "but found " + attributeExpressionExecutors.length);
-        }
-        if (attributeExpressionExecutors[0].getReturnType() != 
Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type 
found for the first argument of str:equalsIgnoreCase() function, " +
-                    "required "+ Attribute.Type.STRING+", but found 
"+attributeExpressionExecutors[0].getReturnType().toString());
-        }
-        if (attributeExpressionExecutors[1].getReturnType() != 
Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type 
found for the second argument of str:equalsIgnoreCase() function, " +
-                    "required "+ Attribute.Type.STRING+", but found 
"+attributeExpressionExecutors[1].getReturnType().toString());
-        }
-    }
-
-    @Override
-    protected Object execute(Object[] data) {
-        if (data[0] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to 
str:equalsIgnoreCase() function. First argument cannot be null");
-        }
-        if (data[1] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to 
str:equalsIgnoreCase() function. Second argument cannot be null");
-        }
-        String str1 = (String)data[0];
-        String str2 = (String)data[1];
-        return str1.equalsIgnoreCase(str2);
-    }
-
-    @Override
-    protected Object execute(Object data) {
-        return null; //Since the equalsIgnoreCase function takes in 2 
parameters, this method does not get called. Hence, not implemented.
-    }
-
-    @Override
-    public void start() {
-        //Nothing to start
-    }
-
-    @Override
-    public void stop() {
-        //Nothing to stop
-    }
-
-    @Override
-    public Attribute.Type getReturnType() {
-        return returnType;
-    }
-
-    @Override
-    public Object[] currentState() {
-        return new Object[]{};
-    }
-
-    @Override
-    public void restoreState(Object[] state) {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
deleted file mode 100644
index 0bf80de..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.siddhi.extension;
-
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
-import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.extension.string.RegexpFunctionExtension;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * regexpIgnoreCase(string, regex)
- * Tells whether or not this 'string' matches the given regular expression 
'regex'.
- * Accept Type(s): (STRING,STRING)
- * Return Type(s): BOOLEAN
- */
-public class RegexpIgnoreCaseFunctionExtension extends RegexpFunctionExtension 
{
-
-    //state-variables
-    boolean isRegexConstant = false;
-    String regexConstant;
-    Pattern patternConstant;
-
-    @Override
-    protected void init(ExpressionExecutor[] attributeExpressionExecutors, 
ExecutionPlanContext executionPlanContext) {
-        if (attributeExpressionExecutors.length != 2) {
-            throw new ExecutionPlanValidationException("Invalid no of 
arguments passed to str:regexpIgnoreCase() function, required 2, " +
-                    "but found " + attributeExpressionExecutors.length);
-        }
-        if (attributeExpressionExecutors[0].getReturnType() != 
Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type 
found for the first argument of str:regexpIgnoreCase() function, " +
-                    "required "+ Attribute.Type.STRING+", but found 
"+attributeExpressionExecutors[0].getReturnType().toString());
-        }
-        if (attributeExpressionExecutors[1].getReturnType() != 
Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type 
found for the second argument of str:regexpIgnoreCase() function, " +
-                    "required "+ Attribute.Type.STRING+", but found 
"+attributeExpressionExecutors[1].getReturnType().toString());
-        }
-        if(attributeExpressionExecutors[1] instanceof 
ConstantExpressionExecutor){
-            isRegexConstant = true;
-            regexConstant = (String) ((ConstantExpressionExecutor) 
attributeExpressionExecutors[1]).getValue();
-            patternConstant = Pattern.compile(regexConstant, 
Pattern.CASE_INSENSITIVE);
-        }
-    }
-
-    @Override
-    protected Object execute(Object[] data) {
-        String regex;
-        Pattern pattern;
-        Matcher matcher;
-
-        if (data[0] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to 
str:regexpIgnoreCase() function. First argument cannot be null");
-        }
-        if (data[1] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to 
str:regexpIgnoreCase() function. Second argument cannot be null");
-        }
-        String source = (String) data[0];
-
-        if(!isRegexConstant){
-            regex = (String) data[1];
-            pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
-            matcher = pattern.matcher(source);
-            return matcher.matches();
-
-        } else {
-            matcher = patternConstant.matcher(source);
-            return matcher.matches();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
deleted file mode 100644
index 44014da..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.executor;
-
-import com.codahale.metrics.MetricRegistry;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
-import org.apache.commons.lang3.time.DateUtils;
-import org.apache.eagle.alert.common.AlertConstants;
-import org.apache.eagle.alert.config.AbstractPolicyDefinition;
-import org.apache.eagle.alert.dao.AlertDefinitionDAO;
-import org.apache.eagle.alert.dao.AlertStreamSchemaDAO;
-import org.apache.eagle.alert.dao.AlertStreamSchemaDAOImpl;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.alert.policy.*;
-import org.apache.eagle.alert.siddhi.EagleAlertContext;
-import org.apache.eagle.alert.siddhi.SiddhiAlertHandler;
-import org.apache.eagle.alert.siddhi.StreamMetadataManager;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.Tuple2;
-import org.apache.eagle.metric.reportor.EagleCounterMetric;
-import org.apache.eagle.metric.reportor.EagleMetricListener;
-import org.apache.eagle.metric.reportor.EagleServiceReporterMetricListener;
-import org.apache.eagle.metric.reportor.MetricKeyCodeDecoder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-public class AlertExecutor extends JavaStormStreamExecutor2<String, 
AlertAPIEntity> implements PolicyLifecycleMethods, SiddhiAlertHandler, 
PolicyDistributionReportMethods {
-       private static final long serialVersionUID = 1L;
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(AlertExecutor.class);
-
-       private String alertExecutorId;
-       private volatile CopyOnWriteHashMap<String, PolicyEvaluator> 
policyEvaluators;
-       private PolicyPartitioner partitioner;
-       private int numPartitions;
-       private int partitionSeq;
-       private Config config;
-       private Map<String, Map<String, AlertDefinitionAPIEntity>> 
initialAlertDefs;
-       private AlertDefinitionDAO alertDefinitionDao;
-       private String[] sourceStreams;
-       private static String EAGLE_EVENT_COUNT = "eagle.event.count";
-       private static String EAGLE_POLICY_EVAL_COUNT = 
"eagle.policy.eval.count";
-       private static String EAGLE_POLICY_EVAL_FAIL_COUNT = 
"eagle.policy.eval.fail.count";
-       private static String EAGLE_ALERT_COUNT = "eagle.alert.count";
-       private static String EAGLE_ALERT_FAIL_COUNT = "eagle.alert.fail.count";
-       private static long MERITE_GRANULARITY = DateUtils.MILLIS_PER_MINUTE;
-       private Map<String, Map<String, String>> dimensionsMap; // cache it for 
performance
-       private Map<String, String> baseDimensions;
-       private MetricRegistry registry;
-       private EagleMetricListener listener;
-
-       public AlertExecutor(String alertExecutorId, PolicyPartitioner 
partitioner, int numPartitions, int partitionSeq,
-                         AlertDefinitionDAO alertDefinitionDao, String[] 
sourceStreams){
-               this.alertExecutorId = alertExecutorId;
-               this.partitioner = partitioner;
-               this.numPartitions = numPartitions;
-               this.partitionSeq = partitionSeq;
-               this.alertDefinitionDao = alertDefinitionDao;
-               this.sourceStreams = sourceStreams;
-       }
-       
-       public String getAlertExecutorId(){
-               return this.alertExecutorId;
-       }
-
-       public int getNumPartitions() {
-               return this.numPartitions;
-       }
-       
-       public int getPartitionSeq(){
-               return this.partitionSeq;
-       }
-       
-       public PolicyPartitioner getPolicyPartitioner() {
-               return this.partitioner;
-       }
-       
-       public Map<String, Map<String, AlertDefinitionAPIEntity>> 
getInitialAlertDefs() {
-               return this.initialAlertDefs;
-       }
-               
-       public AlertDefinitionDAO getAlertDefinitionDao() {
-               return alertDefinitionDao;
-       }
-
-    public Map<String, PolicyEvaluator> getPolicyEvaluators(){
-        return policyEvaluators;
-    }
-       
-       @Override
-       public void prepareConfig(Config config) {
-               this.config = config;
-       }
-       
-       private void initMetricReportor() {
-               String host = config.getString(EagleConfigConstants.EAGLE_PROPS 
+ "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
-               int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." 
+ EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-
-               String username = 
config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) ?
-                                         
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : 
null;
-               String password = 
config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) ?
-                                         
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : 
null;
-
-               //TODO: need to it replace it with batch flush listener
-               registry = new MetricRegistry();
-               listener = new EagleServiceReporterMetricListener(host, port, 
username, password);
-
-               baseDimensions = new HashMap<>();
-               baseDimensions.put(AlertConstants.ALERT_EXECUTOR_ID, 
alertExecutorId);
-               baseDimensions.put(AlertConstants.PARTITIONSEQ, 
String.valueOf(partitionSeq));
-               baseDimensions.put(AlertConstants.SOURCE, 
ManagementFactory.getRuntimeMXBean().getName());
-               baseDimensions.put(EagleConfigConstants.DATA_SOURCE, 
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.DATA_SOURCE));
-               baseDimensions.put(EagleConfigConstants.SITE, 
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.SITE));
-               dimensionsMap = new HashMap<>();
-       }
-
-    /**
-     * for unit test purpose only
-     * @param config
-     * @return
-     */
-    public AlertStreamSchemaDAO getAlertStreamSchemaDAO(Config config){
-        return new AlertStreamSchemaDAOImpl(config);
-    }
-
-       @Override
-       public void init() {
-               // initialize StreamMetadataManager before it is used
-               StreamMetadataManager.getInstance().init(config, 
getAlertStreamSchemaDAO(config));
-               // for each AlertDefinition, to create a PolicyEvaluator
-               Map<String, PolicyEvaluator> tmpPolicyEvaluators = new 
HashMap<String, PolicyEvaluator>();
-               
-        String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." 
+ EagleConfigConstants.SITE);
-               String dataSource = 
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.DATA_SOURCE);
-               try {
-                       initialAlertDefs = 
alertDefinitionDao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource);
-               }
-               catch (Exception ex) {
-                       LOG.error("fail to initialize initialAlertDefs: ", ex);
-            throw new IllegalStateException("fail to initialize 
initialAlertDefs: ", ex);
-               }
-        if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
-            LOG.warn("No alert definitions was found for site: " + site + ", 
dataSource: " + dataSource);
-        }
-        else if (initialAlertDefs.get(alertExecutorId) != null) { 
-                       for(AlertDefinitionAPIEntity alertDef : 
initialAlertDefs.get(alertExecutorId).values()){
-                               int part = partitioner.partition(numPartitions, 
alertDef.getTags().get(AlertConstants.POLICY_TYPE), 
alertDef.getTags().get(AlertConstants.POLICY_ID));
-                               if (part == partitionSeq) {
-                                       
tmpPolicyEvaluators.put(alertDef.getTags().get(AlertConstants.POLICY_ID), 
createPolicyEvaluator(alertDef));
-                               }
-                       }
-               }
-               
-               policyEvaluators = new CopyOnWriteHashMap<>();
-               // for efficiency, we don't put single policy evaluator
-               policyEvaluators.putAll(tmpPolicyEvaluators);
-               DynamicPolicyLoader policyLoader = 
DynamicPolicyLoader.getInstance();
-               
-               policyLoader.init(initialAlertDefs, alertDefinitionDao, config);
-        String fullQualifiedAlertExecutorId = alertExecutorId + "_" + 
partitionSeq;
-               
policyLoader.addPolicyChangeListener(fullQualifiedAlertExecutorId, this);
-        
policyLoader.addPolicyDistributionReporter(fullQualifiedAlertExecutorId, this);
-               LOG.info("Alert Executor created, partitionSeq: " + 
partitionSeq + " , numPartitions: " + numPartitions);
-        LOG.info("All policy evaluators: " + policyEvaluators);
-               
-               initMetricReportor();
-       }
-
-    /**
-     * Create PolicyEvaluator instance according to policyType-mapped policy 
evaluator class
-     *
-     * @param alertDef alert definition
-     * @return PolicyEvaluator instance
-     */
-       private PolicyEvaluator createPolicyEvaluator(AlertDefinitionAPIEntity 
alertDef){
-               String policyType = 
alertDef.getTags().get(AlertConstants.POLICY_TYPE);
-               Class<? extends PolicyEvaluator> evalCls = 
PolicyManager.getInstance().getPolicyEvaluator(policyType);
-               if(evalCls == null){
-                       String msg = "No policy evaluator defined for policy 
type : " + policyType;
-                       LOG.error(msg);
-                       throw new IllegalStateException(msg);
-               }
-               
-               // check out whether strong incoming data validation is 
necessary
-        String needValidationConfigKey= AlertConstants.ALERT_EXECUTOR_CONFIGS 
+ "." + alertExecutorId + ".needValidation";
-
-        // Default: true
-        boolean needValidation = !config.hasPath(needValidationConfigKey) || 
config.getBoolean(needValidationConfigKey);
-
-               AbstractPolicyDefinition policyDef = null;
-               try {
-                       policyDef = 
JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), 
AbstractPolicyDefinition.class, 
PolicyManager.getInstance().getPolicyModules(policyType));
-               } catch (Exception ex) {
-                       LOG.error("Fail initial alert policy def: 
"+alertDef.getPolicyDef(), ex);
-               }
-               PolicyEvaluator pe;
-               try{
-            // Create evaluator instances
-                       pe = evalCls.getConstructor(Config.class, String.class, 
AbstractPolicyDefinition.class, String[].class, 
boolean.class).newInstance(config, alertDef.getTags().get("policyId"), 
policyDef, sourceStreams, needValidation);
-               }catch(Exception ex){
-                       LOG.error("Fail creating new policyEvaluator", ex);
-                       LOG.warn("Broken policy definition and stop running : " 
+ alertDef.getPolicyDef());
-                       throw new IllegalStateException(ex);
-               }
-               return pe;
-       }
-
-    /**
-     * verify both alertExecutor logic name and partition id
-     * @param alertDef alert definition
-     *
-     * @return whether accept the alert definition
-     */
-       private boolean accept(AlertDefinitionAPIEntity alertDef){
-        if(!alertDef.getTags().get("alertExecutorId").equals(alertExecutorId)) 
{
-            if(LOG.isDebugEnabled()){
-                LOG.debug("alertDef does not belong to this alertExecutorId : 
" + alertExecutorId + ", alertDef : " + alertDef);
-            }
-            return false;
-        }
-               int targetPartitionSeq = partitioner.partition(numPartitions, 
alertDef.getTags().get(AlertConstants.POLICY_TYPE), 
alertDef.getTags().get(AlertConstants.POLICY_ID));
-               if(targetPartitionSeq == partitionSeq)
-                       return true;
-               return false;
-       }
-       
-       private long trim(long value, long granularity) {
-               return value / granularity * granularity;
-       }
-
-       private void updateCounter(String name, Map<String, String> dimensions, 
double value) {
-               long current = System.currentTimeMillis();
-               String metricName = MetricKeyCodeDecoder.codeMetricKey(name, 
dimensions);
-               if (registry.getMetrics().get(metricName) == null) {
-                       EagleCounterMetric metric = new 
EagleCounterMetric(current, metricName, value, MERITE_GRANULARITY);
-                       metric.registerListener(listener);
-                       registry.register(metricName, metric);
-               } else {
-                       EagleCounterMetric metric = (EagleCounterMetric) 
registry.getMetrics().get(metricName);
-                       metric.update(value, current);
-                       //TODO: need remove unused metric from registry
-               }
-       }
-       
-       private void updateCounter(String name, Map<String, String> dimensions) 
{
-               updateCounter(name, dimensions, 1.0);
-       }
-       
-       protected Map<String, String> getDimensions(String policyId) {
-               if (dimensionsMap.get(policyId) == null) {
-                       Map<String, String> newDimensions = new HashMap<String, 
String>(baseDimensions);
-                       newDimensions.put(AlertConstants.POLICY_ID, policyId);
-                       dimensionsMap.put(policyId, newDimensions);
-               }
-               return dimensionsMap.get(policyId);
-       }
-
-    /**
-     * within this single executor, execute all PolicyEvaluator sequentially
-     * the contract for input:
-     * 1. total # of fields for input is 3, which is fixed
-     * 2. the first field is key
-     * 3. the second field is stream name
-     * 4. the third field is value which is java SortedMap
-     */
-    @Override
-    public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, 
AlertAPIEntity>> outputCollector){
-        if(input.size() != 3)
-            throw new IllegalStateException("AlertExecutor always consumes 
exactly 3 fields: key, stream name and value(SortedMap)");
-        if(LOG.isDebugEnabled()) LOG.debug("Msg is coming " + input.get(2));
-        if(LOG.isDebugEnabled()) LOG.debug("Current policyEvaluators: " + 
policyEvaluators.keySet().toString());
-
-        updateCounter(EAGLE_EVENT_COUNT, baseDimensions);
-        try{
-            synchronized(this.policyEvaluators) {
-                for(Entry<String, PolicyEvaluator> entry : 
policyEvaluators.entrySet()){
-                    String policyId = entry.getKey();
-                    PolicyEvaluator evaluator = entry.getValue();
-                    updateCounter(EAGLE_POLICY_EVAL_COUNT, 
getDimensions(policyId));
-                    try {
-                        EagleAlertContext siddhiAlertContext = new 
EagleAlertContext();
-                        siddhiAlertContext.alertExecutor = this;
-                        siddhiAlertContext.policyId = policyId;
-                        siddhiAlertContext.evaluator = evaluator;
-                        siddhiAlertContext.outputCollector = outputCollector;
-                        evaluator.evaluate(new ValuesArray(siddhiAlertContext, 
input.get(1), input.get(2)));
-                    }
-                    catch (Exception ex) {
-                        LOG.error("Got an exception, but continue to run " + 
input.get(2).toString(), ex);
-                        updateCounter(EAGLE_POLICY_EVAL_FAIL_COUNT, 
getDimensions(policyId));
-                    }
-                }
-            }
-        } catch(Exception ex){
-            LOG.error(alertExecutorId + ", partition " + partitionSeq + ", 
error fetching alerts, but continue to run", ex);
-            updateCounter(EAGLE_ALERT_FAIL_COUNT, baseDimensions);
-        }
-    }
-
-       @Override
-       public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> 
added) {
-               if(LOG.isDebugEnabled()) LOG.debug(alertExecutorId + ", 
partition " + partitionSeq + " policy added : " + added + " policyEvaluators " 
+ policyEvaluators);
-               for(AlertDefinitionAPIEntity alertDef : added.values()){
-                       if(!accept(alertDef))
-                               continue;
-                       LOG.info(alertExecutorId + ", partition " + 
partitionSeq + " policy really added " + alertDef);
-                       PolicyEvaluator newEvaluator = 
createPolicyEvaluator(alertDef);
-                       if(newEvaluator != null){
-                               synchronized(this.policyEvaluators) {
-                                       
policyEvaluators.put(alertDef.getTags().get(AlertConstants.POLICY_ID), 
newEvaluator);
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> 
changed) {
-               if(LOG.isDebugEnabled()) LOG.debug(alertExecutorId + ", 
partition " + partitionSeq + " policy changed : " + changed);
-               for(AlertDefinitionAPIEntity alertDef : changed.values()){
-                       if(!accept(alertDef))
-                               continue;
-                       LOG.info(alertExecutorId + ", partition " + 
partitionSeq + " policy really changed " + alertDef);
-                       synchronized(this.policyEvaluators) {
-                               PolicyEvaluator pe = 
policyEvaluators.get(alertDef.getTags().get(AlertConstants.POLICY_ID));
-                               pe.onPolicyUpdate(alertDef);
-                       }
-               }
-       }
-
-       @Override
-       public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> 
deleted) {
-               if(LOG.isDebugEnabled()) LOG.debug(alertExecutorId + ", 
partition " + partitionSeq + " policy deleted : " + deleted);
-               for(AlertDefinitionAPIEntity alertDef : deleted.values()){
-                       if(!accept(alertDef))
-                               continue;
-                       LOG.info(alertExecutorId + ", partition " + 
partitionSeq + " policy really deleted " + alertDef);
-                       String policyId = 
alertDef.getTags().get(AlertConstants.POLICY_ID);
-                       synchronized(this.policyEvaluators) {                   
-                               if (policyEvaluators.containsKey(policyId)) {
-                                       PolicyEvaluator pe = 
policyEvaluators.remove(alertDef.getTags().get(AlertConstants.POLICY_ID));
-                                       pe.onPolicyDelete();
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public void onAlerts(EagleAlertContext context, List<AlertAPIEntity> 
alerts) {
-               if(alerts != null && !alerts.isEmpty()){
-                       String policyId = context.policyId;
-            LOG.info(String.format("Detected %s alerts for policy 
%s",alerts.size(),policyId));
-                       Collector outputCollector = context.outputCollector;
-                       PolicyEvaluator evaluator = context.evaluator;
-                       updateCounter(EAGLE_ALERT_COUNT, 
getDimensions(policyId), alerts.size());
-                       for (AlertAPIEntity entity : alerts) {
-                               synchronized(this) {
-                                       outputCollector.collect(new 
Tuple2(policyId, entity));
-                               }
-                               if(LOG.isDebugEnabled()) LOG.debug("A new alert 
is triggered: "+alertExecutorId + ", partition " + partitionSeq + ", Got an 
alert with output context: " + entity.getAlertContext() + ", for policy " + 
evaluator);
-                       }
-               }
-       }
-
-    @Override
-    public void report() {
-        PolicyDistroStatsLogReporter appender = new 
PolicyDistroStatsLogReporter();
-        appender.reportPolicyMembership(alertExecutorId + "_" + partitionSeq, 
policyEvaluators.keySet());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
deleted file mode 100644
index 547e39d..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.executor;
-
-import org.apache.eagle.alert.common.AlertConstants;
-import org.apache.eagle.alert.dao.AlertDefinitionDAO;
-import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl;
-import org.apache.eagle.alert.dao.AlertExecutorDAOImpl;
-import org.apache.eagle.alert.entity.AlertExecutorEntity;
-import org.apache.eagle.alert.policy.DefaultPolicyPartitioner;
-import org.apache.eagle.alert.policy.PolicyPartitioner;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValue;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Create alert executors and provide callback for programmer to link alert 
executor to immediate parent executors
- *
- * <br/><br/>
- * Explanations for programId, alertExecutorId and policy<br/><br/>
- * - programId - distributed or single-process program for example one storm 
topology<br/>
- * - alertExecutorId - one process/thread which executes multiple policies<br/>
- * - policy - some rules to be evaluated<br/>
- *
- * <br/>
- *
- * Normally the mapping is like following:
- * <pre>
- * programId (1:N) alertExecutorId
- * alertExecutorId (1:N) policy
- * </pre>
- */
-public class AlertExecutorCreationUtils {
-       private final static Logger LOG = 
LoggerFactory.getLogger(AlertExecutorCreationUtils.class);
-
-    public static AlertExecutor[] createAlertExecutors(Config config, String 
alertExecutorId) throws Exception{
-        // Read site and dataSource from configuration.
-        String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS 
+ "." + EagleConfigConstants.DATA_SOURCE);
-        LOG.info("Loading alerting definitions for dataSource: " + dataSource);
-
-        // Get map from alertExecutorId to alert stream
-        // (dataSource) => Map[alertExecutorId:String,streamName:List[String]]
-        List<String> streamNames = new ArrayList<String>();
-        AlertExecutorDAOImpl alertExecutorDAO = new AlertExecutorDAOImpl(new 
EagleServiceConnector(config));
-        List<AlertExecutorEntity> alertExecutorEntities = 
alertExecutorDAO.findAlertExecutor(dataSource, alertExecutorId);
-        for(AlertExecutorEntity entity : alertExecutorEntities){
-            streamNames.add(entity.getTags().get(AlertConstants.STREAM_NAME));
-        }
-
-        if(streamNames.isEmpty()){
-            throw new IllegalStateException("upstream names should not be 
empty for alert " + alertExecutorId);
-        }
-        return createAlertExecutors(config, new AlertDefinitionDAOImpl(new 
EagleServiceConnector(config)),
-                streamNames, alertExecutorId);
-    }
-
-    /**
-     * Build DAG Tasks based on persisted alert definition and schemas from 
eagle store.
-     *
-     * <h3>Require configuration:</h3>
-     *
-     * <ul>
-     * <li>eagleProps.site: program site id.</li>
-     * <li>eagleProps.dataSource: program data source.</li>
-     * <li>alertExecutorConfigs: only configured executor will be built into 
execution tasks.</li>
-     * </ul>
-     *
-     * <h3>Steps:</h3>
-     *
-     * <ol>
-     * <li>(upstreamTasks) => Map[streamName:String,upstreamTask:Task]</li>
-     * <li>(dataSource) => 
Map[alertExecutorId:String,streamName:List[String]]</li>
-     * <li>(site,dataSource) => 
Map[alertExecutorId,Map[policyId,alertDefinition]]</li>
-     * <li>(config["alertExecutorConfigs"]) => AlertExecutor(alertExecutorID, 
partitioner, numPartitions, partitionSeq, alertDefs, alertDefDAO, 
sourceStreams)[]</li>
-     * </ol>
-     */
-       public static AlertExecutor[] createAlertExecutors(Config config, 
AlertDefinitionDAO alertDefDAO,
-                       List<String> streamNames, String alertExecutorId) 
throws Exception{
-               // Read `alertExecutorConfigs` from configuration and get 
config for this alertExecutorId
-        int numPartitions =1;
-        String partitionerCls = 
DefaultPolicyPartitioner.class.getCanonicalName();
-        String alertExecutorConfigsKey = "alertExecutorConfigs";
-        if(config.hasPath(alertExecutorConfigsKey)) {
-            Map<String, ConfigValue> alertExecutorConfigs = 
config.getObject(alertExecutorConfigsKey);
-            if(alertExecutorConfigs !=null && 
alertExecutorConfigs.containsKey(alertExecutorId)) {
-                Map<String, Object> alertExecutorConfig = (Map<String, 
Object>) alertExecutorConfigs.get(alertExecutorId).unwrapped();
-                int parts = 0;
-                if(alertExecutorConfig.containsKey("parallelism")) parts = 
(int) (alertExecutorConfig.get("parallelism"));
-                numPartitions = parts == 0 ? 1 : parts;
-                if(alertExecutorConfig.containsKey("partitioner")) 
partitionerCls = (String) alertExecutorConfig.get("partitioner");
-            }
-        }
-
-        return createAlertExecutors(alertDefDAO, streamNames, alertExecutorId, 
numPartitions, partitionerCls);
-       }
-
-    /**
-     * Build alert executors and assign alert definitions between these 
executors by partitioner 
(alertExecutorConfigs["${alertExecutorId}"]["partitioner"])
-     */
-       public static AlertExecutor[] createAlertExecutors(AlertDefinitionDAO 
alertDefDAO, List<String> sourceStreams,
-                                                          String 
alertExecutorID, int numPartitions, String partitionerCls) throws Exception{
-               LOG.info("Creating alert executors with alertExecutorID: " + 
alertExecutorID + ", numPartitions: " + numPartitions + ", Partition class is: 
"+ partitionerCls);
-
-               PolicyPartitioner partitioner = 
(PolicyPartitioner)Class.forName(partitionerCls).newInstance();
-               AlertExecutor[] alertExecutors = new 
AlertExecutor[numPartitions];
-        String[] _sourceStreams = sourceStreams.toArray(new String[0]);
-
-               for(int i = 0; i < numPartitions; i++){
-                       alertExecutors[i] = new AlertExecutor(alertExecutorID, 
partitioner, numPartitions, i, alertDefDAO,_sourceStreams);
-               }       
-               return alertExecutors;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
deleted file mode 100644
index dc4e0a1..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eagle.alert.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
new file mode 100644
index 0000000..eac2bfd
--- /dev/null
+++ 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext
deleted file mode 100644
index 4d5e237..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext
+++ /dev/null
@@ -1,35 +0,0 @@
-#
-# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-charAt=org.wso2.siddhi.extension.string.CharAtFunctionExtension
-coalesce=org.wso2.siddhi.extension.string.CoalesceFunctionExtension
-concat=org.wso2.siddhi.extension.string.ConcatFunctionExtension
-length=org.wso2.siddhi.extension.string.LengthFunctionExtension
-lower=org.wso2.siddhi.extension.string.LowerFunctionExtension
-regexp=org.wso2.siddhi.extension.string.RegexpFunctionExtension
-repeat=org.wso2.siddhi.extension.string.RepeatFunctionExtension
-replaceAll=org.wso2.siddhi.extension.string.ReplaceAllFunctionExtension
-replaceFirst=org.wso2.siddhi.extension.string.ReplaceFirstFunctionExtension
-reverse=org.wso2.siddhi.extension.string.ReverseFunctionExtension
-strcmp=org.wso2.siddhi.extension.string.StrcmpFunctionExtension
-substr=org.wso2.siddhi.extension.string.SubstrFunctionExtension
-trim=org.wso2.siddhi.extension.string.TrimFunctionExtension
-upper=org.wso2.siddhi.extension.string.UpperFunctionExtension
-hex=org.wso2.siddhi.extension.string.HexFunctionExtension
-unhex=org.wso2.siddhi.extension.string.UnhexFunctionExtension
-equalsIgnoreCase=org.apache.eagle.alert.siddhi.extension.EqualsIgnoreCaseExtension
-containsIgnoreCase=org.apache.eagle.alert.siddhi.extension.ContainsIgnoreCaseExtension
-regexpIgnoreCase=org.apache.eagle.alert.siddhi.extension.RegexpIgnoreCaseFunctionExtension

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
index 521790a..3d69887 100644
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
+++ 
b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
@@ -18,22 +18,23 @@ package org.apache.eagle.alert.cep;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.dao.AlertDefinitionDAO;
-import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl;
-import org.apache.eagle.alert.dao.AlertStreamSchemaDAO;
-import org.apache.eagle.alert.dao.AlertStreamSchemaDAOImpl;
+import junit.framework.Assert;
 import org.apache.eagle.alert.entity.AlertAPIEntity;
 import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
 import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.alert.siddhi.EagleAlertContext;
-import org.apache.eagle.alert.siddhi.SiddhiPolicyDefinition;
-import org.apache.eagle.alert.siddhi.SiddhiPolicyEvaluator;
-import org.apache.eagle.alert.siddhi.StreamMetadataManager;
+import org.apache.eagle.alert.executor.AlertExecutor;
+import org.apache.eagle.alert.siddhi.SiddhiAlertAPIEntityRender;
 import org.apache.eagle.dataproc.core.ValuesArray;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.Tuple2;
-import org.apache.eagle.executor.AlertExecutor;
-import junit.framework.Assert;
+import org.apache.eagle.policy.PolicyEvaluationContext;
+import org.apache.eagle.policy.dao.AlertDefinitionDAOImpl;
+import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
+import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.policy.siddhi.SiddhiPolicyDefinition;
+import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator;
+import org.apache.eagle.policy.siddhi.StreamMetadataManager;
 import org.apache.eagle.service.client.EagleServiceConnector;
 import org.junit.Test;
 
@@ -92,26 +93,28 @@ public class TestSiddhiEvaluator {
                                                        "select * " +
                                                        "insert into 
outputStream ;";
         policyDef.setExpression(expression);
-        SiddhiPolicyEvaluator evaluator = new SiddhiPolicyEvaluator(config, 
"testPolicy", policyDef, new String[]{"hdfsAuditLogEventStream"});
-               EagleAlertContext context = new EagleAlertContext();
+        SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity> 
evaluator = new SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, 
AlertAPIEntity>(config, "testPolicy", policyDef, new 
String[]{"hdfsAuditLogEventStream"});
+               PolicyEvaluationContext context = new PolicyEvaluationContext();
 
-               AlertDefinitionDAO alertDao = new AlertDefinitionDAOImpl(new 
EagleServiceConnector(null, null)) {
+               PolicyDefinitionDAO alertDao = new AlertDefinitionDAOImpl(new 
EagleServiceConnector(null, null)) {
                        @Override
-                       public Map<String, Map<String, 
AlertDefinitionAPIEntity>> findActiveAlertDefsGroupbyAlertExecutorId(String 
site, String dataSource) throws Exception {
+                       public Map<String, Map<String, 
AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, 
String dataSource) throws Exception {
                                return null;
                        }
                };
 
-               context.alertExecutor = new AlertExecutor("alertExecutorId", 
null, 3, 1, alertDao, new String[]{"hdfsAuditLogEventStream"}) {
+               AlertExecutor alertExecutor = new 
AlertExecutor("alertExecutorId", null, 3, 1, alertDao, new 
String[]{"hdfsAuditLogEventStream"}) {
                        @Override
                        protected Map<String, String> getDimensions(String 
policyId) {
                                return new HashMap<String, String>();
                        }
                };
-               context.alertExecutor.prepareConfig(config);
-               context.alertExecutor.init();
+               alertExecutor.prepareConfig(config);
+               alertExecutor.init();
+               context.alertExecutor = alertExecutor;
                context.evaluator = evaluator;
                context.policyId = "testPolicy";
+               context.resultRender = new SiddhiAlertAPIEntityRender();
                context.outputCollector = new Collector<Tuple2<String, 
AlertAPIEntity>> () {
                        @Override
                        public void collect(Tuple2<String, AlertAPIEntity> 
stringAlertAPIEntityTuple2) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
index 70853b4..783abea 100644
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
+++ 
b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
@@ -18,8 +18,10 @@ package org.apache.eagle.alert.dao;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.policy.dao.AlertDefinitionDAOImpl;
 import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
 import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
 import org.apache.eagle.service.client.EagleServiceConnector;
 import org.junit.Assert;
 import org.junit.Test;
@@ -52,9 +54,9 @@ public class TestAlertDefinitionDAOImpl {
 
                String site = "sandbox";
                String dataSource = "UnitTest";
-               AlertDefinitionDAO dao = new AlertDefinitionDAOImpl(new 
EagleServiceConnector(eagleServiceHost, eagleServicePort)) {
+               PolicyDefinitionDAO dao = new AlertDefinitionDAOImpl(new 
EagleServiceConnector(eagleServiceHost, eagleServicePort)) {
                        @Override
-                       public List<AlertDefinitionAPIEntity> 
findActiveAlertDefs(String site, String dataSource) throws Exception {
+                       public List<AlertDefinitionAPIEntity> 
findActivePolicies(String site, String dataSource) throws Exception {
                                List<AlertDefinitionAPIEntity> list = new 
ArrayList<AlertDefinitionAPIEntity>();
                                list.add(buildTestAlertDefEntity(site, 
dataSource, "TestExecutor1", "TestPolicyIDA", "TestPolicyTypeA"));
                                list.add(buildTestAlertDefEntity(site, 
dataSource, "TestExecutor1", "TestPolicyIDB", "TestPolicyTypeB"));
@@ -64,7 +66,7 @@ public class TestAlertDefinitionDAOImpl {
                        }
                };
 
-               Map<String, Map<String, AlertDefinitionAPIEntity>> retMap = 
dao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource);
+               Map<String, Map<String, AlertDefinitionAPIEntity>> retMap = 
dao.findActivePoliciesGroupbyExecutorId(site, dataSource);
                
                Assert.assertEquals(2, retMap.size());
                Assert.assertEquals(2, retMap.get("TestExecutor1").size());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
index b7f40fa..82aa89f 100644
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
+++ 
b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
@@ -24,9 +24,10 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import junit.framework.Assert;
 
+import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
 import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.alert.siddhi.SiddhiStreamMetadataUtils;
-import org.apache.eagle.alert.siddhi.StreamMetadataManager;
+import org.apache.eagle.policy.siddhi.SiddhiStreamMetadataUtils;
+import org.apache.eagle.policy.siddhi.StreamMetadataManager;
 import org.junit.Test;
 
 public class TestSiddhiStreamMetadataUtils {
@@ -43,7 +44,7 @@ public class TestSiddhiStreamMetadataUtils {
                        }
                });
                String siddhiStreamDef = 
SiddhiStreamMetadataUtils.convertToStreamDef("testStreamName");
-               Assert.assertEquals("define stream " + "testStreamName" + 
"(eagleAlertContext object,attrName1 string,attrName2 long);", siddhiStreamDef);
+               Assert.assertEquals("define stream " + "testStreamName" + 
"(eagleAlertContext object, attrName1 string,attrName2 long);", 
siddhiStreamDef);
                StreamMetadataManager.getInstance().reset();
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
index 80aacd6..91c5332 100644
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
+++ 
b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
@@ -18,8 +18,10 @@ package org.apache.eagle.alert.dao;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
+import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl;
 import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.alert.siddhi.StreamMetadataManager;
+import org.apache.eagle.policy.siddhi.StreamMetadataManager;
 import org.junit.Assert;
 import org.junit.Test;
 

Reply via email to