http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java new file mode 100644 index 0000000..f0f81c5 --- /dev/null +++ b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.entity; + +import org.apache.eagle.alert.common.AlertConstants; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.Column; +import org.apache.eagle.log.entity.meta.ColumnFamily; +import org.apache.eagle.log.entity.meta.Prefix; +import org.apache.eagle.log.entity.meta.Service; +import org.apache.eagle.log.entity.meta.Table; +import org.apache.eagle.log.entity.meta.Tags; +import org.apache.eagle.log.entity.meta.TimeSeries; + +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@Table("alertStream") +@ColumnFamily("f") +@Prefix("alertStream") +@Service(AlertConstants.ALERT_STREAM_SERVICE_ENDPOINT_NAME) +@JsonIgnoreProperties(ignoreUnknown = true) +@TimeSeries(false) +@Tags({"dataSource", "streamName"}) +public class AlertStreamEntity extends TaggedLogAPIEntity{ + @Column("a") + private String desc; + + public String getDesc() { + return desc; + } + public void setDesc(String desc) { + this.desc = desc; + valueChanged("desc"); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java new file mode 100644 index 0000000..76b6097 --- /dev/null +++ b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.entity; + +import org.apache.eagle.alert.common.AlertConstants; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.Column; +import org.apache.eagle.log.entity.meta.ColumnFamily; +import org.apache.eagle.log.entity.meta.Prefix; +import org.apache.eagle.log.entity.meta.Service; +import org.apache.eagle.log.entity.meta.Table; +import org.apache.eagle.log.entity.meta.Tags; +import org.apache.eagle.log.entity.meta.TimeSeries; + +/** + * ddl to create streammetadata table + * + * create 'alertStreamSchema', {NAME => 'f', BLOOMFILTER => 'ROW', VERSIONS => '1', COMPRESSION => 'SNAPPY'} + */ +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@Table("alertStreamSchema") +@ColumnFamily("f") +@Prefix("alertStreamSchema") +@Service(AlertConstants.ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME) +@JsonIgnoreProperties(ignoreUnknown = true) +@TimeSeries(false) +@Tags({"dataSource", "streamName", "attrName"}) +public class AlertStreamSchemaEntity extends TaggedLogAPIEntity{ + @Column("a") + private String attrType; + @Column("b") + private String category; + @Column("c") + private String attrValueResolver; + /* all tags form the key for alert de-duplication */ + @Column("d") + private Boolean usedAsTag; + @Column("e") + private String attrDescription; + @Column("f") + private String attrDisplayName; + @Column("g") + private String defaultValue; + + public String getAttrType() { + return attrType; + } + public void setAttrType(String attrType) { + this.attrType = attrType; + valueChanged("attrType"); + } + public String getCategory() { + return category; + } + public void setCategory(String category) { + this.category = category; + valueChanged("category"); + } + public String getAttrValueResolver() { + return attrValueResolver; + } + public void setAttrValueResolver(String attrValueResolver) { + this.attrValueResolver = attrValueResolver; + valueChanged("attrValueResolver"); + } + public Boolean getUsedAsTag() { + return usedAsTag; + } + public void setUsedAsTag(Boolean usedAsTag) { + this.usedAsTag = usedAsTag; + valueChanged("usedAsTag"); + } + public String getAttrDescription() { + return attrDescription; + } + public void setAttrDescription(String attrDescription) { + this.attrDescription = attrDescription; + valueChanged("attrDescription"); + } + public String getAttrDisplayName() { + return attrDisplayName; + } + public void setAttrDisplayName(String attrDisplayName) { + this.attrDisplayName = attrDisplayName; + valueChanged("attrDisplayName"); + } + public String getDefaultValue() { + return defaultValue; + } + public void setDefaultValue(String defaultValue) { + this.defaultValue = defaultValue; + valueChanged("defaultValue"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/AbstractPolicyDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/AbstractPolicyDefinition.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/AbstractPolicyDefinition.java deleted file mode 100644 index c235225..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/AbstractPolicyDefinition.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.config; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonTypeInfo; - -/** - * base fields for all policy definition - */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible=true) -@JsonIgnoreProperties(ignoreUnknown = true) -public class AbstractPolicyDefinition { - private String type; - /** - * @return type in string - */ - public String getType() { - return type; - } - - /** - * @param type set type value - */ - public void setType(String type) { - this.type = type; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/DeduplicatorConfig.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/DeduplicatorConfig.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/DeduplicatorConfig.java deleted file mode 100644 index 04f20ff..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/DeduplicatorConfig.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.config; - -import java.io.Serializable; - -public class DeduplicatorConfig implements Serializable{ - private static final long serialVersionUID = 1L; - public int getAlertDedupIntervalMin() { - return alertDedupIntervalMin; - } - public void setAlertDedupIntervalMin(int alertDedupIntervalMin) { - this.alertDedupIntervalMin = alertDedupIntervalMin; - } - public int getEmailDedupIntervalMin() { - return emailDedupIntervalMin; - } - public void setEmailDedupIntervalMin(int emailDedupIntervalMin) { - this.emailDedupIntervalMin = emailDedupIntervalMin; - } - private int alertDedupIntervalMin; - private int emailDedupIntervalMin; -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/EmailNotificationConfig.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/EmailNotificationConfig.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/EmailNotificationConfig.java deleted file mode 100644 index fae24d5..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/EmailNotificationConfig.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.config; - -public class EmailNotificationConfig extends NotificationConfig{ - private static final long serialVersionUID = 1L; - private String sender; - private String recipients; - private String tplFileName; - private String subject; - public String getSubject() { - return subject; - } - public void setSubject(String subject) { - this.subject = subject; - } - public String getRecipients() { - return recipients; - } - public void setRecipients(String recipients) { - this.recipients = recipients; - } - public String getSender() { - return sender; - } - public void setSender(String sender) { - this.sender = sender; - } - public String getTplFileName() { - return tplFileName; - } - public void setTplFileName(String tplFileName) { - this.tplFileName = tplFileName; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/NotificationConfig.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/NotificationConfig.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/NotificationConfig.java deleted file mode 100644 index 6903a57..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/NotificationConfig.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.config; - -import java.io.Serializable; - -import com.fasterxml.jackson.annotation.JsonTypeInfo; - -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "flavor", visible=true) -public class NotificationConfig implements Serializable{ - private static final long serialVersionUID = 1L; - private String id; - private String flavor; - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public String getFlavor() { - return flavor; - } - - public void setFlavor(String flavor) { - this.flavor = flavor; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/Remediation.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/Remediation.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/Remediation.java deleted file mode 100644 index c644a31..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/config/Remediation.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.config; - -import java.io.Serializable; - -public class Remediation implements Serializable{ - private static final long serialVersionUID = 1L; - private String id; - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertDeduplicationExecutorBase.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertDeduplicationExecutorBase.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertDeduplicationExecutorBase.java deleted file mode 100644 index b459130..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertDeduplicationExecutorBase.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.dedup; - -import eagle.alert.common.AlertConstants; -import eagle.alert.config.DeduplicatorConfig; -import eagle.alert.dao.AlertDefinitionDAO; -import eagle.alert.entity.AlertAPIEntity; -import eagle.alert.entity.AlertDefinitionAPIEntity; -import eagle.alert.policy.DynamicPolicyLoader; -import eagle.alert.policy.PolicyLifecycleMethods; -import eagle.common.config.EagleConfigConstants; -import eagle.datastream.Collector; -import eagle.datastream.JavaStormStreamExecutor2; -import eagle.datastream.Tuple2; -import com.sun.jersey.client.impl.CopyOnWriteHashMap; -import com.typesafe.config.Config; -import eagle.dataproc.core.JsonSerDeserUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public abstract class AlertDeduplicationExecutorBase extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods { - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(AlertDeduplicationExecutorBase.class); - protected Config config; - protected DEDUP_TYPE dedupType; - - private List<String> alertExecutorIdList; - private volatile CopyOnWriteHashMap<String, DefaultDeduplicator<AlertAPIEntity>> alertDedups; - private AlertDefinitionDAO dao; - - public enum DEDUP_TYPE { - ENTITY, - EMAIL - } - - public AlertDeduplicationExecutorBase(List<String> alertExecutorIdList, DEDUP_TYPE dedupType, AlertDefinitionDAO dao){ - this.alertExecutorIdList = alertExecutorIdList; - this.dedupType = dedupType; - this.dao = dao; - } - - @Override - public void prepareConfig(Config config) { - this.config = config; - } - - public DefaultDeduplicator<AlertAPIEntity> createAlertDedup(AlertDefinitionAPIEntity alertDef) { - DeduplicatorConfig dedupConfig = null; - try { - dedupConfig = JsonSerDeserUtils.deserialize(alertDef.getDedupeDef(), DeduplicatorConfig.class); - } - catch (Exception ex) { - LOG.warn("Initial dedupConfig error, " + ex.getMessage()); - } - - if (dedupConfig != null) { - if (dedupType.equals(DEDUP_TYPE.ENTITY)) { - return new DefaultDeduplicator<>(dedupConfig.getAlertDedupIntervalMin()); - } else if (dedupType.equals(DEDUP_TYPE.EMAIL)) { - return new DefaultDeduplicator<>(dedupConfig.getEmailDedupIntervalMin()); - } - } - - return null; - } - - @Override - public void init() { - String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE); - String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE); - Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs; - try { - initialAlertDefs = dao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource); - } - catch (Exception ex) { - LOG.error("fail to initialize initialAlertDefs: ", ex); - throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex); - } - Map<String, DefaultDeduplicator<AlertAPIEntity>> tmpDeduplicators = new HashMap<String, DefaultDeduplicator<AlertAPIEntity>>(); - if(initialAlertDefs == null || initialAlertDefs.isEmpty()){ - LOG.warn("No alert definitions was found for site: "+site+", dataSource: "+dataSource); - } else { - for (String alertExecutorId: alertExecutorIdList) { - if(initialAlertDefs.containsKey(alertExecutorId)){ - for(AlertDefinitionAPIEntity alertDef : initialAlertDefs.get(alertExecutorId).values()){ - try { - DefaultDeduplicator<AlertAPIEntity> deduplicator = createAlertDedup(alertDef); - if (deduplicator != null) - tmpDeduplicators.put(alertDef.getTags().get(AlertConstants.POLICY_ID), deduplicator); - else LOG.warn("The dedup interval is not set, alertDef: " + alertDef); - } - catch (Throwable t) { - LOG.error("Got an exception when initial dedup config, probably dedup config is not set: " + t.getMessage() + "," + alertDef); - } - } - } else { - LOG.info(String.format("No alert definitions found for site: %s, dataSource: %s, alertExecutorId: %s",site,dataSource,alertExecutorId)); - } - } - } - - alertDedups = new CopyOnWriteHashMap<>(); - alertDedups.putAll(tmpDeduplicators); - DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance(); - policyLoader.init(initialAlertDefs, dao, config); - for (String alertExecutorId : alertExecutorIdList) { - policyLoader.addPolicyChangeListener(alertExecutorId, this); - } - } - - @Override - public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector){ - String policyId = (String) input.get(0); - AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1); - DefaultDeduplicator<AlertAPIEntity> dedup; - synchronized(alertDedups) { - dedup = alertDedups.get(policyId); - } - - List<AlertAPIEntity> ret = Arrays.asList(alertEntity); - if (dedup == null) { - LOG.warn("Dedup config for policyId " + policyId + " is not set or is not a valid config"); - } else { - if (dedup.getDedupIntervalMin() == -1) { - LOG.warn("the dedup interval is set as -1, which mean all alerts should be deduped(skipped)"); - return; - } - ret = dedup.dedup(ret); - } - for (AlertAPIEntity entity : ret) { - outputCollector.collect(new Tuple2(policyId, entity)); - } - } - - public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) { - if(LOG.isDebugEnabled()) LOG.debug("Alert dedup config to be added : " + added); - for(AlertDefinitionAPIEntity alertDef : added.values()){ - LOG.info("Alert dedup config really added " + alertDef); - DefaultDeduplicator<AlertAPIEntity> dedup = createAlertDedup(alertDef); - if (dedup != null) { - synchronized(alertDedups) { - alertDedups.put(alertDef.getTags().get(AlertConstants.POLICY_ID), dedup); - } - } - } - } - - public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) { - LOG.info("Alert dedup config changed : " + changed); - for(AlertDefinitionAPIEntity alertDef : changed.values()){ - LOG.info("Alert dedup config really changed " + alertDef); - DefaultDeduplicator<AlertAPIEntity> dedup = createAlertDedup(alertDef); - if (dedup != null) { - synchronized(alertDedups) { - alertDedups.put(alertDef.getTags().get(AlertConstants.POLICY_ID), dedup); - } - } - } - } - - public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) { - LOG.info("alert dedup config deleted : " + deleted); - for(AlertDefinitionAPIEntity alertDef : deleted.values()){ - LOG.info("alert dedup config deleted " + alertDef); - // no cleanup to do, just remove it - synchronized(alertDedups) { - alertDedups.remove(alertDef.getTags().get(AlertConstants.POLICY_ID)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java deleted file mode 100644 index 7dd0ddc..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.dedup; - -import eagle.alert.dao.AlertDefinitionDAO; - -import java.util.List; - -public class AlertEmailDeduplicationExecutor extends AlertDeduplicationExecutorBase { - - private static final long serialVersionUID = 1L; - - public AlertEmailDeduplicationExecutor(List<String> alertExecutorIdList, AlertDefinitionDAO dao){ - super(alertExecutorIdList, DEDUP_TYPE.EMAIL, dao); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java deleted file mode 100644 index b6050a0..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.dedup; - -import eagle.alert.dao.AlertDefinitionDAO; - -import java.util.List; - -public class AlertEntityDeduplicationExecutor extends AlertDeduplicationExecutorBase { - - private static final long serialVersionUID = 1L; - - public AlertEntityDeduplicationExecutor(List<String> alertExecutorIdList, AlertDefinitionDAO dao){ - super(alertExecutorIdList, DEDUP_TYPE.ENTITY, dao); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/DefaultDeduplicator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/DefaultDeduplicator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/DefaultDeduplicator.java deleted file mode 100644 index 97e58ac..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/DefaultDeduplicator.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.dedup; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.commons.lang.time.DateUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import eagle.log.base.taggedlog.TaggedLogAPIEntity; - -public class DefaultDeduplicator<T extends TaggedLogAPIEntity> implements EntityDeduplicator<T>{ - protected long dedupIntervalMin; - protected Map<EntityTagsUniq, Long> entites = new HashMap<EntityTagsUniq, Long>(); - public static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class); - - public static enum AlertDeduplicationStatus{ - NEW, - DUPLICATED, - IGNORED - } - - public DefaultDeduplicator() { - this.dedupIntervalMin = 0; - } - - public DefaultDeduplicator(long intervalMin) { - this.dedupIntervalMin = intervalMin; - } - - public void clearOldCache() { - List<EntityTagsUniq> removedkeys = new ArrayList<EntityTagsUniq>(); - for (Entry<EntityTagsUniq, Long> entry : entites.entrySet()) { - EntityTagsUniq entity = entry.getKey(); - if (System.currentTimeMillis() - 7 * DateUtils.MILLIS_PER_DAY > entity.createdTime) { - removedkeys.add(entry.getKey()); - } - } - for (EntityTagsUniq alertKey : removedkeys) { - entites.remove(alertKey); - } - } - - public AlertDeduplicationStatus checkDedup(EntityTagsUniq key){ - long current = key.timestamp; - if(!entites.containsKey(key)){ - entites.put(key, current); - return AlertDeduplicationStatus.NEW; - } - - long last = entites.get(key); - if(current - last >= dedupIntervalMin * DateUtils.MILLIS_PER_MINUTE){ - entites.put(key, current); - return AlertDeduplicationStatus.DUPLICATED; - } - - return AlertDeduplicationStatus.IGNORED; - } - - public List<T> dedup(List<T> list) { - clearOldCache(); - List<T> dedupList = new ArrayList<T>(); - int totalCount = list.size(); - int dedupedCount = 0; - for(T entity: list) { - if (entity.getTags() == null) { - if(LOG.isDebugEnabled()) LOG.debug("Tags is null, don't know how to deduplicate, do nothing"); - } else { - AlertDeduplicationStatus status = checkDedup(new EntityTagsUniq(entity.getTags(), entity.getTimestamp())); - if (!status.equals(AlertDeduplicationStatus.IGNORED)) { - dedupList.add(entity); - } else { - dedupedCount++; - if (LOG.isDebugEnabled()) - LOG.debug(String.format("Entity is skipped because it's duplicated: " + entity.toString())); - } - } - } - - if(dedupedCount>0){ - LOG.info(String.format("Skipped %s of %s alerts because they are duplicated",dedupedCount,totalCount)); - }else if(LOG.isDebugEnabled()){ - LOG.debug(String.format("Skipped %s of %s duplicated alerts",dedupedCount,totalCount)); - } - - return dedupList; - } - - public EntityDeduplicator<T> setDedupIntervalMin(long dedupIntervalMin) { - this.dedupIntervalMin = dedupIntervalMin; - return this; - } - - public long getDedupIntervalMin() { - return dedupIntervalMin; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityDeduplicator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityDeduplicator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityDeduplicator.java deleted file mode 100644 index f24d31a..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityDeduplicator.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.dedup; - -import java.util.List; - -import eagle.log.base.taggedlog.TaggedLogAPIEntity; - -/** - * Dedup Eagle entities. - * - * @param <T> Eagle entity - */ -public interface EntityDeduplicator<T extends TaggedLogAPIEntity> { - - EntityDeduplicator<T> setDedupIntervalMin(long intervalMin); - - long getDedupIntervalMin(); - - List<T> dedup(List<T> list); - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityTagsUniq.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityTagsUniq.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityTagsUniq.java deleted file mode 100644 index b0e5dcf..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/dedup/EntityTagsUniq.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * - */ -package eagle.alert.dedup; - -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @since Mar 19, 2015 - */ -public class EntityTagsUniq { - public Map<String, String> tags; - public Long timestamp; // entity's timestamp - public long createdTime; // entityTagsUniq's created time, for cache removal; - - private static final Logger LOG = LoggerFactory.getLogger(EntityTagsUniq.class); - - public EntityTagsUniq(Map<String, String> tags, long timestamp) { - this.tags = new HashMap<String, String>(tags); - this.timestamp = timestamp; - this.createdTime = System.currentTimeMillis(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof EntityTagsUniq) { - EntityTagsUniq au = (EntityTagsUniq) obj; - if (tags.size() != au.tags.size()) return false; - for (Entry<String, String> keyValue : au.tags.entrySet()) { - boolean keyExist = tags.containsKey(keyValue.getKey()); - // sanity check - if (tags.get(keyValue.getKey()) == null || keyValue.getValue() == null) { - return true; - } - if ( !keyExist || !tags.get(keyValue.getKey()).equals(keyValue.getValue())) { - return false; - } - } - return true; - } - return false; - } - - @Override - public int hashCode() { - int hashCode = 0; - for (Map.Entry<String,String> entry : tags.entrySet()) { - if(entry.getValue() == null) { - LOG.warn("Tag value for key ["+entry.getKey()+"] is null, skipped for hash code"); - }else { - try { - hashCode ^= entry.getValue().hashCode(); - } catch (Throwable t) { - LOG.info("Got exception because of entry: " + entry, t); - } - } - } - return hashCode; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGenerator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGenerator.java deleted file mode 100644 index 58ae733..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGenerator.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * - */ -package eagle.alert.notification; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.*; - -import eagle.alert.common.AlertConstants; -import eagle.alert.common.AlertEmailSender; -import eagle.alert.email.AlertEmailComponent; -import eagle.alert.email.AlertEmailContext; -import eagle.alert.entity.AlertAPIEntity; -import com.typesafe.config.ConfigObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AlertEmailGenerator{ - private String tplFile; - private String sender; - private String recipients; - private String subject; - private ConfigObject eagleProps; - - private ThreadPoolExecutor executorPool; - - private final static Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class); - - private final static long MAX_TIMEOUT_MS =60000; - - public void sendAlertEmail(AlertAPIEntity entity) { - sendAlertEmail(entity, recipients, null); - } - - public void sendAlertEmail(AlertAPIEntity entity, String recipients) { - sendAlertEmail(entity, recipients, null); - } - - public void sendAlertEmail(AlertAPIEntity entity, String recipients, String cc) { - AlertEmailContext email = new AlertEmailContext(); - - AlertEmailComponent component = new AlertEmailComponent(); - component.setAlertContext(entity.getAlertContext()); - List<AlertEmailComponent> components = new ArrayList<AlertEmailComponent>(); - components.add(component); - email.setComponents(components); - if (entity.getAlertContext().getProperty(AlertConstants.SUBJECT) != null) { - email.setSubject(entity.getAlertContext().getProperty(AlertConstants.SUBJECT)); - } - else email.setSubject(subject); - email.setVelocityTplFile(tplFile); - email.setRecipients(recipients); - email.setCc(cc); - email.setSender(sender); - - /** asynchronized email sending */ - @SuppressWarnings("rawtypes") - AlertEmailSender thread = new AlertEmailSender(email, eagleProps); - - if(this.executorPool == null) throw new IllegalStateException("Invoking thread executor pool but it's is not set yet"); - - LOG.info("Sending email in asynchronous to: "+recipients+", cc: "+cc); - Future future = this.executorPool.submit(thread); - try { - future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS); - LOG.info(String.format("Successfully send email to %s", recipients)); - } catch (InterruptedException | ExecutionException e) { - LOG.error(String.format("Failed to send email to %s, due to:%s",recipients,e),e); - } catch (TimeoutException e) { - LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ",recipients, MAX_TIMEOUT_MS),e); - } - } - - public String getTplFile() { - return tplFile; - } - - public void setTplFile(String tplFile) { - this.tplFile = tplFile; - } - - public String getSender() { - return sender; - } - - public void setSender(String sender) { - this.sender = sender; - } - - public String getRecipients() { - return recipients; - } - - public void setRecipients(String recipients) { - this.recipients = recipients; - } - - public String getSubject() { - return subject; - } - - public void setSubject(String subject) { - this.subject = subject; - } - - public ConfigObject getEagleProps() { - return eagleProps; - } - - public void setEagleProps(ConfigObject eagleProps) { - this.eagleProps = eagleProps; - } - - public void setExecutorPool(ThreadPoolExecutor executorPool) { - this.executorPool = executorPool; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGeneratorBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGeneratorBuilder.java deleted file mode 100644 index 10be162..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertEmailGeneratorBuilder.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.notification; - -import java.util.concurrent.ThreadPoolExecutor; - -import com.typesafe.config.ConfigObject; - -public class AlertEmailGeneratorBuilder { - private AlertEmailGenerator generator; - private AlertEmailGeneratorBuilder(){ - generator = new AlertEmailGenerator(); - } - public static AlertEmailGeneratorBuilder newBuilder(){ - return new AlertEmailGeneratorBuilder(); - } - public AlertEmailGeneratorBuilder withSubject(String subject){ - generator.setSubject(subject); - return this; - } - public AlertEmailGeneratorBuilder withSender(String sender){ - generator.setSender(sender); - return this; - } - public AlertEmailGeneratorBuilder withRecipients(String recipients){ - generator.setRecipients(recipients); - return this; - } - public AlertEmailGeneratorBuilder withTplFile(String tplFile){ - generator.setTplFile(tplFile); - return this; - } - public AlertEmailGeneratorBuilder withEagleProps(ConfigObject eagleProps) { - generator.setEagleProps(eagleProps); - return this; - } - public AlertEmailGeneratorBuilder withExecutorPool(ThreadPoolExecutor threadPoolExecutor) { - generator.setExecutorPool(threadPoolExecutor); - return this; - } - - public AlertEmailGenerator build(){ - return this.generator; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertNotificationExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertNotificationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertNotificationExecutor.java deleted file mode 100644 index 6e5a3d7..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/AlertNotificationExecutor.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.notification; - -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.sun.jersey.client.impl.CopyOnWriteHashMap; -import com.typesafe.config.Config; -import eagle.alert.config.EmailNotificationConfig; -import eagle.alert.dao.AlertDefinitionDAO; -import eagle.alert.entity.AlertAPIEntity; -import eagle.alert.entity.AlertDefinitionAPIEntity; -import eagle.alert.policy.DynamicPolicyLoader; -import eagle.alert.policy.PolicyLifecycleMethods; -import eagle.dataproc.core.JsonSerDeserUtils; -import eagle.datastream.Collector; -import eagle.datastream.JavaStormStreamExecutor1; -import eagle.datastream.Tuple1; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * notify alert by email, sms or other means - * currently we only implements email notification - */ -public class AlertNotificationExecutor extends JavaStormStreamExecutor1<String> implements PolicyLifecycleMethods { - - private static final long serialVersionUID = 1690354365435407034L; - private static final Logger LOG = LoggerFactory.getLogger(AlertNotificationExecutor.class); - private Config config; - - private List<String> alertExecutorIdList; - private volatile CopyOnWriteHashMap<String, List<AlertEmailGenerator>> alertEmailGeneratorsMap; - private AlertDefinitionDAO dao; - - private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4; - private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8; - private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute - - private transient ThreadPoolExecutor executorPool; - - public AlertNotificationExecutor(List<String> alertExecutorIdList, AlertDefinitionDAO dao){ - this.alertExecutorIdList = alertExecutorIdList; - this.dao = dao; - } - - public List<AlertEmailGenerator> createAlertEmailGenerator(AlertDefinitionAPIEntity alertDef) { - Module module = new SimpleModule("notification").registerSubtypes(new NamedType(EmailNotificationConfig.class, "email")); - EmailNotificationConfig[] emailConfigs = new EmailNotificationConfig[0]; - try { - emailConfigs = JsonSerDeserUtils.deserialize(alertDef.getNotificationDef(), EmailNotificationConfig[].class, Arrays.asList(module)); - } - catch (Exception ex) { - LOG.warn("Initial emailConfig error, wrong format or it's error " + ex.getMessage()); - } - List<AlertEmailGenerator> gens = new ArrayList<AlertEmailGenerator>(); - if (emailConfigs == null) { - return gens; - } - for(EmailNotificationConfig emailConfig : emailConfigs) { - String tplFileName = emailConfig.getTplFileName(); - if (tplFileName == null || tplFileName.equals("")) { // empty tplFileName, use default tpl file name - tplFileName = "ALERT_DEFAULT.vm"; - } - AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder(). - withEagleProps(config.getObject("eagleProps")). - withSubject(emailConfig.getSubject()). - withSender(emailConfig.getSender()). - withRecipients(emailConfig.getRecipients()). - withTplFile(tplFileName). - withExecutorPool(executorPool). - build(); - gens.add(gen); - } - return gens; - } - - /** - * 1. register both file and database configuration - * 2. create email generator from configuration - */ - @Override - public void init(){ - executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); - - Map<String, List<AlertEmailGenerator>> tmpEmailGenerators = new HashMap<String, List<AlertEmailGenerator>> (); - - String site = config.getString("eagleProps.site"); - String dataSource = config.getString("eagleProps.dataSource"); - Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs; - try { - initialAlertDefs = dao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource); - } - catch (Exception ex) { - LOG.error("fail to initialize initialAlertDefs: ", ex); - throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex); - } - - if(initialAlertDefs == null || initialAlertDefs.isEmpty()){ - LOG.warn("No alert definitions found for site: "+site+", dataSource: "+dataSource); - } - else { - for (String alertExecutorId: alertExecutorIdList) { - if(initialAlertDefs.containsKey(alertExecutorId)) { - for (AlertDefinitionAPIEntity alertDef : initialAlertDefs.get(alertExecutorId).values()) { - List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef); - tmpEmailGenerators.put(alertDef.getTags().get("policyId"), gens); - } - }else{ - LOG.info(String.format("No alert definitions found for site: %s, dataSource: %s, alertExecutorId: %s",site,dataSource,alertExecutorId)); - } - } - } - - alertEmailGeneratorsMap = new CopyOnWriteHashMap<String, List<AlertEmailGenerator>>(); - alertEmailGeneratorsMap.putAll(tmpEmailGenerators); - DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance(); - policyLoader.init(initialAlertDefs, dao, config); - for (String alertExecutorId : alertExecutorIdList) { - policyLoader.addPolicyChangeListener(alertExecutorId, this); - } - } - - @Override - public void prepareConfig(Config config) { - this.config = config; - } - - @Override - public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){ - String policyId = (String) input.get(0); - AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1); - processAlerts(policyId, Arrays.asList(alertEntity)); - } - - //TODO: add a thread pool for email sender? - private void processAlerts(String policyId, List<AlertAPIEntity> list) { - List<AlertEmailGenerator> generators; - synchronized(alertEmailGeneratorsMap) { - generators = alertEmailGeneratorsMap.get(policyId); - } - if (generators == null) { - LOG.warn("Notification config of policyId " + policyId + " has been deleted"); - return; - } - for (AlertAPIEntity entity : list) { - for(AlertEmailGenerator generator : generators){ - generator.sendAlertEmail(entity); - } - } - } - - @Override - public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) { - if(LOG.isDebugEnabled()) LOG.debug(" alert notification config changed : " + added); - for(AlertDefinitionAPIEntity alertDef : added.values()){ - LOG.info("alert notification config really changed " + alertDef); - List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef); - synchronized(alertEmailGeneratorsMap) { - alertEmailGeneratorsMap.put(alertDef.getTags().get("policyId"), gens); - } - } - } - - @Override - public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) { - if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be added : " + changed); - for(AlertDefinitionAPIEntity alertDef : changed.values()){ - LOG.info("alert notification config really added " + alertDef); - List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef); - synchronized(alertEmailGeneratorsMap) { - alertEmailGeneratorsMap.put(alertDef.getTags().get("policyId"), gens); - } - } - } - - @Override - public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) { - if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be deleted : " + deleted); - for(AlertDefinitionAPIEntity alertDef : deleted.values()){ - LOG.info("alert notification config really deleted " + alertDef); - synchronized(alertEmailGeneratorsMap) { - alertEmailGeneratorsMap.remove(alertDef.getTags().get("policyId")); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/UrlBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/UrlBuilder.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/UrlBuilder.java deleted file mode 100644 index fa5dc82..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/notification/UrlBuilder.java +++ /dev/null @@ -1,47 +0,0 @@ -package eagle.alert.notification; - -import eagle.alert.common.AlertConstants; -import eagle.alert.entity.AlertAPIEntity; -import eagle.common.EagleBase64Wrapper; -import eagle.common.config.EagleConfigConstants; -import eagle.log.entity.HBaseInternalLogHelper; -import eagle.log.entity.InternalLog; -import eagle.log.entity.RowkeyBuilder; -import eagle.log.entity.meta.EntityDefinitionManager; -import org.mortbay.util.UrlEncoded; -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class UrlBuilder { - - private static final Logger logger = LoggerFactory.getLogger(UrlBuilder.class); - - public static String getEncodedRowkey(AlertAPIEntity entity) throws Exception { - InternalLog log = HBaseInternalLogHelper.convertToInternalLog(entity, EntityDefinitionManager.getEntityDefinitionByEntityClass(entity.getClass())); - return EagleBase64Wrapper.encodeByteArray2URLSafeString(RowkeyBuilder.buildRowkey(log)); - } - - public static String buildAlertDetailUrl(String host, int port, AlertAPIEntity entity) { - String baseUrl = "http://" + host + ":" + String.valueOf(port) + "/eagle-service/#/dam/alertDetail/"; - try { - return baseUrl + UrlEncoded.encodeString(getEncodedRowkey(entity)); - } - catch (Exception ex) { - logger.error("Fail to populate encodedRowkey for alert Entity" + entity.toString()); - return "N/A"; - } - } - - public static String buiildPolicyDetailUrl(String host, int port, Map<String, String> tags) { - String baseUrl = "http://" + host + ":" + String.valueOf(port) + "/eagle-service/#/dam/policyDetail?"; - String format = "policy=%s&site=%s&executor=%s"; - String policy = tags.get(AlertConstants.POLICY_ID); - String site = tags.get(EagleConfigConstants.SITE); - String alertExecutorID = tags.get(AlertConstants.ALERT_EXECUTOR_ID); - if (policy != null && site != null && alertExecutorID != null) { - return baseUrl + String.format(format, policy, site, alertExecutorID); - } - return "N/A"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/AlertPersistExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/AlertPersistExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/AlertPersistExecutor.java deleted file mode 100644 index 543405d..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/AlertPersistExecutor.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.persist; - -import eagle.alert.entity.AlertAPIEntity; -import com.typesafe.config.Config; -import eagle.common.config.EagleConfigConstants; -import eagle.datastream.Collector; -import eagle.datastream.JavaStormStreamExecutor1; -import eagle.datastream.Tuple1; - -import java.util.Arrays; - -public class AlertPersistExecutor extends JavaStormStreamExecutor1<String> { - - private static final long serialVersionUID = 1L; - private Config config; - private EaglePersist persist; - - public AlertPersistExecutor(){ - } - @Override - public void prepareConfig(Config config) { - this.config = config; - } - - @Override - public void init() { - String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST); - int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT); - String username = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) - ? config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : null; - String password = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) - ? config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : null; - this.persist = new EaglePersist(host, port, username, password); - } - - @Override - public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){ - persist.doPersist(Arrays.asList((AlertAPIEntity)(input.get(1)))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/EaglePersist.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/EaglePersist.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/EaglePersist.java deleted file mode 100644 index b5820c4..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/persist/EaglePersist.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * - */ -package eagle.alert.persist; - -import eagle.log.base.taggedlog.TaggedLogAPIEntity; -import eagle.log.entity.GenericServiceAPIResponseEntity; -import eagle.service.client.IEagleServiceClient; -import eagle.service.client.impl.EagleServiceClientImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -public class EaglePersist { - - private static Logger LOG = LoggerFactory.getLogger(EaglePersist.class); - private String eagleServiceHost; - private int eagleServicePort; - private String username; - private String password; - - public EaglePersist(String eagleServiceHost, int eagleServicePort) { - this(eagleServiceHost, eagleServicePort, null, null); - } - - public EaglePersist(String eagleServiceHost, int eagleServicePort, String username, String password) { - this.eagleServiceHost = eagleServiceHost; - this.eagleServicePort = eagleServicePort; - this.username = username; - this.password = password; - } - - public boolean doPersist(List<? extends TaggedLogAPIEntity> list) { - if (list.isEmpty()) return false; - LOG.info("Going to persist entities, type: " + " " + list.get(0).getClass().getSimpleName() + ", list size: " + list.size()); - try { - IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password); - GenericServiceAPIResponseEntity<String> response = client.create(list); - client.close(); - if (response.isSuccess()) { - LOG.info("Successfully create entities " + list.toString()); - return true; - } - else { - LOG.error("Fail to create entities"); - return false; - } - } - catch (Exception ex) { - LOG.error("Got an exception in persisting entities" + ex.getMessage(), ex); - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DefaultPolicyPartitioner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DefaultPolicyPartitioner.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DefaultPolicyPartitioner.java deleted file mode 100644 index ef76c2f..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DefaultPolicyPartitioner.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.policy; - - -public class DefaultPolicyPartitioner implements PolicyPartitioner{ - @Override - public int partition(int numTotalPartitions, String policyType, - String policyId) { - final int prime = 31; - int result = 1; - result = result * prime + policyType.hashCode(); - result = result < 0 ? result*-1 : result; - result = result * prime + policyId.hashCode(); - result = result < 0 ? result*-1 : result; - return result % numTotalPartitions; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DynamicPolicyLoader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DynamicPolicyLoader.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DynamicPolicyLoader.java deleted file mode 100644 index ba76a15..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/DynamicPolicyLoader.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.policy; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import eagle.common.config.EagleConfigConstants; -import org.apache.commons.collections.CollectionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import eagle.alert.dao.AlertDefinitionDAO; -import eagle.alert.entity.AlertDefinitionAPIEntity; -import com.netflix.config.AbstractPollingScheduler; -import com.netflix.config.ConcurrentCompositeConfiguration; -import com.netflix.config.DynamicConfiguration; -import com.netflix.config.FixedDelayPollingScheduler; -import com.netflix.config.PollListener; -import com.netflix.config.PollResult; -import com.netflix.config.PolledConfigurationSource; -import com.sun.jersey.client.impl.CopyOnWriteHashMap; -import com.typesafe.config.Config; - -public class DynamicPolicyLoader { - private static final Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class); - - private final int defaultInitialDelayMillis = 30*1000; - private final int defaultDelayMillis = 60*1000; - private final boolean defaultIgnoreDeleteFromSource = true; - private volatile CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>> policyChangeListeners = new CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>>(); - private static DynamicPolicyLoader instance = new DynamicPolicyLoader(); - private volatile boolean initialized = false; - - public void addPolicyChangeListener(String alertExecutorId, PolicyLifecycleMethods alertExecutor){ - synchronized(policyChangeListeners) { - if (policyChangeListeners.get(alertExecutorId) == null) { - policyChangeListeners.put(alertExecutorId, new ArrayList<PolicyLifecycleMethods>()); - } - policyChangeListeners.get(alertExecutorId).add(alertExecutor); - } - } - - public static DynamicPolicyLoader getInstance(){ - return instance; - } - - /** - * singleton with init would be good for unit test as well, and it ensures that - * initialization happens only once before you use it. - * @param config - * @param dao - */ - public void init(Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs, - AlertDefinitionDAO dao, Config config){ - if(!initialized){ - synchronized(this){ - if(!initialized){ - internalInit(initialAlertDefs, dao, config); - initialized = true; - } - } - } - } - - /** - * map from alertExecutorId+partitionId to AlertExecutor which implements PolicyLifecycleMethods - * @param initialAlertDefs - * @param dao - * @param config - */ - private void internalInit(Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs, - AlertDefinitionDAO dao, Config config){ - if(!config.getBoolean("dynamicConfigSource.enabled")) { - return; - } - AbstractPollingScheduler scheduler = new FixedDelayPollingScheduler( - config.getInt("dynamicConfigSource.initDelayMillis"), - config.getInt("dynamicConfigSource.delayMillis"), - false - ); - - scheduler.addPollListener(new PollListener(){ - @Override - public void handleEvent(EventType eventType, PollResult lastResult, - Throwable exception) { - if (lastResult == null) { - LOG.error("The lastResult is null, something must be wrong, probably the eagle service is dead!"); - throw new RuntimeException("The lastResult is null, probably the eagle service is dead! ", exception); - } - Map<String, Object> added = lastResult.getAdded(); - Map<String, Object> changed = lastResult.getChanged(); - Map<String, Object> deleted = lastResult.getDeleted(); - for(Map.Entry<String, List<PolicyLifecycleMethods>> entry : policyChangeListeners.entrySet()){ - String alertExecutorId = entry.getKey(); - for (PolicyLifecycleMethods policyLifecycleMethod : entry.getValue()) { - Map<String, AlertDefinitionAPIEntity> addedPolicies = (Map<String, AlertDefinitionAPIEntity>)added.get(trimPartitionNum(alertExecutorId)); - if(addedPolicies != null && addedPolicies.size() > 0){ - policyLifecycleMethod.onPolicyCreated(addedPolicies); - } - Map<String, AlertDefinitionAPIEntity> changedPolicies = (Map<String, AlertDefinitionAPIEntity>)changed.get(trimPartitionNum(alertExecutorId)); - if(changedPolicies != null && changedPolicies.size() > 0){ - policyLifecycleMethod.onPolicyChanged(changedPolicies); - } - Map<String, AlertDefinitionAPIEntity> deletedPolicies = (Map<String, AlertDefinitionAPIEntity>)deleted.get(trimPartitionNum(alertExecutorId)); - if(deletedPolicies != null && deletedPolicies.size() > 0){ - policyLifecycleMethod.onPolicyDeleted(deletedPolicies); - } - } - } - } - private String trimPartitionNum(String alertExecutorId){ - int i = alertExecutorId.lastIndexOf('_'); - if(i != -1){ - return alertExecutorId.substring(0, i); - } - return alertExecutorId; - } - }); - - ConcurrentCompositeConfiguration finalConfig = new ConcurrentCompositeConfiguration(); - - PolledConfigurationSource source = new DynamicPolicySource(initialAlertDefs, dao, config); - - try{ - DynamicConfiguration dbSourcedConfiguration = new DynamicConfiguration(source, scheduler); - finalConfig.addConfiguration(dbSourcedConfiguration); - }catch(Exception ex){ - LOG.warn("Fail loading from DB, continue without DB sourced configuration", ex); - } - } - - public static class DynamicPolicySource implements PolledConfigurationSource{ - private static Logger LOG = LoggerFactory.getLogger(DynamicPolicySource.class); - private Config config; - private AlertDefinitionDAO dao; - /** - * mapping from alertExecutorId to list of policies - */ - private Map<String, Map<String, AlertDefinitionAPIEntity>> cachedAlertDefs; - - public DynamicPolicySource(Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs, AlertDefinitionDAO dao, Config config){ - this.cachedAlertDefs = initialAlertDefs; - this.dao = dao; - this.config = config; - } - - public PollResult poll(boolean initial, Object checkPoint) throws Exception { - LOG.info("Poll policy from eagle service " + config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST) + - ":" + config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT) ); - Map<String, Map<String, AlertDefinitionAPIEntity>> newAlertDefs = - dao.findActiveAlertDefsGroupbyAlertExecutorId(config.getString("eagleProps.site"), - config.getString("eagleProps.dataSource")); - - // compare runtime alertDefs with cachedAlertDefs and figure out what are added/deleted/updated - Map<String, Object> added = new HashMap<String, Object>(); - Map<String, Object> changed = new HashMap<String, Object>(); - Map<String, Object> deleted = new HashMap<String, Object>(); - - Set<String> newAlertExecutorIds = newAlertDefs.keySet(); - Set<String> cachedAlertExecutorIds = cachedAlertDefs.keySet(); - - // dynamically adding new alert executor is not supported, because alert executor is pre-built while program starts up - Collection<String> addedAlertExecutorIds = CollectionUtils.subtract(newAlertExecutorIds, cachedAlertExecutorIds); - if(addedAlertExecutorIds != null && addedAlertExecutorIds.size() > 0){ - LOG.warn("New alertExecutorIds are found : " + addedAlertExecutorIds); - } - - // if one alert executor is missing, it means all policy under that alert executor should be removed - Collection<String> deletedAlertExecutorIds = CollectionUtils.subtract(cachedAlertExecutorIds, newAlertExecutorIds); - if(deletedAlertExecutorIds != null && deletedAlertExecutorIds.size() > 0){ - LOG.warn("Some alertExecutorIds are deleted : " + deletedAlertExecutorIds); - for(String deletedAlertExecutorId : deletedAlertExecutorIds){ - deleted.put(deletedAlertExecutorId, cachedAlertDefs.get(deletedAlertExecutorId)); - } - } - - // we need calculate added/updated/deleted policy for all executors which are not deleted -// Collection<String> updatedAlertExecutorIds = CollectionUtils.intersection(newAlertExecutorIds, cachedAlertExecutorIds); - Collection<String> updatedAlertExecutorIds = newAlertExecutorIds; - for(String updatedAlertExecutorId : updatedAlertExecutorIds){ - Map<String, AlertDefinitionAPIEntity> newPolicies = newAlertDefs.get(updatedAlertExecutorId); - Map<String, AlertDefinitionAPIEntity> cachedPolicies = cachedAlertDefs.get(updatedAlertExecutorId); - PolicyComparator.compare(updatedAlertExecutorId, newPolicies, cachedPolicies, added, changed, deleted); - } - - cachedAlertDefs = newAlertDefs; - return PollResult.createIncremental(added, changed, deleted, new Date().getTime()); - } - } - - public static class PolicyComparator{ - public static void compare(String alertExecutorId, Map<String, AlertDefinitionAPIEntity> newPolicies, Map<String, AlertDefinitionAPIEntity> cachedPolicies, - Map<String, Object> added, Map<String, Object> changed, Map<String, Object> deleted){ - Set<String> newPolicyIds = newPolicies.keySet(); - Set<String> cachedPolicyIds = cachedPolicies != null ? cachedPolicies.keySet() : new HashSet<String>(); - Collection<String> addedPolicyIds = CollectionUtils.subtract(newPolicyIds, cachedPolicyIds); - Collection<String> deletedPolicyIds = CollectionUtils.subtract(cachedPolicyIds, newPolicyIds); - Collection<String> changedPolicyIds = CollectionUtils.intersection(cachedPolicyIds, newPolicyIds); - if(addedPolicyIds != null && addedPolicyIds.size() > 0){ - Map<String, AlertDefinitionAPIEntity> tmp = new HashMap<String, AlertDefinitionAPIEntity>(); - for(String addedPolicyId : addedPolicyIds){ - tmp.put(addedPolicyId, newPolicies.get(addedPolicyId)); - } - added.put(alertExecutorId, tmp); - } - if(deletedPolicyIds != null && deletedPolicyIds.size() > 0){ - Map<String, AlertDefinitionAPIEntity> tmp = new HashMap<String, AlertDefinitionAPIEntity>(); - for(String deletedPolicyId : deletedPolicyIds){ - tmp.put(deletedPolicyId, cachedPolicies.get(deletedPolicyId)); - } - deleted.put(alertExecutorId, tmp); - } - if(changedPolicyIds != null && changedPolicyIds.size() > 0){ - Map<String, AlertDefinitionAPIEntity> tmp = new HashMap<String, AlertDefinitionAPIEntity>(); - for(String changedPolicyId : changedPolicyIds){ - // check if policy is really changed - if(!newPolicies.get(changedPolicyId).equals(cachedPolicies.get(changedPolicyId))){ - tmp.put(changedPolicyId, newPolicies.get(changedPolicyId)); - } - } - changed.put(alertExecutorId, tmp); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PartitionUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PartitionUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PartitionUtils.java deleted file mode 100644 index c3b53c2..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PartitionUtils.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.policy; - -import eagle.alert.common.AlertConstants; -import eagle.alert.entity.AlertDefinitionAPIEntity; - -public class PartitionUtils { - - public static boolean accept(AlertDefinitionAPIEntity alertDef, PolicyPartitioner partitioner, int numPartitions, int partitionSeq){ - int targetPartitionSeq = partitioner.partition(numPartitions, alertDef.getTags().get(AlertConstants.POLICY_TYPE), alertDef.getTags().get(AlertConstants.POLICY_ID)); - if(targetPartitionSeq == partitionSeq) - return true; - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluator.java deleted file mode 100644 index c209b97..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluator.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.policy; - -import java.util.List; -import java.util.Map; - -import eagle.alert.entity.AlertDefinitionAPIEntity; -import eagle.dataproc.core.ValuesArray; - -public interface PolicyEvaluator { - /** - * take input and evaluate expression - * input has 3 fields, first is siddhiAlertContext, second one is streamName, the third is map of attribute name/value - * @param input - * @throws Exception - */ - public void evaluate(ValuesArray input) throws Exception; - - /** - * notify policy evaluator that policy is updated - */ - public void onPolicyUpdate(AlertDefinitionAPIEntity newAlertDef); - - /** - * notify policy evaluator that policy is deleted, here is cleanup work for this policy evaluator - */ - public void onPolicyDelete(); - - /** - * get additional context - */ - public Map<String, String> getAdditionalContext(); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluatorServiceProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluatorServiceProvider.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluatorServiceProvider.java deleted file mode 100644 index a022f98..0000000 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/eagle/alert/policy/PolicyEvaluatorServiceProvider.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package eagle.alert.policy; - -import java.util.List; - -import com.fasterxml.jackson.databind.Module; - -/** - * to provide extensibility, we need a clear differentiation between framework job and provider logic - * policy evaluator framework: - * - connect to eagle data source - * - read all policy definitions - * - compare with cached policy definitions - * - figure out if policy is created, deleted or updated - * - if policy is created, then invoke onPolicyCreated - * - if policy is deleted, then invoke onPolicyDeleted - * - if policy is updated, then invoke onPolicyUpdated - * - for policy update, replace old evaluator engine with new evaluator engine which is created by policy evaluator provider - * - specify # of executors for this alert executor id - * - dynamically balance # of policies evaluated by each alert executor - * - use zookeeper to balance. eaglePolicies/${alertExecutorId}/${alertExecutorInstanceId} => list of policies - * - * policy evaluator business features: - * - register mapping between policy type and PolicyEvaluator - * - create evaluator engine runtime when configuration is changed - * - */ -public interface PolicyEvaluatorServiceProvider { - String getPolicyType(); - Class<? extends PolicyEvaluator> getPolicyEvaluator(); - List<Module> getBindingModules(); -}