http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
 
b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
new file mode 100644
index 0000000..f0f81c5
--- /dev/null
+++ 
b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.alert.common.AlertConstants;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.Tags;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertStream")
+@ColumnFamily("f")
+@Prefix("alertStream")
+@Service(AlertConstants.ALERT_STREAM_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"dataSource", "streamName"})
+public class AlertStreamEntity extends TaggedLogAPIEntity{
+       @Column("a")
+       private String desc;    
+
+       public String getDesc() {
+               return desc;
+       }
+       public void setDesc(String desc) {
+               this.desc = desc;
+               valueChanged("desc");
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
 
b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
new file mode 100644
index 0000000..76b6097
--- /dev/null
+++ 
b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.alert.common.AlertConstants;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.Tags;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+
+/**
+ * DDL to create the stream metadata table (alertStreamSchema):
+ * 
+ * create 'alertStreamSchema', {NAME => 'f', BLOOMFILTER => 'ROW', VERSIONS => '1', COMPRESSION => 'SNAPPY'}
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertStreamSchema")
+@ColumnFamily("f")
+@Prefix("alertStreamSchema")
+@Service(AlertConstants.ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"dataSource", "streamName", "attrName"})
+public class AlertStreamSchemaEntity extends TaggedLogAPIEntity{
+       @Column("a")
+       private String attrType;
+       @Column("b")
+       private String category;
+       @Column("c")
+       private String attrValueResolver;
+       /* all tags form the key for alert de-duplication */
+       @Column("d")
+       private Boolean usedAsTag;
+       @Column("e")
+       private String attrDescription;
+       @Column("f")
+       private String attrDisplayName; 
+       @Column("g")
+       private String defaultValue;
+
+       public String getAttrType() {
+               return attrType;
+       }
+       public void setAttrType(String attrType) {
+               this.attrType = attrType;
+               valueChanged("attrType");
+       }
+       public String getCategory() {
+               return category;
+       }
+       public void setCategory(String category) {
+               this.category = category;
+               valueChanged("category");
+       }
+       public String getAttrValueResolver() {
+               return attrValueResolver;
+       }
+       public void setAttrValueResolver(String attrValueResolver) {
+               this.attrValueResolver = attrValueResolver;
+               valueChanged("attrValueResolver");
+       }
+       public Boolean getUsedAsTag() {
+               return usedAsTag;
+       }
+       public void setUsedAsTag(Boolean usedAsTag) {
+               this.usedAsTag = usedAsTag;
+               valueChanged("usedAsTag");
+       }
+       public String getAttrDescription() {
+               return attrDescription;
+       }
+       public void setAttrDescription(String attrDescription) {
+               this.attrDescription = attrDescription;
+               valueChanged("attrDescription");
+       }
+       public String getAttrDisplayName() {
+               return attrDisplayName;
+       }
+       public void setAttrDisplayName(String attrDisplayName) {
+               this.attrDisplayName = attrDisplayName;
+               valueChanged("attrDisplayName");
+       }
+       public String getDefaultValue() {
+               return defaultValue;
+       }
+       public void setDefaultValue(String defaultValue) {
+               this.defaultValue = defaultValue;
+               valueChanged("defaultValue");
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/AbstractPolicyDefinition.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/AbstractPolicyDefinition.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/AbstractPolicyDefinition.java
deleted file mode 100644
index c235225..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/AbstractPolicyDefinition.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.config;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-/**
- * base fields for all policy definition
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible=true)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class AbstractPolicyDefinition {
-       private String type;
-    /**
-     * @return type in string
-     */
-       public String getType() {
-               return type;
-       }
-
-    /**
-     * @param type set type value
-     */
-       public void setType(String type) {
-               this.type = type;
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/DeduplicatorConfig.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/DeduplicatorConfig.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/DeduplicatorConfig.java
deleted file mode 100644
index 04f20ff..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/DeduplicatorConfig.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.config;
-
-import java.io.Serializable;
-
-public class DeduplicatorConfig implements Serializable{
-       private static final long serialVersionUID = 1L;
-       public int getAlertDedupIntervalMin() {
-               return alertDedupIntervalMin;
-       }
-       public void setAlertDedupIntervalMin(int alertDedupIntervalMin) {
-               this.alertDedupIntervalMin = alertDedupIntervalMin;
-       }
-       public int getEmailDedupIntervalMin() {
-               return emailDedupIntervalMin;
-       }
-       public void setEmailDedupIntervalMin(int emailDedupIntervalMin) {
-               this.emailDedupIntervalMin = emailDedupIntervalMin;
-       }
-       private int alertDedupIntervalMin;
-       private int emailDedupIntervalMin;
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/EmailNotificationConfig.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/EmailNotificationConfig.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/EmailNotificationConfig.java
deleted file mode 100644
index fae24d5..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/EmailNotificationConfig.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.config;
-
-public class EmailNotificationConfig extends NotificationConfig{
-       private static final long serialVersionUID = 1L;
-       private String sender;
-       private String recipients;
-       private String tplFileName;
-       private String subject;
-       public String getSubject() {
-               return subject;
-       }
-       public void setSubject(String subject) {
-               this.subject = subject;
-       }
-       public String getRecipients() {
-               return recipients;
-       }
-       public void setRecipients(String recipients) {
-               this.recipients = recipients;
-       }
-       public String getSender() {
-               return sender;
-       }
-       public void setSender(String sender) {
-               this.sender = sender;
-       }
-       public String getTplFileName() {
-               return tplFileName;
-       }
-       public void setTplFileName(String tplFileName) {
-               this.tplFileName = tplFileName;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/NotificationConfig.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/NotificationConfig.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/NotificationConfig.java
deleted file mode 100644
index 6903a57..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/NotificationConfig.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.config;
-
-import java.io.Serializable;
-
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "flavor", visible=true)
-public class NotificationConfig implements Serializable{
-       private static final long serialVersionUID = 1L;
-       private String id;
-       private String flavor;
-
-       public String getId() {
-               return id;
-       }
-
-       public void setId(String id) {
-               this.id = id;
-       }
-
-       public String getFlavor() {
-               return flavor;
-       }
-
-       public void setFlavor(String flavor) {
-               this.flavor = flavor;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/Remediation.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/Remediation.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/Remediation.java
deleted file mode 100644
index c644a31..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/Remediation.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.config;
-
-import java.io.Serializable;
-
-public class Remediation implements Serializable{
-       private static final long serialVersionUID = 1L;
-       private String id;
-
-       public String getId() {
-               return id;
-       }
-
-       public void setId(String id) {
-               this.id = id;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
deleted file mode 100644
index b459130..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dedup;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.config.DeduplicatorConfig;
-import eagle.alert.dao.AlertDefinitionDAO;
-import eagle.alert.entity.AlertAPIEntity;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import eagle.alert.policy.DynamicPolicyLoader;
-import eagle.alert.policy.PolicyLifecycleMethods;
-import eagle.common.config.EagleConfigConstants;
-import eagle.datastream.Collector;
-import eagle.datastream.JavaStormStreamExecutor2;
-import eagle.datastream.Tuple2;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
-import eagle.dataproc.core.JsonSerDeserUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public abstract class AlertDeduplicationExecutorBase extends 
JavaStormStreamExecutor2<String, AlertAPIEntity> implements 
PolicyLifecycleMethods {
-       private static final long serialVersionUID = 1L;
-       private static final Logger LOG = 
LoggerFactory.getLogger(AlertDeduplicationExecutorBase.class);
-       protected Config config;
-       protected DEDUP_TYPE dedupType;
-
-       private List<String> alertExecutorIdList;
-       private volatile CopyOnWriteHashMap<String, 
DefaultDeduplicator<AlertAPIEntity>> alertDedups;
-       private AlertDefinitionDAO dao;
-
-       public enum DEDUP_TYPE {
-               ENTITY,
-               EMAIL
-       }
-
-       public AlertDeduplicationExecutorBase(List<String> alertExecutorIdList, 
DEDUP_TYPE dedupType, AlertDefinitionDAO dao){
-               this.alertExecutorIdList = alertExecutorIdList;
-               this.dedupType = dedupType;
-               this.dao = dao;
-       }
-       
-       @Override
-       public void prepareConfig(Config config) {
-               this.config = config;
-       }
-       
-       public DefaultDeduplicator<AlertAPIEntity> 
createAlertDedup(AlertDefinitionAPIEntity alertDef) {
-               DeduplicatorConfig dedupConfig = null;
-               try {
-                       dedupConfig = 
JsonSerDeserUtils.deserialize(alertDef.getDedupeDef(), 
DeduplicatorConfig.class);
-               }
-               catch (Exception ex) {
-                       LOG.warn("Initial dedupConfig error, " + 
ex.getMessage());
-               }
-
-        if (dedupConfig != null) {
-                       if (dedupType.equals(DEDUP_TYPE.ENTITY)) {
-                               return new 
DefaultDeduplicator<>(dedupConfig.getAlertDedupIntervalMin());
-                       } else if (dedupType.equals(DEDUP_TYPE.EMAIL)) {
-                               return new 
DefaultDeduplicator<>(dedupConfig.getEmailDedupIntervalMin());
-                       }
-               }
-
-               return null;
-       }
-       
-       @Override
-       public void init() {            
-        String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." 
+ EagleConfigConstants.SITE);
-        String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS 
+ "." + EagleConfigConstants.DATA_SOURCE);
-           Map<String, Map<String, AlertDefinitionAPIEntity>> 
initialAlertDefs;                    
-           try {
-                       initialAlertDefs = 
dao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource);
-           }
-           catch (Exception ex) {
-                       LOG.error("fail to initialize initialAlertDefs: ", ex);
-               throw new IllegalStateException("fail to initialize 
initialAlertDefs: ", ex);
-        }
-           Map<String, DefaultDeduplicator<AlertAPIEntity>> tmpDeduplicators = 
new HashMap<String, DefaultDeduplicator<AlertAPIEntity>>();
-        if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
-            LOG.warn("No alert definitions was found for site: "+site+", 
dataSource: "+dataSource);
-        } else {
-                   for (String alertExecutorId: alertExecutorIdList) {
-                           if(initialAlertDefs.containsKey(alertExecutorId)){
-                    for(AlertDefinitionAPIEntity alertDef : 
initialAlertDefs.get(alertExecutorId).values()){
-                       try {
-                          DefaultDeduplicator<AlertAPIEntity> deduplicator = 
createAlertDedup(alertDef);
-                          if (deduplicator != null)
-                              
tmpDeduplicators.put(alertDef.getTags().get(AlertConstants.POLICY_ID), 
deduplicator);
-                          else LOG.warn("The dedup interval is not set, 
alertDef: " + alertDef);
-                        }
-                        catch (Throwable t) {
-                            LOG.error("Got an exception when initial dedup 
config, probably dedup config is not set: " + t.getMessage() + "," + alertDef);
-                        }
-                    }
-                } else {
-                    LOG.info(String.format("No alert definitions found for 
site: %s, dataSource: %s, alertExecutorId: 
%s",site,dataSource,alertExecutorId));
-                }
-                   }
-        }
-
-               alertDedups = new CopyOnWriteHashMap<>();
-               alertDedups.putAll(tmpDeduplicators);
-               DynamicPolicyLoader policyLoader = 
DynamicPolicyLoader.getInstance();
-               policyLoader.init(initialAlertDefs, dao, config);
-               for (String alertExecutorId : alertExecutorIdList) {
-                       policyLoader.addPolicyChangeListener(alertExecutorId, 
this);
-               }
-       }
-
-    @Override
-    public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, 
AlertAPIEntity>> outputCollector){
-        String policyId = (String) input.get(0);
-        AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
-        DefaultDeduplicator<AlertAPIEntity> dedup;
-        synchronized(alertDedups) {
-            dedup = alertDedups.get(policyId);
-        }
-
-        List<AlertAPIEntity> ret = Arrays.asList(alertEntity);
-        if (dedup == null) {
-            LOG.warn("Dedup config for policyId " + policyId + " is not set or 
is not a valid config");
-        } else {
-            if (dedup.getDedupIntervalMin() == -1) {
-                LOG.warn("the dedup interval is set as -1, which mean all 
alerts should be deduped(skipped)");
-                return;
-            }
-            ret = dedup.dedup(ret);
-        }
-        for (AlertAPIEntity entity : ret) {
-            outputCollector.collect(new Tuple2(policyId, entity));
-        }
-    }
-
-       public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> 
added) {
-               if(LOG.isDebugEnabled()) LOG.debug("Alert dedup config to be 
added : " + added);
-               for(AlertDefinitionAPIEntity alertDef : added.values()){
-                       LOG.info("Alert dedup config really added " + alertDef);
-                       DefaultDeduplicator<AlertAPIEntity> dedup = 
createAlertDedup(alertDef);
-                       if (dedup != null) {
-                               synchronized(alertDedups) {             
-                                       
alertDedups.put(alertDef.getTags().get(AlertConstants.POLICY_ID), dedup);
-                               }
-                       }
-               }
-       }
-       
-       public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> 
changed) {
-               LOG.info("Alert dedup config changed : " + changed);
-               for(AlertDefinitionAPIEntity alertDef : changed.values()){
-                       LOG.info("Alert dedup config really changed " + 
alertDef);
-                       DefaultDeduplicator<AlertAPIEntity> dedup = 
createAlertDedup(alertDef);
-                       if (dedup != null) {
-                               synchronized(alertDedups) {
-                                       
alertDedups.put(alertDef.getTags().get(AlertConstants.POLICY_ID), dedup);
-                               }
-                       }
-               }
-       }
-       
-       public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> 
deleted) {
-               LOG.info("alert dedup config deleted : " + deleted);
-               for(AlertDefinitionAPIEntity alertDef : deleted.values()){
-                       LOG.info("alert dedup config deleted " + alertDef);
-                       // no cleanup to do, just remove it
-                       synchronized(alertDedups) {             
-                               
alertDedups.remove(alertDef.getTags().get(AlertConstants.POLICY_ID));
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
deleted file mode 100644
index 7dd0ddc..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dedup;
-
-import eagle.alert.dao.AlertDefinitionDAO;
-
-import java.util.List;
-
-public class AlertEmailDeduplicationExecutor extends 
AlertDeduplicationExecutorBase {
-
-       private static final long serialVersionUID = 1L;
-
-       public AlertEmailDeduplicationExecutor(List<String> 
alertExecutorIdList, AlertDefinitionDAO dao){
-               super(alertExecutorIdList, DEDUP_TYPE.EMAIL, dao);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
deleted file mode 100644
index b6050a0..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dedup;
-
-import eagle.alert.dao.AlertDefinitionDAO;
-
-import java.util.List;
-
-public class AlertEntityDeduplicationExecutor extends 
AlertDeduplicationExecutorBase {
-
-       private static final long serialVersionUID = 1L;
-
-       public AlertEntityDeduplicationExecutor(List<String> 
alertExecutorIdList, AlertDefinitionDAO dao){
-               super(alertExecutorIdList, DEDUP_TYPE.ENTITY, dao);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/DefaultDeduplicator.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/DefaultDeduplicator.java
deleted file mode 100644
index 97e58ac..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/DefaultDeduplicator.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dedup;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.lang.time.DateUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-
-public class DefaultDeduplicator<T extends TaggedLogAPIEntity> implements 
EntityDeduplicator<T>{
-       protected long dedupIntervalMin;
-       protected Map<EntityTagsUniq, Long> entites = new 
HashMap<EntityTagsUniq, Long>();
-       public static Logger LOG = 
LoggerFactory.getLogger(DefaultDeduplicator.class);
-       
-       public static enum AlertDeduplicationStatus{
-               NEW,
-               DUPLICATED,
-               IGNORED
-       }
-       
-       public DefaultDeduplicator() {
-               this.dedupIntervalMin = 0;
-       }
-       
-       public DefaultDeduplicator(long intervalMin) {
-               this.dedupIntervalMin = intervalMin;
-       }
-       
-       public void clearOldCache() {
-               List<EntityTagsUniq> removedkeys = new 
ArrayList<EntityTagsUniq>();
-               for (Entry<EntityTagsUniq, Long> entry : entites.entrySet()) {
-                       EntityTagsUniq entity = entry.getKey();
-                       if (System.currentTimeMillis() - 7 * 
DateUtils.MILLIS_PER_DAY > entity.createdTime) {
-                               removedkeys.add(entry.getKey());
-                       }
-               }
-               for (EntityTagsUniq alertKey : removedkeys) {
-                       entites.remove(alertKey);
-               }
-       }
-       
-       public AlertDeduplicationStatus checkDedup(EntityTagsUniq key){
-               long current = key.timestamp;
-               if(!entites.containsKey(key)){
-                       entites.put(key, current);
-                       return AlertDeduplicationStatus.NEW;
-               }
-               
-               long last = entites.get(key);
-               if(current - last >= dedupIntervalMin * 
DateUtils.MILLIS_PER_MINUTE){
-                       entites.put(key, current);
-                       return AlertDeduplicationStatus.DUPLICATED;
-               }
-               
-               return AlertDeduplicationStatus.IGNORED;
-       }
-       
-       public List<T> dedup(List<T> list) {
-               clearOldCache();
-               List<T> dedupList = new ArrayList<T>();
-        int totalCount = list.size();
-        int dedupedCount = 0;
-               for(T entity: list) {
-                       if (entity.getTags() == null) {
-                               if(LOG.isDebugEnabled()) LOG.debug("Tags is 
null, don't know how to deduplicate, do nothing");
-                       } else {
-                AlertDeduplicationStatus status = checkDedup(new 
EntityTagsUniq(entity.getTags(), entity.getTimestamp()));
-                if (!status.equals(AlertDeduplicationStatus.IGNORED)) {
-                    dedupList.add(entity);
-                } else {
-                    dedupedCount++;
-                    if (LOG.isDebugEnabled())
-                        LOG.debug(String.format("Entity is skipped because 
it's duplicated: " + entity.toString()));
-                }
-            }
-               }
-
-        if(dedupedCount>0){
-            LOG.info(String.format("Skipped %s of %s alerts because they are 
duplicated",dedupedCount,totalCount));
-        }else if(LOG.isDebugEnabled()){
-            LOG.debug(String.format("Skipped %s of %s duplicated 
alerts",dedupedCount,totalCount));
-        }
-
-               return dedupList;
-       }
-
-       public EntityDeduplicator<T> setDedupIntervalMin(long dedupIntervalMin) 
{
-               this.dedupIntervalMin = dedupIntervalMin;
-               return this;
-       }
-       
-       public long getDedupIntervalMin() {
-               return dedupIntervalMin;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityDeduplicator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityDeduplicator.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityDeduplicator.java
deleted file mode 100644
index f24d31a..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityDeduplicator.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dedup;
-
-import java.util.List;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-
-/**
- * Dedup Eagle entities.
- *
- * @param <T> Eagle entity
- */
-public interface EntityDeduplicator<T extends TaggedLogAPIEntity> {
-       
-       EntityDeduplicator<T> setDedupIntervalMin(long intervalMin);
-       
-       long getDedupIntervalMin();
-       
-       List<T> dedup(List<T> list);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityTagsUniq.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityTagsUniq.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityTagsUniq.java
deleted file mode 100644
index b0e5dcf..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityTagsUniq.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * 
- */
-package eagle.alert.dedup;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @since Mar 19, 2015
- */
-public class EntityTagsUniq {
-       public Map<String, String> tags;
-       public Long timestamp;   // entity's timestamp
-       public long createdTime; // entityTagsUniq's created time, for cache 
removal;
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(EntityTagsUniq.class);
-       
-       public EntityTagsUniq(Map<String, String> tags, long timestamp) {
-               this.tags = new HashMap<String, String>(tags);
-               this.timestamp = timestamp;
-               this.createdTime = System.currentTimeMillis();
-       }
-       
-       @Override       
-       public boolean equals(Object obj) {             
-               if (obj instanceof EntityTagsUniq) {
-                       EntityTagsUniq au = (EntityTagsUniq) obj;
-                       if (tags.size() != au.tags.size()) return false;
-                       for (Entry<String, String> keyValue : 
au.tags.entrySet()) {
-                               boolean keyExist = 
tags.containsKey(keyValue.getKey());
-                               // sanity check
-                               if (tags.get(keyValue.getKey()) == null || 
keyValue.getValue() == null) {
-                                       return true;
-                               }
-                               if ( !keyExist || 
!tags.get(keyValue.getKey()).equals(keyValue.getValue())) {                     
      
-                                       return false;
-                               }
-                       }
-                       return true; 
-               }
-               return false;
-       }
-       
-       @Override
-       public int hashCode() { 
-               int hashCode = 0;
-               for (Map.Entry<String,String> entry : tags.entrySet()) {
-            if(entry.getValue() == null) {
-                LOG.warn("Tag value for key ["+entry.getKey()+"] is null, 
skipped for hash code");
-            }else {
-                try {
-                    hashCode ^= entry.getValue().hashCode();
-                } catch (Throwable t) {
-                    LOG.info("Got exception because of entry: " + entry, t);
-                }
-            }
-               }
-               return hashCode;
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGenerator.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGenerator.java
deleted file mode 100644
index 58ae733..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGenerator.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * 
- */
-package eagle.alert.notification;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.*;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.common.AlertEmailSender;
-import eagle.alert.email.AlertEmailComponent;
-import eagle.alert.email.AlertEmailContext;
-import eagle.alert.entity.AlertAPIEntity;
-import com.typesafe.config.ConfigObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AlertEmailGenerator{
-       private String tplFile;
-       private String sender;
-       private String recipients;
-       private String subject;
-       private ConfigObject eagleProps;
-
-    private ThreadPoolExecutor executorPool;
-
-    private final static Logger LOG = 
LoggerFactory.getLogger(AlertEmailGenerator.class);
-
-    private final static long MAX_TIMEOUT_MS =60000;
-
-    public void sendAlertEmail(AlertAPIEntity entity) {
-               sendAlertEmail(entity, recipients, null);
-       }
-       
-       public void sendAlertEmail(AlertAPIEntity entity, String recipients) {
-               sendAlertEmail(entity, recipients, null);       
-       }
-       
-       public void sendAlertEmail(AlertAPIEntity entity, String recipients, 
String cc) {
-               AlertEmailContext email = new AlertEmailContext();
-               
-               AlertEmailComponent component = new AlertEmailComponent();
-               component.setAlertContext(entity.getAlertContext());
-               List<AlertEmailComponent> components = new 
ArrayList<AlertEmailComponent>();
-               components.add(component);              
-               email.setComponents(components);
-               if 
(entity.getAlertContext().getProperty(AlertConstants.SUBJECT) != null) {
-                       
email.setSubject(entity.getAlertContext().getProperty(AlertConstants.SUBJECT));
-               }
-               else email.setSubject(subject);
-               email.setVelocityTplFile(tplFile);
-               email.setRecipients(recipients);
-               email.setCc(cc);
-               email.setSender(sender);
-               
-               /** asynchronized email sending */
-               @SuppressWarnings("rawtypes")
-               AlertEmailSender thread = new AlertEmailSender(email, 
eagleProps);
-
-        if(this.executorPool == null) throw new 
IllegalStateException("Invoking thread executor pool but it's is not set yet");
-
-        LOG.info("Sending email  in asynchronous to: "+recipients+", cc: "+cc);
-        Future future = this.executorPool.submit(thread);
-        try {
-            future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-            LOG.info(String.format("Successfully send email to %s", 
recipients));
-        } catch (InterruptedException | ExecutionException  e) {
-            LOG.error(String.format("Failed to send email to %s, due 
to:%s",recipients,e),e);
-        } catch (TimeoutException e) {
-            LOG.error(String.format("Failed to send email to %s due to timeout 
exception, max timeout: %s ms ",recipients, MAX_TIMEOUT_MS),e);
-        }
-    }
-       
-       public String getTplFile() {
-               return tplFile;
-       }
-       
-       public void setTplFile(String tplFile) {
-               this.tplFile = tplFile;
-       }
-
-       public String getSender() {
-               return sender;
-       }
-
-       public void setSender(String sender) {
-               this.sender = sender;
-       }
-
-       public String getRecipients() {
-               return recipients;
-       }
-
-       public void setRecipients(String recipients) {
-               this.recipients = recipients;
-       }
-
-       public String getSubject() {
-               return subject;
-       }
-
-       public void setSubject(String subject) {
-               this.subject = subject;
-       }
-
-       public ConfigObject getEagleProps() {
-               return eagleProps;
-       }
-
-       public void setEagleProps(ConfigObject eagleProps) {
-               this.eagleProps = eagleProps;
-       }
-
-    public void setExecutorPool(ThreadPoolExecutor executorPool) {
-        this.executorPool = executorPool;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGeneratorBuilder.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGeneratorBuilder.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGeneratorBuilder.java
deleted file mode 100644
index 10be162..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGeneratorBuilder.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.notification;
-
-import java.util.concurrent.ThreadPoolExecutor;
-
-import com.typesafe.config.ConfigObject;
-
-public class AlertEmailGeneratorBuilder {
-       private AlertEmailGenerator generator;
-       private AlertEmailGeneratorBuilder(){
-               generator = new AlertEmailGenerator();
-       }
-       public static AlertEmailGeneratorBuilder newBuilder(){
-               return new AlertEmailGeneratorBuilder();
-       }
-       public AlertEmailGeneratorBuilder withSubject(String subject){
-               generator.setSubject(subject);
-               return this;
-       }
-       public AlertEmailGeneratorBuilder withSender(String sender){
-               generator.setSender(sender);
-               return this;
-       }
-       public AlertEmailGeneratorBuilder withRecipients(String recipients){
-               generator.setRecipients(recipients);
-               return this;
-       }
-       public AlertEmailGeneratorBuilder withTplFile(String tplFile){
-               generator.setTplFile(tplFile);
-               return this;
-       }
-       public AlertEmailGeneratorBuilder withEagleProps(ConfigObject 
eagleProps) {
-               generator.setEagleProps(eagleProps);
-               return this;
-       }
-    public AlertEmailGeneratorBuilder withExecutorPool(ThreadPoolExecutor 
threadPoolExecutor) {
-        generator.setExecutorPool(threadPoolExecutor);
-        return this;
-    }
-
-    public AlertEmailGenerator build(){
-               return this.generator;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertNotificationExecutor.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertNotificationExecutor.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertNotificationExecutor.java
deleted file mode 100644
index 6e5a3d7..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertNotificationExecutor.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.notification;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
-import eagle.alert.config.EmailNotificationConfig;
-import eagle.alert.dao.AlertDefinitionDAO;
-import eagle.alert.entity.AlertAPIEntity;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import eagle.alert.policy.DynamicPolicyLoader;
-import eagle.alert.policy.PolicyLifecycleMethods;
-import eagle.dataproc.core.JsonSerDeserUtils;
-import eagle.datastream.Collector;
-import eagle.datastream.JavaStormStreamExecutor1;
-import eagle.datastream.Tuple1;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * notify alert by email, sms or other means
- * currently we only implements email notification
- */
-public class AlertNotificationExecutor extends 
JavaStormStreamExecutor1<String> implements PolicyLifecycleMethods {
-
-       private static final long serialVersionUID = 1690354365435407034L;
-       private static final Logger LOG = 
LoggerFactory.getLogger(AlertNotificationExecutor.class);
-       private Config config;
-
-       private List<String> alertExecutorIdList;
-       private volatile CopyOnWriteHashMap<String, List<AlertEmailGenerator>> 
alertEmailGeneratorsMap;
-       private AlertDefinitionDAO dao;
-
-    private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
-    private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
-    private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 
minute
-
-    private transient ThreadPoolExecutor executorPool;
-
-    public AlertNotificationExecutor(List<String> alertExecutorIdList, 
AlertDefinitionDAO dao){
-               this.alertExecutorIdList = alertExecutorIdList;
-               this.dao = dao;
-       }
-       
-       public List<AlertEmailGenerator> 
createAlertEmailGenerator(AlertDefinitionAPIEntity alertDef) {         
-               Module module = new 
SimpleModule("notification").registerSubtypes(new 
NamedType(EmailNotificationConfig.class, "email"));
-               EmailNotificationConfig[] emailConfigs = new 
EmailNotificationConfig[0];
-               try {                   
-                       emailConfigs = 
JsonSerDeserUtils.deserialize(alertDef.getNotificationDef(), 
EmailNotificationConfig[].class, Arrays.asList(module));
-               }
-               catch (Exception ex) {
-                       LOG.warn("Initial emailConfig error, wrong format or 
it's error " + ex.getMessage());
-               }
-               List<AlertEmailGenerator> gens = new 
ArrayList<AlertEmailGenerator>();
-               if (emailConfigs == null) {
-                       return gens;            
-               }
-               for(EmailNotificationConfig emailConfig : emailConfigs) {
-                       String tplFileName = emailConfig.getTplFileName();      
                
-                       if (tplFileName == null || tplFileName.equals("")) { // 
empty tplFileName, use default tpl file name
-                               tplFileName = "ALERT_DEFAULT.vm";
-                       }
-                       AlertEmailGenerator gen = 
AlertEmailGeneratorBuilder.newBuilder().
-                                                                               
                                                
withEagleProps(config.getObject("eagleProps")).
-                                                                               
                                                
withSubject(emailConfig.getSubject()).
-                                                                               
                                                
withSender(emailConfig.getSender()).
-                                                                               
                                                
withRecipients(emailConfig.getRecipients()).
-                                                                               
                                                withTplFile(tplFileName).
-                                                                
withExecutorPool(executorPool).
-                                                                               
                                                build();
-                       gens.add(gen);
-               }
-               return gens;
-       }
-       
-       /**
-        * 1. register both file and database configuration
-        * 2. create email generator from configuration
-        */
-    @Override
-       public void init(){
-        executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, 
DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, 
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
-
-               Map<String, List<AlertEmailGenerator>> tmpEmailGenerators = new 
HashMap<String, List<AlertEmailGenerator>> ();
-               
-        String site = config.getString("eagleProps.site");
-        String dataSource = config.getString("eagleProps.dataSource");
-           Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;
-           try {
-                       initialAlertDefs = 
dao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource);
-           }
-           catch (Exception ex) {
-                       LOG.error("fail to initialize initialAlertDefs: ", ex);
-               throw new IllegalStateException("fail to initialize 
initialAlertDefs: ", ex);
-        }
-          
-        if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
-            LOG.warn("No alert definitions found for site: "+site+", 
dataSource: "+dataSource);
-        }
-        else {
-                   for (String alertExecutorId: alertExecutorIdList) {
-                if(initialAlertDefs.containsKey(alertExecutorId)) {
-                    for (AlertDefinitionAPIEntity alertDef : 
initialAlertDefs.get(alertExecutorId).values()) {
-                        List<AlertEmailGenerator> gens = 
createAlertEmailGenerator(alertDef);
-                        
tmpEmailGenerators.put(alertDef.getTags().get("policyId"), gens);
-                    }
-                }else{
-                    LOG.info(String.format("No alert definitions found for 
site: %s, dataSource: %s, alertExecutorId: 
%s",site,dataSource,alertExecutorId));
-                }
-                   }
-        }
-               
-               alertEmailGeneratorsMap = new CopyOnWriteHashMap<String, 
List<AlertEmailGenerator>>();
-               alertEmailGeneratorsMap.putAll(tmpEmailGenerators);             
                
-               DynamicPolicyLoader policyLoader = 
DynamicPolicyLoader.getInstance();
-               policyLoader.init(initialAlertDefs, dao, config);
-               for (String alertExecutorId : alertExecutorIdList) {
-                       policyLoader.addPolicyChangeListener(alertExecutorId, 
this);
-               }
-       }
-
-    @Override
-       public void prepareConfig(Config config) {
-               this.config = config;
-       }
-
-    @Override
-    public void flatMap(java.util.List<Object> input, 
Collector<Tuple1<String>> outputCollector){
-        String policyId = (String) input.get(0);
-        AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
-        processAlerts(policyId, Arrays.asList(alertEntity));
-    }
-       
-       //TODO: add a thread pool for email sender?
-       private void processAlerts(String policyId, List<AlertAPIEntity> list) {
-               List<AlertEmailGenerator> generators;
-               synchronized(alertEmailGeneratorsMap) {         
-                       generators = alertEmailGeneratorsMap.get(policyId);
-               }
-               if (generators == null) {
-                       LOG.warn("Notification config of policyId " + policyId 
+ " has been deleted");
-                       return;
-               }
-               for (AlertAPIEntity entity : list) {
-                       for(AlertEmailGenerator generator : generators){
-                               generator.sendAlertEmail(entity);
-                       }
-               }
-       }
-
-       @Override
-       public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> 
added) {
-               if(LOG.isDebugEnabled()) LOG.debug(" alert notification config 
changed : " + added);
-               for(AlertDefinitionAPIEntity alertDef : added.values()){
-                       LOG.info("alert notification config really changed " + 
alertDef);
-                       List<AlertEmailGenerator> gens = 
createAlertEmailGenerator(alertDef);
-                       synchronized(alertEmailGeneratorsMap) {         
-                               
alertEmailGeneratorsMap.put(alertDef.getTags().get("policyId"), gens);
-                       }
-               }               
-       }
-
-       @Override
-       public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> 
changed) {
-               if(LOG.isDebugEnabled()) LOG.debug("alert notification config 
to be added : " + changed);
-               for(AlertDefinitionAPIEntity alertDef : changed.values()){
-                       LOG.info("alert notification config really added " + 
alertDef);
-                       List<AlertEmailGenerator> gens = 
createAlertEmailGenerator(alertDef);
-                       synchronized(alertEmailGeneratorsMap) {                 
                
-                               
alertEmailGeneratorsMap.put(alertDef.getTags().get("policyId"), gens);
-                       }
-               }                       
-       }
-
-       @Override
-       public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> 
deleted) {
-               if(LOG.isDebugEnabled()) LOG.debug("alert notification config 
to be deleted : " + deleted);
-               for(AlertDefinitionAPIEntity alertDef : deleted.values()){
-                       LOG.info("alert notification config really deleted " + 
alertDef);
-                       synchronized(alertEmailGeneratorsMap) {         
-                               
alertEmailGeneratorsMap.remove(alertDef.getTags().get("policyId"));
-                       }
-               }               
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/UrlBuilder.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/UrlBuilder.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/UrlBuilder.java
deleted file mode 100644
index fa5dc82..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/UrlBuilder.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package eagle.alert.notification;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.entity.AlertAPIEntity;
-import eagle.common.EagleBase64Wrapper;
-import eagle.common.config.EagleConfigConstants;
-import eagle.log.entity.HBaseInternalLogHelper;
-import eagle.log.entity.InternalLog;
-import eagle.log.entity.RowkeyBuilder;
-import eagle.log.entity.meta.EntityDefinitionManager;
-import org.mortbay.util.UrlEncoded;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class UrlBuilder {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(UrlBuilder.class);
-
-    public static String getEncodedRowkey(AlertAPIEntity entity) throws 
Exception {
-        InternalLog log = HBaseInternalLogHelper.convertToInternalLog(entity, 
EntityDefinitionManager.getEntityDefinitionByEntityClass(entity.getClass()));
-        return 
EagleBase64Wrapper.encodeByteArray2URLSafeString(RowkeyBuilder.buildRowkey(log));
-    }
-
-    public static String buildAlertDetailUrl(String host, int port, 
AlertAPIEntity entity) {
-        String baseUrl = "http://" + host + ":" + String.valueOf(port) + 
"/eagle-service/#/dam/alertDetail/";
-        try {
-            return baseUrl + UrlEncoded.encodeString(getEncodedRowkey(entity));
-        }
-        catch (Exception ex) {
-            logger.error("Fail to populate encodedRowkey for alert Entity" + 
entity.toString());
-            return "N/A";
-        }
-    }
-
-    public static String buiildPolicyDetailUrl(String host, int port, 
Map<String, String> tags) {
-        String baseUrl = "http://" + host + ":" + String.valueOf(port) + 
"/eagle-service/#/dam/policyDetail?";
-        String format = "policy=%s&site=%s&executor=%s";
-        String policy = tags.get(AlertConstants.POLICY_ID);
-        String site = tags.get(EagleConfigConstants.SITE);
-        String alertExecutorID = tags.get(AlertConstants.ALERT_EXECUTOR_ID);
-        if (policy != null && site != null && alertExecutorID != null) {
-            return baseUrl + String.format(format, policy, site, 
alertExecutorID);
-        }
-        return "N/A";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/AlertPersistExecutor.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/AlertPersistExecutor.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/AlertPersistExecutor.java
deleted file mode 100644
index 543405d..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/AlertPersistExecutor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.persist;
-
-import eagle.alert.entity.AlertAPIEntity;
-import com.typesafe.config.Config;
-import eagle.common.config.EagleConfigConstants;
-import eagle.datastream.Collector;
-import eagle.datastream.JavaStormStreamExecutor1;
-import eagle.datastream.Tuple1;
-
-import java.util.Arrays;
-
-public class AlertPersistExecutor extends JavaStormStreamExecutor1<String> {
-
-       private static final long serialVersionUID = 1L;
-       private Config config;
-       private EaglePersist persist;
-
-       public AlertPersistExecutor(){
-       }
-    @Override
-       public void prepareConfig(Config config) {
-               this.config = config;           
-       }
-
-    @Override
-       public void init() {
-               String host = config.getString(EagleConfigConstants.EAGLE_PROPS 
+ "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
-               int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." 
+ EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-               String username = 
config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME)
-                               ? 
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : 
null;
-               String password = 
config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD)
-                               ? 
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : 
null;
-               this.persist = new EaglePersist(host, port, username, password);
-       }
-
-    @Override
-    public void flatMap(java.util.List<Object> input, 
Collector<Tuple1<String>> outputCollector){
-        persist.doPersist(Arrays.asList((AlertAPIEntity)(input.get(1))));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/EaglePersist.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/EaglePersist.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/EaglePersist.java
deleted file mode 100644
index b5820c4..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/EaglePersist.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * 
- */
-package eagle.alert.persist;
-
-import eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import eagle.log.entity.GenericServiceAPIResponseEntity;
-import eagle.service.client.IEagleServiceClient;
-import eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public class EaglePersist {
-               
-       private static Logger LOG = LoggerFactory.getLogger(EaglePersist.class);
-       private String eagleServiceHost;
-       private int eagleServicePort;
-       private String username;
-       private String password;
-
-       public EaglePersist(String eagleServiceHost, int eagleServicePort) {
-               this(eagleServiceHost, eagleServicePort, null, null);
-       }
-
-       public EaglePersist(String eagleServiceHost, int eagleServicePort, 
String username, String password) {
-               this.eagleServiceHost = eagleServiceHost;
-               this.eagleServicePort = eagleServicePort;
-               this.username = username;
-               this.password = password;
-       }
-       
-       public boolean doPersist(List<? extends TaggedLogAPIEntity> list) {
-               if (list.isEmpty()) return false;
-               LOG.info("Going to persist entities, type: " + " " + 
list.get(0).getClass().getSimpleName() + ", list size: " + list.size());
-               try {
-                       IEagleServiceClient client = new 
EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
-                       GenericServiceAPIResponseEntity<String> response = 
client.create(list);
-                       client.close();
-                       if (response.isSuccess()) {
-                               LOG.info("Successfully create entities " + 
list.toString());
-                               return true;
-                       }
-                       else {
-                               LOG.error("Fail to create entities");
-                               return false;
-                       }
-               }
-               catch (Exception ex) {
-                       LOG.error("Got an exception in persisting entities" + 
ex.getMessage(), ex);
-                       return false;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DefaultPolicyPartitioner.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DefaultPolicyPartitioner.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DefaultPolicyPartitioner.java
deleted file mode 100644
index ef76c2f..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DefaultPolicyPartitioner.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-
-public class DefaultPolicyPartitioner implements PolicyPartitioner{
-       @Override
-       public int partition(int numTotalPartitions, String policyType,
-                       String policyId) {
-               final int prime = 31;
-               int result = 1;
-               result = result * prime + policyType.hashCode();
-               result = result < 0 ? result*-1 : result;
-               result = result * prime + policyId.hashCode();
-               result = result < 0 ? result*-1 : result;
-               return result % numTotalPartitions;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DynamicPolicyLoader.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DynamicPolicyLoader.java
deleted file mode 100644
index ba76a15..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DynamicPolicyLoader.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import eagle.common.config.EagleConfigConstants;
-import org.apache.commons.collections.CollectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import eagle.alert.dao.AlertDefinitionDAO;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import com.netflix.config.AbstractPollingScheduler;
-import com.netflix.config.ConcurrentCompositeConfiguration;
-import com.netflix.config.DynamicConfiguration;
-import com.netflix.config.FixedDelayPollingScheduler;
-import com.netflix.config.PollListener;
-import com.netflix.config.PollResult;
-import com.netflix.config.PolledConfigurationSource;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
-
-public class DynamicPolicyLoader {
-       private static final Logger LOG = 
LoggerFactory.getLogger(DynamicPolicyLoader.class);
-       
-       private final int defaultInitialDelayMillis = 30*1000;
-       private final int defaultDelayMillis = 60*1000;
-       private final boolean defaultIgnoreDeleteFromSource = true;
-       private volatile CopyOnWriteHashMap<String, 
List<PolicyLifecycleMethods>> policyChangeListeners = new 
CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>>();
-       private static DynamicPolicyLoader instance = new DynamicPolicyLoader();
-       private volatile boolean initialized = false;
-       
-       public void addPolicyChangeListener(String alertExecutorId, 
PolicyLifecycleMethods alertExecutor){
-               synchronized(policyChangeListeners) {
-                       if (policyChangeListeners.get(alertExecutorId) == null) 
{
-                               policyChangeListeners.put(alertExecutorId, new 
ArrayList<PolicyLifecycleMethods>());
-                       }
-                       
policyChangeListeners.get(alertExecutorId).add(alertExecutor);
-               }
-       }
-       
-       public static DynamicPolicyLoader getInstance(){
-               return instance;
-       }
-       
-       /**
-        * singleton with init would be good for unit test as well, and it 
ensures that
-        * initialization happens only once before you use it.  
-        * @param config
-        * @param dao
-        */
-       public void init(Map<String, Map<String, AlertDefinitionAPIEntity>> 
initialAlertDefs, 
-                       AlertDefinitionDAO dao, Config config){
-               if(!initialized){
-                       synchronized(this){
-                               if(!initialized){
-                                       internalInit(initialAlertDefs, dao, 
config);
-                                       initialized = true;
-                               }
-                       }
-               }
-       }
-       
-       /**
-        * map from alertExecutorId+partitionId to AlertExecutor which 
implements PolicyLifecycleMethods
-        * @param initialAlertDefs
-        * @param dao
-        * @param config
-        */
-       private void internalInit(Map<String, Map<String, 
AlertDefinitionAPIEntity>> initialAlertDefs,
-                       AlertDefinitionDAO dao, Config config){
-               if(!config.getBoolean("dynamicConfigSource.enabled")) {
-            return;
-        }
-               AbstractPollingScheduler scheduler = new 
FixedDelayPollingScheduler(
-                config.getInt("dynamicConfigSource.initDelayMillis"),
-                config.getInt("dynamicConfigSource.delayMillis"),
-                false
-        );
-
-               scheduler.addPollListener(new PollListener(){
-                       @Override
-                       public void handleEvent(EventType eventType, PollResult 
lastResult,
-                                       Throwable exception) {
-                               if (lastResult == null) {
-                                       LOG.error("The lastResult is null, 
something must be wrong, probably the eagle service is dead!");
-                                       throw new RuntimeException("The 
lastResult is null, probably the eagle service is dead! ", exception);
-                               }
-                               Map<String, Object> added = 
lastResult.getAdded();
-                               Map<String, Object> changed = 
lastResult.getChanged();
-                               Map<String, Object> deleted = 
lastResult.getDeleted();
-                               for(Map.Entry<String, 
List<PolicyLifecycleMethods>> entry : policyChangeListeners.entrySet()){
-                                       String alertExecutorId = entry.getKey();
-                                       for (PolicyLifecycleMethods 
policyLifecycleMethod : entry.getValue()) {
-                                               Map<String, 
AlertDefinitionAPIEntity> addedPolicies = (Map<String, 
AlertDefinitionAPIEntity>)added.get(trimPartitionNum(alertExecutorId));
-                                               if(addedPolicies != null && 
addedPolicies.size() > 0){
-                                                       
policyLifecycleMethod.onPolicyCreated(addedPolicies);
-                                               }
-                                               Map<String, 
AlertDefinitionAPIEntity> changedPolicies = (Map<String, 
AlertDefinitionAPIEntity>)changed.get(trimPartitionNum(alertExecutorId));
-                                               if(changedPolicies != null && 
changedPolicies.size() > 0){
-                                                       
policyLifecycleMethod.onPolicyChanged(changedPolicies);
-                                               }
-                                               Map<String, 
AlertDefinitionAPIEntity> deletedPolicies = (Map<String, 
AlertDefinitionAPIEntity>)deleted.get(trimPartitionNum(alertExecutorId));
-                                               if(deletedPolicies != null && 
deletedPolicies.size() > 0){
-                                                       
policyLifecycleMethod.onPolicyDeleted(deletedPolicies);
-                                               }
-                                       }
-                               }
-                       }
-                       private String trimPartitionNum(String alertExecutorId){
-                               int i = alertExecutorId.lastIndexOf('_');
-                               if(i != -1){
-                                       return alertExecutorId.substring(0, i);
-                               }
-                               return alertExecutorId;
-                       }
-               });
-               
-               ConcurrentCompositeConfiguration finalConfig = new 
ConcurrentCompositeConfiguration();
-                     
-               PolledConfigurationSource source = new 
DynamicPolicySource(initialAlertDefs, dao, config);
-
-               try{
-                       DynamicConfiguration dbSourcedConfiguration = new 
DynamicConfiguration(source, scheduler);
-                       finalConfig.addConfiguration(dbSourcedConfiguration);
-               }catch(Exception ex){
-                       LOG.warn("Fail loading from DB, continue without DB 
sourced configuration", ex);
-               }
-       }
-       
-       public static class DynamicPolicySource implements 
PolledConfigurationSource{
-               private static Logger LOG = 
LoggerFactory.getLogger(DynamicPolicySource.class);
-               private Config config;
-               private AlertDefinitionDAO dao;
-               /**
-                * mapping from alertExecutorId to list of policies 
-                */
-               private Map<String, Map<String, AlertDefinitionAPIEntity>> 
cachedAlertDefs;
-               
-               public DynamicPolicySource(Map<String, Map<String, 
AlertDefinitionAPIEntity>> initialAlertDefs, AlertDefinitionDAO dao, Config 
config){
-                       this.cachedAlertDefs = initialAlertDefs;
-                       this.dao = dao;
-                       this.config = config;
-               }
-
-               public PollResult poll(boolean initial, Object checkPoint) 
throws Exception {
-                       LOG.info("Poll policy from eagle service " +  
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST) +
-                                       ":" + 
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT) );
-                       Map<String, Map<String, AlertDefinitionAPIEntity>> 
newAlertDefs = 
-                                       
dao.findActiveAlertDefsGroupbyAlertExecutorId(config.getString("eagleProps.site"),
-                            config.getString("eagleProps.dataSource"));
-                       
-                       // compare runtime alertDefs with cachedAlertDefs and 
figure out what are added/deleted/updated
-                       Map<String, Object> added = new HashMap<String, 
Object>();
-                       Map<String, Object> changed = new HashMap<String, 
Object>();
-                       Map<String, Object> deleted = new HashMap<String, 
Object>();
-                       
-                       Set<String> newAlertExecutorIds = newAlertDefs.keySet();
-                       Set<String> cachedAlertExecutorIds = 
cachedAlertDefs.keySet();
-                       
-                       // dynamically adding new alert executor is not 
supported, because alert executor is pre-built while program starts up
-                       Collection<String> addedAlertExecutorIds = 
CollectionUtils.subtract(newAlertExecutorIds, cachedAlertExecutorIds);
-                       if(addedAlertExecutorIds != null && 
addedAlertExecutorIds.size() > 0){
-                               LOG.warn("New alertExecutorIds are found : " + 
addedAlertExecutorIds);
-                       }
-                       
-                       // if one alert executor is missing, it means all 
policy under that alert executor should be removed
-                       Collection<String> deletedAlertExecutorIds = 
CollectionUtils.subtract(cachedAlertExecutorIds, newAlertExecutorIds);
-                       if(deletedAlertExecutorIds != null && 
deletedAlertExecutorIds.size() > 0){
-                               LOG.warn("Some alertExecutorIds are deleted : " 
+ deletedAlertExecutorIds);
-                               for(String deletedAlertExecutorId : 
deletedAlertExecutorIds){
-                                       deleted.put(deletedAlertExecutorId, 
cachedAlertDefs.get(deletedAlertExecutorId));
-                               }
-                       }
-                       
-                       // we need calculate added/updated/deleted policy for 
all executors which are not deleted
-//                     Collection<String> updatedAlertExecutorIds = 
CollectionUtils.intersection(newAlertExecutorIds, cachedAlertExecutorIds);
-            Collection<String> updatedAlertExecutorIds = newAlertExecutorIds;
-                       for(String updatedAlertExecutorId : 
updatedAlertExecutorIds){
-                               Map<String, AlertDefinitionAPIEntity> 
newPolicies = newAlertDefs.get(updatedAlertExecutorId);
-                               Map<String, AlertDefinitionAPIEntity> 
cachedPolicies = cachedAlertDefs.get(updatedAlertExecutorId);
-                               
PolicyComparator.compare(updatedAlertExecutorId, newPolicies, cachedPolicies, 
added, changed, deleted);
-                       }
-                       
-                       cachedAlertDefs = newAlertDefs;
-                       return PollResult.createIncremental(added, changed, 
deleted, new Date().getTime());
-               }
-       }
-       
-       public static class PolicyComparator{
-               public static void compare(String alertExecutorId, Map<String, 
AlertDefinitionAPIEntity> newPolicies, Map<String, AlertDefinitionAPIEntity> 
cachedPolicies, 
-                               Map<String, Object> added, Map<String, Object> 
changed, Map<String, Object> deleted){
-                       Set<String> newPolicyIds = newPolicies.keySet();
-            Set<String> cachedPolicyIds = cachedPolicies != null ? 
cachedPolicies.keySet() : new HashSet<String>();
-                       Collection<String> addedPolicyIds = 
CollectionUtils.subtract(newPolicyIds, cachedPolicyIds);
-                       Collection<String> deletedPolicyIds = 
CollectionUtils.subtract(cachedPolicyIds, newPolicyIds);
-                       Collection<String> changedPolicyIds = 
CollectionUtils.intersection(cachedPolicyIds, newPolicyIds);
-                       if(addedPolicyIds != null && addedPolicyIds.size() > 0){
-                               Map<String, AlertDefinitionAPIEntity> tmp = new 
HashMap<String, AlertDefinitionAPIEntity>();
-                               for(String addedPolicyId : addedPolicyIds){
-                                       tmp.put(addedPolicyId, 
newPolicies.get(addedPolicyId));
-                               }
-                               added.put(alertExecutorId, tmp);
-                       }
-                       if(deletedPolicyIds != null && deletedPolicyIds.size() 
> 0){
-                               Map<String, AlertDefinitionAPIEntity> tmp = new 
HashMap<String, AlertDefinitionAPIEntity>();
-                               for(String deletedPolicyId : deletedPolicyIds){
-                                       tmp.put(deletedPolicyId, 
cachedPolicies.get(deletedPolicyId));
-                               }
-                               deleted.put(alertExecutorId, tmp);
-                       }
-                       if(changedPolicyIds != null && changedPolicyIds.size() 
> 0){
-                               Map<String, AlertDefinitionAPIEntity> tmp = new 
HashMap<String, AlertDefinitionAPIEntity>();
-                               for(String changedPolicyId : changedPolicyIds){
-                                       // check if policy is really changed
-                                       
if(!newPolicies.get(changedPolicyId).equals(cachedPolicies.get(changedPolicyId))){
-                                               tmp.put(changedPolicyId, 
newPolicies.get(changedPolicyId));
-                                       }
-                               }
-                               changed.put(alertExecutorId, tmp);
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PartitionUtils.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PartitionUtils.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PartitionUtils.java
deleted file mode 100644
index c3b53c2..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PartitionUtils.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import eagle.alert.common.AlertConstants;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-
-public class PartitionUtils {
-       
-       public static boolean accept(AlertDefinitionAPIEntity alertDef, 
PolicyPartitioner partitioner, int numPartitions, int partitionSeq){
-               int targetPartitionSeq = partitioner.partition(numPartitions, 
alertDef.getTags().get(AlertConstants.POLICY_TYPE), 
alertDef.getTags().get(AlertConstants.POLICY_ID));
-               if(targetPartitionSeq == partitionSeq)
-                       return true;
-               return false;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluator.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluator.java
deleted file mode 100644
index c209b97..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import java.util.List;
-import java.util.Map;
-
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import eagle.dataproc.core.ValuesArray;
-
-public interface PolicyEvaluator {
-       /**
-        * take input and evaluate expression
-        * input has 3 fields, first is siddhiAlertContext, second one is 
streamName, the third is map of attribute name/value
-        * @param input
-        * @throws Exception
-        */
-       public void evaluate(ValuesArray input) throws Exception;
-       
-       /**
-        * notify policy evaluator that policy is updated
-        */
-       public void onPolicyUpdate(AlertDefinitionAPIEntity newAlertDef);
-       
-       /**
-        * notify policy evaluator that policy is deleted, here is cleanup work 
for this policy evaluator
-        */
-       public void onPolicyDelete();
-       
-       /**
-        * get additional context
-        */     
-       public Map<String, String> getAdditionalContext();
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
deleted file mode 100644
index a022f98..0000000
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import java.util.List;
-
-import com.fasterxml.jackson.databind.Module;
-
-/**
- * to provide extensibility, we need a clear differentiation between framework job and provider logic
- * policy evaluator framework:
- * - connect to eagle data source
- * - read all policy definitions
- * - compare with cached policy definitions
- * - figure out if policy is created, deleted or updated
- *   - if policy is created, then invoke onPolicyCreated
- *   - if policy is deleted, then invoke onPolicyDeleted
- *   - if policy is updated, then invoke onPolicyUpdated
- * - for policy update, replace old evaluator engine with new evaluator engine which is created by policy evaluator provider
- * - specify # of executors for this alert executor id
- * - dynamically balance # of policies evaluated by each alert executor
- *   - use zookeeper to balance. eaglePolicies/${alertExecutorId}/${alertExecutorInstanceId} => list of policies
- * 
- * policy evaluator business features:
- * - register mapping between policy type and PolicyEvaluator
- * - create evaluator engine runtime when configuration is changed
- *
- */
-public interface PolicyEvaluatorServiceProvider {
-       String getPolicyType();
-       Class<? extends PolicyEvaluator> getPolicyEvaluator();
-       List<Module> getBindingModules();
-}

Reply via email to