Repository: incubator-eagle
Updated Branches:
  refs/heads/master 83bb66661 -> 98dff2480
EAGLE-550: Add Alert publish implementation for Slack

Author: Zeng, Ming
Reviewer: ralphsu

This closes #441

Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/98dff248
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/98dff248
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/98dff248

Branch: refs/heads/master
Commit: 98dff2480f9bf7105554c4c2d62aebacafb3a858
Parents: 83bb666
Author: mizeng <miz...@ebaysf.com>
Authored: Wed Sep 14 16:30:29 2016 +0800
Committer: Ralph, Su <suliang...@gmail.com>
Committed: Mon Sep 19 09:53:07 2016 -0700

----------------------------------------------------------------------
 .../eagle-alert/alert-engine/pom.xml          |   4 +
 .../engine/publisher/PublishConstants.java    |   6 +
 .../publisher/impl/AlertSlackPublisher.java   | 173 +++++++++++++++++++
 .../engine/router/TestAlertPublisherBolt.java |  97 +++++++++--
 .../resources/router/publishments-slack.json  |  20 +++
 .../eagle-alert-parent/eagle-alert/pom.xml    |   5 +
 pom.xml                                       |   1 +
 7 files changed, 289 insertions(+), 17 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98dff248/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
index e523fd9..1ea6088 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
@@ -133,6 +133,10 @@
             <artifactId>de.flapdoodle.embed.mongo</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.ullink.slack</groupId>
+            <artifactId>simpleslackapi</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98dff248/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java index 54a9afa..ce57a6e 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java @@ -34,6 +34,12 @@ public class PublishConstants { public static final String TOPIC = "topic"; public static final String BROKER_LIST = "kafka_broker"; + // slack specific constants + public static final String TOKEN = "token"; + public static final String CHANNELS = "channels"; + public static final String SEVERITYS = "severitys"; + public static final String URL_TEMPLATE = "urltemplate"; + public static final String ALERT_EMAIL_TIME_PROPERTY = "timestamp"; public static final String ALERT_EMAIL_COUNT_PROPERTY = "count"; public static final String ALERT_EMAIL_ALERTLIST_PROPERTY = "alertList"; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98dff248/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java new file mode 100644 index 0000000..3e3bd67 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.alert.engine.publisher.impl; + +import com.typesafe.config.Config; +import com.ullink.slack.simpleslackapi.SlackChannel; +import com.ullink.slack.simpleslackapi.SlackSession; +import com.ullink.slack.simpleslackapi.impl.SlackSessionFactory; +import org.apache.commons.lang.StringUtils; +import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.publisher.PublishConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @since Sep 14, 2016. 
+ */ +public class AlertSlackPublisher extends AbstractPublishPlugin { + private static final Logger LOG = LoggerFactory.getLogger(AlertSlackPublisher.class); + + private SlackSession session; + private String slackChannels; + private String urlTemplate; + private String severitys; + + @Override + public void init(Config config, Publishment publishment, Map conf) throws Exception { + super.init(config, publishment, conf); + + if (publishment.getProperties() != null) { + Map<String, String> slackConfig = new HashMap<>(publishment.getProperties()); + final String token = slackConfig.get(PublishConstants.TOKEN).trim(); + slackChannels = slackConfig.get(PublishConstants.CHANNELS).trim(); + urlTemplate = slackConfig.get(PublishConstants.URL_TEMPLATE).trim(); + severitys = slackConfig.get(PublishConstants.SEVERITYS).trim(); + + if (StringUtils.isNotEmpty(token)) { + LOG.debug(" Creating Slack Session... "); + session = SlackSessionFactory.createWebSocketSlackSession(token); + session.connect(); + } + } + + } + + @Override + public void onAlert(AlertStreamEvent event) throws Exception { + if (session == null) { + LOG.warn("Slack session is null due to incorrect configurations!"); + return; + } + List<AlertStreamEvent> outputEvents = dedup(event); + if (outputEvents == null) { + return; + } + + PublishStatus status = new PublishStatus(); + for (AlertStreamEvent outputEvent: outputEvents) { + String message = ""; + String severity = ""; + String docId = ""; + String device = ""; + String deviceType = ""; + String colo = ""; + // only user defined severity level alert will send to Slack; + boolean publishToSlack = false; + + StreamDefinition streamDefinition = outputEvent.getSchema(); + for (int i = 0; i < outputEvent.getData().length; i++) { + if (i > streamDefinition.getColumns().size()) { + if (LOG.isWarnEnabled()) { + LOG.warn("output column does not found for event data, this indicate code error!"); + } + continue; + } + String colName = 
streamDefinition.getColumns().get(i).getName(); + if (colName.equalsIgnoreCase("severity")) { + severity = outputEvent.getData()[i].toString(); + publishToSlack = severitys.contains(severity); + } + if (colName.equalsIgnoreCase("message")) { + message = outputEvent.getData()[i].toString(); + } + if (colName.equalsIgnoreCase("docId")) { + docId = outputEvent.getData()[i].toString(); + } + if (colName.equalsIgnoreCase("df_device")) { + device = outputEvent.getData()[i].toString(); + } + if (colName.equalsIgnoreCase("df_type")) { + deviceType = outputEvent.getData()[i].toString(); + } + if (colName.equalsIgnoreCase("dc")) { + colo = outputEvent.getData()[i].toString(); + } + } + + StringBuilder messageToSlack = new StringBuilder(); + messageToSlack.append("Message: " + message + "\n"); + messageToSlack.append("Severity: " + severity + "\n"); + messageToSlack.append("Device: " + device + "\n"); + messageToSlack.append("Device Type: " + deviceType + "\n"); + messageToSlack.append("Colo: " + colo + "\n"); + messageToSlack.append("Alert Policy: " + event.getPolicyId() + "\n"); + messageToSlack.append("Doc Id: " + docId + "\n"); + + if (StringUtils.isNotEmpty(urlTemplate)) { + try { + messageToSlack.append("Link to Alert Console: " + String.format(urlTemplate, docId) + "\n"); + } catch (Exception e) { + LOG.warn("There's an error when processing Alert Console Link!"); + } + + } + + if (publishToSlack) { + try { + for (String slackChannel: slackChannels.split(",")) { + sendMessageToAChannel(session, slackChannel, messageToSlack.toString()); + } + } catch (Exception e) { + status.successful = false; + status.errorMessage = String.format("Failed to send message to slack channel %s, due to:%s", slackChannels, e); + LOG.error(status.errorMessage, e); + } + } + } + + } + + @Override + public void close() { + try { + session.disconnect(); + } catch (IOException e) { + LOG.error(e.getMessage()); + } + } + + @Override + protected Logger getLogger() { + return LOG; + } + + private 
void sendMessageToAChannel(SlackSession session, String channelName, String message) { + //get a channel + SlackChannel channel = session.findChannelByName(channelName); + session.sendMessage(channel, message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98dff248/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java index 3432496..79c03bc 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java @@ -18,11 +18,12 @@ package org.apache.eagle.alert.engine.router; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.CollectionType; +import com.fasterxml.jackson.databind.type.SimpleType; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import org.apache.eagle.alert.coordination.model.PublishSpec; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.Publishment; @@ -38,19 +39,13 @@ import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl; import org.apache.eagle.alert.engine.runner.AlertPublisherBolt; import org.apache.eagle.alert.engine.runner.MapComparator; import 
org.apache.eagle.alert.engine.utils.MetadataSerDeser; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; +import org.junit.*; import org.mockito.Mockito; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.type.CollectionType; -import com.fasterxml.jackson.databind.type.SimpleType; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * @Since 5/14/16. @@ -91,7 +86,7 @@ public class TestAlertPublisherBolt { policy.setName("policy1"); alert.setPolicyId(policy.getName()); alert.setCreatedTime(System.currentTimeMillis()); - alert.setData(new Object[] {"field_1", 2, "field_3"}); + alert.setData(new Object[]{"field_1", 2, "field_3"}); alert.setStreamId(streamId); alert.setCreatedBy(this.toString()); return alert; @@ -244,4 +239,72 @@ public class TestAlertPublisherBolt { Mockito.verify(store, Mockito.atLeastOnce()).add(Mockito.anyObject(), Mockito.anyObject()); } + + private AlertStreamEvent createSeverityWithStreamDef(String hostname, String appName, String message, String severity, String docId, String df_device, String df_type, String colo) { + AlertStreamEvent alert = new AlertStreamEvent(); + PolicyDefinition policy = new PolicyDefinition(); + policy.setName("switch_check"); + alert.setPolicyId(policy.getName()); + alert.setCreatedTime(System.currentTimeMillis()); + alert.setData(new Object[] {appName, hostname, message, severity, docId, df_device, df_type, colo}); + alert.setStreamId("testAlertStream"); + alert.setCreatedBy(this.toString()); + + // build stream definition + StreamDefinition sd = new StreamDefinition(); + StreamColumn appColumn = new StreamColumn(); + appColumn.setName("appname"); + appColumn.setType(StreamColumn.Type.STRING); + + StreamColumn hostColumn = new 
StreamColumn(); + hostColumn.setName("hostname"); + hostColumn.setType(StreamColumn.Type.STRING); + + StreamColumn msgColumn = new StreamColumn(); + msgColumn.setName("message"); + msgColumn.setType(StreamColumn.Type.STRING); + + StreamColumn severityColumn = new StreamColumn(); + severityColumn.setName("severity"); + severityColumn.setType(StreamColumn.Type.STRING); + + StreamColumn docIdColumn = new StreamColumn(); + docIdColumn.setName("docId"); + docIdColumn.setType(StreamColumn.Type.STRING); + + StreamColumn deviceColumn = new StreamColumn(); + deviceColumn.setName("df_device"); + deviceColumn.setType(StreamColumn.Type.STRING); + + StreamColumn deviceTypeColumn = new StreamColumn(); + deviceTypeColumn.setName("df_type"); + deviceTypeColumn.setType(StreamColumn.Type.STRING); + + StreamColumn coloColumn = new StreamColumn(); + coloColumn.setName("dc"); + coloColumn.setType(StreamColumn.Type.STRING); + + sd.setColumns(Arrays.asList(appColumn, hostColumn, msgColumn, severityColumn, docIdColumn, deviceColumn, deviceTypeColumn, coloColumn)); + + alert.setSchema(sd); + return alert; + } + + @Test + public void testSlackPublishment() throws Exception { + Config config = ConfigFactory.load("application-test.conf"); + AlertPublisher publisher = new AlertPublisherImpl("alertPublishBolt"); + publisher.init(config, new HashMap()); + List<Publishment> pubs = loadEntities("/router/publishments-slack.json", Publishment.class); + publisher.onPublishChange(pubs, null, null, null); + + AlertStreamEvent event1 = createSeverityWithStreamDef("switch1", "testapp1", "Memory 1 inconsistency detected", "WARNING", "docId1", "ed01", "distribution switch", "us"); + AlertStreamEvent event2 = createSeverityWithStreamDef("switch2", "testapp2", "Memory 2 inconsistency detected", "CRITICAL", "docId2", "ed02", "distribution switch", "us"); + AlertStreamEvent event3 = createSeverityWithStreamDef("switch2", "testapp2", "Memory 3 inconsistency detected", "WARNING", "docId3", "ed02", "distribution 
switch", "us"); + + publisher.nextEvent(event1); + publisher.nextEvent(event2); + publisher.nextEvent(event3); + + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98dff248/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments-slack.json ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments-slack.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments-slack.json new file mode 100644 index 0000000..3c49b48 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments-slack.json @@ -0,0 +1,20 @@ +[ + { + "name": "test-slack-output", + "type": "org.apache.eagle.alert.engine.publisher.impl.AlertSlackPublisher", + "policyIds": [ + "switch_check" + ], + "properties": { + "token": "your token", + "channels": "your channel1, your channel2", + "severitys": "CRITICAL", + "urltemplate": "your template/?id=%s" + }, + "dedupIntervalMin": "PT1M", + "dedupFields": [ + "appname" + ], + "serializer": "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer" + } +] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98dff248/eagle-core/eagle-alert-parent/eagle-alert/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/pom.xml index 2ad8576..a5f10b9 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/pom.xml +++ b/eagle-core/eagle-alert-parent/eagle-alert/pom.xml @@ -306,6 +306,11 @@ <version>${embed.mongo.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>com.ullink.slack</groupId> + <artifactId>simpleslackapi</artifactId> + <version>${ullink.slack.version}</version> + </dependency> </dependencies> 
     </dependencyManagement>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98dff248/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 55f617b..835ab79 100755
--- a/pom.xml
+++ b/pom.xml
@@ -303,6 +303,7 @@
 
         <metrics-elasticsearch-reporter.version>2.2.0</metrics-elasticsearch-reporter.version>
         <common.cli.version>1.3.1</common.cli.version>
+        <ullink.slack.version>0.6.0</ullink.slack.version>
 
         <!-- dropwizard -->
         <dropwizard.version>0.7.1</dropwizard.version>