Repository: incubator-eagle
Updated Branches:
  refs/heads/master 83bb66661 -> 98dff2480


EAGLE-550: Add Alert publish implementation for Slack

Author: Zeng, Ming
Reviewer: ralphsu

This closes #441


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/98dff248
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/98dff248
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/98dff248

Branch: refs/heads/master
Commit: 98dff2480f9bf7105554c4c2d62aebacafb3a858
Parents: 83bb666
Author: mizeng <miz...@ebaysf.com>
Authored: Wed Sep 14 16:30:29 2016 +0800
Committer: Ralph, Su <suliang...@gmail.com>
Committed: Mon Sep 19 09:53:07 2016 -0700

----------------------------------------------------------------------
 .../eagle-alert/alert-engine/pom.xml            |   4 +
 .../engine/publisher/PublishConstants.java      |   6 +
 .../publisher/impl/AlertSlackPublisher.java     | 173 +++++++++++++++++++
 .../engine/router/TestAlertPublisherBolt.java   |  97 +++++++++--
 .../resources/router/publishments-slack.json    |  20 +++
 .../eagle-alert-parent/eagle-alert/pom.xml      |   5 +
 pom.xml                                         |   1 +
 7 files changed, 289 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98dff248/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
index e523fd9..1ea6088 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
@@ -133,6 +133,10 @@
                        <artifactId>de.flapdoodle.embed.mongo</artifactId>
                        <scope>test</scope>
                </dependency>
+        <dependency>
+            <groupId>com.ullink.slack</groupId>
+            <artifactId>simpleslackapi</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98dff248/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
index 54a9afa..ce57a6e 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
@@ -34,6 +34,12 @@ public class PublishConstants {
     public static final String TOPIC = "topic";
     public static final String BROKER_LIST = "kafka_broker";
 
+    // slack specific constants
+    public static final String TOKEN = "token";
+    public static final String CHANNELS = "channels";
+    public static final String SEVERITYS = "severitys";
+    public static final String URL_TEMPLATE = "urltemplate";
+
     public static final String ALERT_EMAIL_TIME_PROPERTY = "timestamp";
     public static final String ALERT_EMAIL_COUNT_PROPERTY = "count";
     public static final String ALERT_EMAIL_ALERTLIST_PROPERTY = "alertList";

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98dff248/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java
new file mode 100644
index 0000000..3e3bd67
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertSlackPublisher.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import com.typesafe.config.Config;
+import com.ullink.slack.simpleslackapi.SlackChannel;
+import com.ullink.slack.simpleslackapi.SlackSession;
+import com.ullink.slack.simpleslackapi.impl.SlackSessionFactory;
+import org.apache.commons.lang.StringUtils;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.PublishConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @since Sep 14, 2016.
+ */
+public class AlertSlackPublisher extends AbstractPublishPlugin {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AlertSlackPublisher.class);
+
+    private SlackSession session;
+    private String slackChannels;
+    private String urlTemplate;
+    private String severitys;
+
+    @Override
+    public void init(Config config, Publishment publishment, Map conf) throws 
Exception {
+        super.init(config, publishment, conf);
+
+        if (publishment.getProperties() != null) {
+            Map<String, String> slackConfig = new 
HashMap<>(publishment.getProperties());
+            final String token = 
slackConfig.get(PublishConstants.TOKEN).trim();
+            slackChannels = slackConfig.get(PublishConstants.CHANNELS).trim();
+            urlTemplate = 
slackConfig.get(PublishConstants.URL_TEMPLATE).trim();
+            severitys = slackConfig.get(PublishConstants.SEVERITYS).trim();
+
+            if (StringUtils.isNotEmpty(token)) {
+                LOG.debug(" Creating Slack Session... ");
+                session = 
SlackSessionFactory.createWebSocketSlackSession(token);
+                session.connect();
+            }
+        }
+
+    }
+
+    @Override
+    public void onAlert(AlertStreamEvent event) throws Exception {
+        if (session == null) {
+            LOG.warn("Slack session is null due to incorrect configurations!");
+            return;
+        }
+        List<AlertStreamEvent> outputEvents = dedup(event);
+        if (outputEvents == null) {
+            return;
+        }
+
+        PublishStatus status = new PublishStatus();
+        for (AlertStreamEvent outputEvent: outputEvents) {
+            String message = "";
+            String severity = "";
+            String docId = "";
+            String device = "";
+            String deviceType = "";
+            String colo = "";
+            // only user defined severity level alert will send to Slack;
+            boolean publishToSlack = false;
+
+            StreamDefinition streamDefinition = outputEvent.getSchema();
+            for (int i = 0; i < outputEvent.getData().length; i++) {
+                if (i > streamDefinition.getColumns().size()) {
+                    if (LOG.isWarnEnabled()) {
+                        LOG.warn("output column does not found for event data, 
this indicate code error!");
+                    }
+                    continue;
+                }
+                String colName = 
streamDefinition.getColumns().get(i).getName();
+                if (colName.equalsIgnoreCase("severity")) {
+                    severity = outputEvent.getData()[i].toString();
+                    publishToSlack = severitys.contains(severity);
+                }
+                if (colName.equalsIgnoreCase("message")) {
+                    message = outputEvent.getData()[i].toString();
+                }
+                if (colName.equalsIgnoreCase("docId")) {
+                    docId = outputEvent.getData()[i].toString();
+                }
+                if (colName.equalsIgnoreCase("df_device")) {
+                    device = outputEvent.getData()[i].toString();
+                }
+                if (colName.equalsIgnoreCase("df_type")) {
+                    deviceType = outputEvent.getData()[i].toString();
+                }
+                if (colName.equalsIgnoreCase("dc")) {
+                    colo = outputEvent.getData()[i].toString();
+                }
+            }
+
+            StringBuilder messageToSlack = new StringBuilder();
+            messageToSlack.append("Message: " + message + "\n");
+            messageToSlack.append("Severity: " + severity + "\n");
+            messageToSlack.append("Device: " + device + "\n");
+            messageToSlack.append("Device Type: " + deviceType + "\n");
+            messageToSlack.append("Colo: " + colo + "\n");
+            messageToSlack.append("Alert Policy: " + event.getPolicyId() + 
"\n");
+            messageToSlack.append("Doc Id: " + docId + "\n");
+
+            if (StringUtils.isNotEmpty(urlTemplate)) {
+                try {
+                    messageToSlack.append("Link to Alert Console: " + 
String.format(urlTemplate, docId) + "\n");
+                } catch (Exception e) {
+                    LOG.warn("There's an error when processing Alert Console 
Link!");
+                }
+
+            }
+
+            if (publishToSlack) {
+                try {
+                    for (String slackChannel: slackChannels.split(",")) {
+                        sendMessageToAChannel(session, slackChannel, 
messageToSlack.toString());
+                    }
+                } catch (Exception e) {
+                    status.successful = false;
+                    status.errorMessage = String.format("Failed to send 
message to slack channel %s, due to:%s", slackChannels, e);
+                    LOG.error(status.errorMessage, e);
+                }
+            }
+        }
+
+    }
+
+    @Override
+    public void close() {
+        try {
+            session.disconnect();
+        } catch (IOException e) {
+            LOG.error(e.getMessage());
+        }
+    }
+
+    @Override
+    protected Logger getLogger() {
+        return LOG;
+    }
+
+    private void sendMessageToAChannel(SlackSession session, String 
channelName, String message) {
+        //get a channel
+        SlackChannel channel = session.findChannelByName(channelName);
+        session.sendMessage(channel, message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98dff248/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index 3432496..79c03bc 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -18,11 +18,12 @@
 
 package org.apache.eagle.alert.engine.router;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.alert.coordination.model.PublishSpec;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
@@ -38,19 +39,13 @@ import 
org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
 import org.apache.eagle.alert.engine.runner.AlertPublisherBolt;
 import org.apache.eagle.alert.engine.runner.MapComparator;
 import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.*;
 import org.mockito.Mockito;
 
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * @Since 5/14/16.
@@ -91,7 +86,7 @@ public class TestAlertPublisherBolt {
         policy.setName("policy1");
         alert.setPolicyId(policy.getName());
         alert.setCreatedTime(System.currentTimeMillis());
-        alert.setData(new Object[] {"field_1", 2, "field_3"});
+        alert.setData(new Object[]{"field_1", 2, "field_3"});
         alert.setStreamId(streamId);
         alert.setCreatedBy(this.toString());
         return alert;
@@ -244,4 +239,72 @@ public class TestAlertPublisherBolt {
         
         Mockito.verify(store, Mockito.atLeastOnce()).add(Mockito.anyObject(), 
Mockito.anyObject());
     }
+
+    private AlertStreamEvent createSeverityWithStreamDef(String hostname, 
String appName, String message, String severity, String docId, String 
df_device, String df_type, String colo) {
+        AlertStreamEvent alert = new AlertStreamEvent();
+        PolicyDefinition policy = new PolicyDefinition();
+        policy.setName("switch_check");
+        alert.setPolicyId(policy.getName());
+        alert.setCreatedTime(System.currentTimeMillis());
+        alert.setData(new Object[] {appName, hostname, message, severity, 
docId, df_device, df_type, colo});
+        alert.setStreamId("testAlertStream");
+        alert.setCreatedBy(this.toString());
+
+        // build stream definition
+        StreamDefinition sd = new StreamDefinition();
+        StreamColumn appColumn = new StreamColumn();
+        appColumn.setName("appname");
+        appColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn hostColumn = new StreamColumn();
+        hostColumn.setName("hostname");
+        hostColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn msgColumn = new StreamColumn();
+        msgColumn.setName("message");
+        msgColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn severityColumn = new StreamColumn();
+        severityColumn.setName("severity");
+        severityColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn docIdColumn = new StreamColumn();
+        docIdColumn.setName("docId");
+        docIdColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn deviceColumn = new StreamColumn();
+        deviceColumn.setName("df_device");
+        deviceColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn deviceTypeColumn = new StreamColumn();
+        deviceTypeColumn.setName("df_type");
+        deviceTypeColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn coloColumn = new StreamColumn();
+        coloColumn.setName("dc");
+        coloColumn.setType(StreamColumn.Type.STRING);
+
+        sd.setColumns(Arrays.asList(appColumn, hostColumn, msgColumn, 
severityColumn, docIdColumn, deviceColumn, deviceTypeColumn, coloColumn));
+
+        alert.setSchema(sd);
+        return alert;
+    }
+
+    @Test
+    public void testSlackPublishment() throws Exception {
+        Config config = ConfigFactory.load("application-test.conf");
+        AlertPublisher publisher = new AlertPublisherImpl("alertPublishBolt");
+        publisher.init(config, new HashMap());
+        List<Publishment> pubs = 
loadEntities("/router/publishments-slack.json", Publishment.class);
+        publisher.onPublishChange(pubs, null, null, null);
+
+        AlertStreamEvent event1 = createSeverityWithStreamDef("switch1", 
"testapp1", "Memory 1 inconsistency detected", "WARNING", "docId1", "ed01", 
"distribution switch", "us");
+        AlertStreamEvent event2 = createSeverityWithStreamDef("switch2", 
"testapp2", "Memory 2 inconsistency detected", "CRITICAL", "docId2", "ed02", 
"distribution switch", "us");
+        AlertStreamEvent event3 = createSeverityWithStreamDef("switch2", 
"testapp2", "Memory 3 inconsistency detected", "WARNING", "docId3", "ed02", 
"distribution switch", "us");
+
+        publisher.nextEvent(event1);
+        publisher.nextEvent(event2);
+        publisher.nextEvent(event3);
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98dff248/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments-slack.json
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments-slack.json
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments-slack.json
new file mode 100644
index 0000000..3c49b48
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments-slack.json
@@ -0,0 +1,20 @@
+[
+  {
+    "name": "test-slack-output",
+    "type": "org.apache.eagle.alert.engine.publisher.impl.AlertSlackPublisher",
+    "policyIds": [
+      "switch_check"
+    ],
+    "properties": {
+      "token": "your token",
+      "channels": "your channel1, your channel2",
+      "severitys": "CRITICAL",
+      "urltemplate": "your template/?id=%s"
+    },
+    "dedupIntervalMin": "PT1M",
+    "dedupFields": [
+      "appname"
+    ],
+    "serializer": 
"org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98dff248/eagle-core/eagle-alert-parent/eagle-alert/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/pom.xml 
b/eagle-core/eagle-alert-parent/eagle-alert/pom.xml
index 2ad8576..a5f10b9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/pom.xml
@@ -306,6 +306,11 @@
                                <version>${embed.mongo.version}</version>
                                <scope>test</scope>
                        </dependency>
+            <dependency>
+                <groupId>com.ullink.slack</groupId>
+                <artifactId>simpleslackapi</artifactId>
+                <version>${ullink.slack.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98dff248/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 55f617b..835ab79 100755
--- a/pom.xml
+++ b/pom.xml
@@ -303,6 +303,7 @@
         
<metrics-elasticsearch-reporter.version>2.2.0</metrics-elasticsearch-reporter.version>
 
         <common.cli.version>1.3.1</common.cli.version>
+           <ullink.slack.version>0.6.0</ullink.slack.version>
 
         <!-- dropwizard -->
         <dropwizard.version>0.7.1</dropwizard.version>

Reply via email to