Repository: activemq-artemis
Updated Branches:
  refs/heads/master e0334dff0 -> 171591d23


ARTEMIS-1799 - Add a NotificationActiveMQServerPlugin

Adds a new plugin that will support sending new types of notifications
for broker events which will allow enhanced broker monitoring


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4795f7c6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4795f7c6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4795f7c6

Branch: refs/heads/master
Commit: 4795f7c6d0e9242c2fc1f364b1cc025a1ce037b6
Parents: e0334df
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Tue Apr 10 09:19:11 2018 -0400
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Tue Apr 10 09:43:02 2018 -0400

----------------------------------------------------------------------
 .../core/management/CoreNotificationType.java   |  10 +-
 .../api/core/management/ManagementHelper.java   |   4 +
 .../core/server/impl/ActiveMQServerImpl.java    |   7 +
 .../server/plugin/ActiveMQServerPlugin.java     |  18 ++
 .../impl/NotificationActiveMQServerPlugin.java  | 287 +++++++++++++++++++
 docs/user-manual/en/broker-plugins.md           |  63 ++++
 .../management/NotificationTest.java            | 168 ++++++++++-
 7 files changed, 549 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4795f7c6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java
index ff514ad..686400e 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/CoreNotificationType.java
@@ -39,7 +39,15 @@ public enum CoreNotificationType implements NotificationType 
{
    PROPOSAL(18),
    PROPOSAL_RESPONSE(19),
    UNPROPOSAL(20),
-   CONSUMER_SLOW(21);
+   CONSUMER_SLOW(21),
+   ADDRESS_ADDED(22),
+   ADDRESS_REMOVED(23),
+   CONNECTION_CREATED(24),
+   CONNECTION_DESTROYED(25),
+   SESSION_CREATED(26),
+   SESSION_CLOSED(27),
+   MESSAGE_DELIVERED(28),
+   MESSAGE_EXPIRED(29);
 
    private final int value;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4795f7c6/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
index 66180f2..a682eba 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
@@ -48,6 +48,8 @@ public final class ManagementHelper {
 
    public static final SimpleString HDR_ADDRESS = new 
SimpleString("_AMQ_Address");
 
+   public static final SimpleString HDR_ROUTING_TYPE = new 
SimpleString("_AMQ_Routing_Type");
+
    public static final SimpleString HDR_BINDING_ID = new 
SimpleString("_AMQ_Binding_ID");
 
    public static final SimpleString HDR_BINDING_TYPE = new 
SimpleString("_AMQ_Binding_Type");
@@ -76,6 +78,8 @@ public final class ManagementHelper {
 
    public static final SimpleString HDR_CONNECTION_NAME = new 
SimpleString("_AMQ_ConnectionName");
 
+   public static final SimpleString HDR_MESSAGE_ID = new 
SimpleString("_AMQ_Message_ID");
+
    // Attributes ----------------------------------------------------
 
    // Static --------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4795f7c6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 1b4b0d1..96076cf 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1959,16 +1959,19 @@ public class ActiveMQServerImpl implements 
ActiveMQServer {
    @Override
    public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) 
{
       configuration.registerBrokerPlugins(plugins);
+      // Tell every plugin in the list which server it has just been registered with.
+      plugins.forEach(plugin -> plugin.registered(this));
    }
 
    @Override
    public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) {
       configuration.registerBrokerPlugin(plugin);
+      // Notify the plugin immediately, handing it this server instance.
+      // NOTE(review): the start-up path changed in this commit also calls registered() on all
+      // configured plugins, so a plugin added before start-up may be notified twice - confirm
+      // that implementations of registered() are idempotent.
+      plugin.registered(this);
    }
 
    @Override
    public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) {
       configuration.unRegisterBrokerPlugin(plugin);
+      // Notify the plugin after it has been removed from the configuration.
+      plugin.unregistered(this);
    }
 
    @Override
@@ -2340,6 +2343,10 @@ public class ActiveMQServerImpl implements 
ActiveMQServer {
          reloadManager.addCallback(configuration.getConfigurationUrl(), new 
ConfigurationFileReloader());
       }
 
+      if (hasBrokerPlugins()) {
+         callBrokerPlugins(plugin -> plugin.registered(this));
+      }
+
       return true;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4795f7c6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
index 25e759f..ed00ab0 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueConfig;
@@ -49,6 +50,23 @@ import 
org.apache.activemq.artemis.utils.critical.CriticalComponent;
  */
 public interface ActiveMQServerPlugin {
 
+   /**
+    * Called when the plugin has been registered with the server. The default implementation does nothing.
+    *
+    * @param server the ActiveMQServer the plugin has been registered with
+    */
+   default void registered(ActiveMQServer server) {
+
+   }
+
+   /**
+    * Called when the plugin has been unregistered from the server. The default implementation does nothing.
+    *
+    * @param server the ActiveMQServer the plugin has been unregistered from
+    */
+   default void unregistered(ActiveMQServer server) {
+
+   }
 
    /**
     * A connection has been created.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4795f7c6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java
new file mode 100644
index 0000000..abaa27f
--- /dev/null
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.plugin.impl;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.management.ManagementService;
+import org.apache.activemq.artemis.core.server.management.Notification;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.utils.collections.TypedProperties;
+import org.jboss.logging.Logger;
+
+/**
+ * Broker plugin that sends additional management notifications for connection, session, address,
+ * message-delivered and message-expired events. Every group of notifications is disabled by default
+ * and is enabled through the matching {@code SEND_*_NOTIFICATIONS} property or setter.
+ */
+public class NotificationActiveMQServerPlugin implements ActiveMQServerPlugin {
+
+   private static final Logger logger = Logger.getLogger(NotificationActiveMQServerPlugin.class);
+
+   public static final String SEND_CONNECTION_NOTIFICATIONS = "SEND_CONNECTION_NOTIFICATIONS";
+   public static final String SEND_SESSION_NOTIFICATIONS = "SEND_SESSION_NOTIFICATIONS";
+   public static final String SEND_ADDRESS_NOTIFICATIONS = "SEND_ADDRESS_NOTIFICATIONS";
+   public static final String SEND_DELIVERED_NOTIFICATIONS = "SEND_DELIVERED_NOTIFICATIONS";
+   public static final String SEND_EXPIRED_NOTIFICATIONS = "SEND_EXPIRED_NOTIFICATIONS";
+
+   private boolean sendConnectionNotifications;
+   private boolean sendSessionNotifications;
+   private boolean sendAddressNotifications;
+   private boolean sendDeliveredNotifications;
+   private boolean sendExpiredNotifications;
+
+
+   // Set in registered() and cleared in unregistered(); while it is null no notification is sent.
+   private final AtomicReference<ManagementService> managementService = new AtomicReference<>();
+
+   /**
+    * Reads the notification switches from the configured plugin properties. A switch that is
+    * absent from the map defaults to {@code false}.
+    *
+    * @param properties the properties configured for this plugin
+    */
+   @Override
+   public void init(Map<String, String> properties) {
+      sendConnectionNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_CONNECTION_NOTIFICATIONS,
+            Boolean.FALSE.toString()));
+      sendSessionNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_SESSION_NOTIFICATIONS,
+            Boolean.FALSE.toString()));
+      sendAddressNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_ADDRESS_NOTIFICATIONS,
+            Boolean.FALSE.toString()));
+      sendDeliveredNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_DELIVERED_NOTIFICATIONS,
+            Boolean.FALSE.toString()));
+      sendExpiredNotifications = Boolean.parseBoolean(properties.getOrDefault(SEND_EXPIRED_NOTIFICATIONS,
+            Boolean.FALSE.toString()));
+   }
+
+   /**
+    * Remembers the management service of the server so notifications can be sent through it.
+    */
+   @Override
+   public void registered(ActiveMQServer server) {
+      managementService.set(server.getManagementService());
+   }
+
+   /**
+    * Forgets the management service; no further notifications are sent after this call.
+    */
+   @Override
+   public void unregistered(ActiveMQServer server) {
+      managementService.set(null);
+   }
+
+   /** Sends a CONNECTION_CREATED notification if connection notifications are enabled. */
+   @Override
+   public void afterCreateConnection(RemotingConnection connection) throws ActiveMQException {
+      sendConnectionNotification(connection, CoreNotificationType.CONNECTION_CREATED);
+   }
+
+   /** Sends a CONNECTION_DESTROYED notification if connection notifications are enabled. */
+   @Override
+   public void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException {
+      sendConnectionNotification(connection, CoreNotificationType.CONNECTION_DESTROYED);
+   }
+
+   /** Sends a SESSION_CREATED notification if session notifications are enabled. */
+   @Override
+   public void afterCreateSession(ServerSession session) throws ActiveMQException {
+      sendSessionNotification(session, CoreNotificationType.SESSION_CREATED);
+   }
+
+   /** Sends a SESSION_CLOSED notification if session notifications are enabled. */
+   @Override
+   public void afterCloseSession(ServerSession session, boolean failed) throws ActiveMQException {
+      sendSessionNotification(session, CoreNotificationType.SESSION_CLOSED);
+   }
+
+   /** Sends an ADDRESS_ADDED notification if address notifications are enabled. */
+   @Override
+   public void afterAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException {
+      sendAddressNotification(addressInfo, CoreNotificationType.ADDRESS_ADDED);
+   }
+
+   /** Sends an ADDRESS_REMOVED notification if address notifications are enabled. */
+   @Override
+   public void afterRemoveAddress(SimpleString address, AddressInfo addressInfo) throws ActiveMQException {
+      sendAddressNotification(addressInfo, CoreNotificationType.ADDRESS_REMOVED);
+   }
+
+   /**
+    * Sends a MESSAGE_DELIVERED notification describing the consumer, its queue and the message id,
+    * provided delivered notifications are enabled. Failures are logged and never propagated.
+    */
+   @Override
+   public void afterDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
+      final ManagementService managementService = getManagementService();
+
+      if (managementService != null && sendDeliveredNotifications) {
+         try {
+            // Skip messages on the management notification address itself; otherwise delivering
+            // a notification would trigger yet another notification.
+            if (!reference.getQueue().getAddress().equals(managementService.getManagementNotificationAddress())) {
+               final TypedProperties props = new TypedProperties();
+               props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, consumer.getQueueAddress());
+               props.putByteProperty(ManagementHelper.HDR_ROUTING_TYPE, consumer.getQueueType().getType());
+               props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, consumer.getQueueName());
+               props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, consumer.getID());
+               props.putLongProperty(ManagementHelper.HDR_MESSAGE_ID, reference.getMessageID());
+
+               managementService.sendNotification(new Notification(null, CoreNotificationType.MESSAGE_DELIVERED, props));
+            }
+         } catch (Exception e) {
+            // Two-argument form on purpose: warn(String, Object, Throwable) would treat the first
+            // argument as the logger class name instead of the message.
+            logger.warn("Error sending notification: " + CoreNotificationType.MESSAGE_DELIVERED, e);
+         }
+      }
+   }
+
+   /**
+    * Sends a MESSAGE_EXPIRED notification describing the queue, the message id and, when known,
+    * the consumer id, provided expired notifications are enabled. Failures are logged and never
+    * propagated.
+    */
+   @Override
+   public void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException {
+      final ManagementService managementService = getManagementService();
+
+      if (managementService != null && sendExpiredNotifications) {
+         try {
+            // Skip messages that expired on the management notification address itself.
+            if (!message.getQueue().getAddress().equals(managementService.getManagementNotificationAddress())) {
+               final TypedProperties props = new TypedProperties();
+               props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, message.getQueue().getAddress());
+               props.putByteProperty(ManagementHelper.HDR_ROUTING_TYPE, message.getQueue().getRoutingType().getType());
+               props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, message.getQueue().getName());
+               props.putLongProperty(ManagementHelper.HDR_MESSAGE_ID, message.getMessageID());
+               if (message.hasConsumerId()) {
+                  props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, message.getConsumerId());
+               }
+
+               managementService.sendNotification(new Notification(null, CoreNotificationType.MESSAGE_EXPIRED, props));
+            }
+         } catch (Exception e) {
+            // Two-argument form on purpose; see afterDeliver.
+            logger.warn("Error sending notification: " + CoreNotificationType.MESSAGE_EXPIRED, e);
+         }
+      }
+   }
+
+   /**
+    * Sends an address notification of the given type carrying the address name and routing type.
+    * Does nothing unless the plugin is registered and address notifications are enabled.
+    */
+   private void sendAddressNotification(AddressInfo addressInfo, final CoreNotificationType type) {
+      final ManagementService managementService = getManagementService();
+
+      if (managementService != null && sendAddressNotifications) {
+         try {
+            final TypedProperties props = new TypedProperties();
+            props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, addressInfo.getName());
+            props.putByteProperty(ManagementHelper.HDR_ROUTING_TYPE, addressInfo.getRoutingType().getType());
+
+            managementService.sendNotification(new Notification(null, type, props));
+         } catch (Exception e) {
+            // Message plus cause; a third argument would be misread as the logger class name.
+            logger.warn("Error sending notification: " + type, e);
+         }
+      }
+   }
+
+   /**
+    * Sends a connection notification of the given type carrying the connection id and remote
+    * address. Does nothing unless the plugin is registered and connection notifications are enabled.
+    */
+   private void sendConnectionNotification(final RemotingConnection connection, final CoreNotificationType type) {
+      final ManagementService managementService = getManagementService();
+
+      if (managementService != null && sendConnectionNotifications) {
+         try {
+            final TypedProperties props = new TypedProperties();
+            props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, SimpleString.toSimpleString(connection.getID().toString()));
+            props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, SimpleString.toSimpleString(connection.getRemoteAddress()));
+
+            managementService.sendNotification(new Notification(null, type, props));
+         } catch (Exception e) {
+            // Message plus cause; a third argument would be misread as the logger class name.
+            logger.warn("Error sending notification: " + type, e);
+         }
+      }
+   }
+
+   /**
+    * Sends a session notification of the given type carrying the connection id, user name and
+    * session name. Does nothing unless the plugin is registered and session notifications are enabled.
+    */
+   private void sendSessionNotification(final ServerSession session, final CoreNotificationType type) {
+      final ManagementService managementService = getManagementService();
+
+      if (managementService != null && sendSessionNotifications) {
+         try {
+            final TypedProperties props = new TypedProperties();
+            props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, SimpleString.toSimpleString(session.getConnectionID().toString()));
+            props.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(session.getUsername()));
+            props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(session.getName()));
+
+            managementService.sendNotification(new Notification(null, type, props));
+         } catch (Exception e) {
+            // Message plus cause; a third argument would be misread as the logger class name.
+            logger.warn("Error sending notification: " + type, e);
+         }
+      }
+   }
+
+   /**
+    * @return the sendConnectionNotifications
+    */
+   public boolean isSendConnectionNotifications() {
+      return sendConnectionNotifications;
+   }
+
+   /**
+    * @param sendConnectionNotifications the sendConnectionNotifications to set
+    */
+   public void setSendConnectionNotifications(boolean sendConnectionNotifications) {
+      this.sendConnectionNotifications = sendConnectionNotifications;
+   }
+
+   /**
+    * @return the sendSessionNotifications
+    */
+   public boolean isSendSessionNotifications() {
+      return sendSessionNotifications;
+   }
+
+   /**
+    * @param sendSessionNotifications the sendSessionNotifications to set
+    */
+   public void setSendSessionNotifications(boolean sendSessionNotifications) {
+      this.sendSessionNotifications = sendSessionNotifications;
+   }
+
+   /**
+    * @return the sendDeliveredNotifications
+    */
+   public boolean isSendDeliveredNotifications() {
+      return sendDeliveredNotifications;
+   }
+
+   /**
+    * @param sendDeliveredNotifications the sendDeliveredNotifications to set
+    */
+   public void setSendDeliveredNotifications(boolean sendDeliveredNotifications) {
+      this.sendDeliveredNotifications = sendDeliveredNotifications;
+   }
+
+   /**
+    * @return the sendExpiredNotifications
+    */
+   public boolean isSendExpiredNotifications() {
+      return sendExpiredNotifications;
+   }
+
+   /**
+    * @param sendExpiredNotifications the sendExpiredNotifications to set
+    */
+   public void setSendExpiredNotifications(boolean sendExpiredNotifications) {
+      this.sendExpiredNotifications = sendExpiredNotifications;
+   }
+
+   /**
+    * @return the sendAddressNotifications
+    */
+   public boolean isSendAddressNotifications() {
+      return sendAddressNotifications;
+   }
+
+   /**
+    * @param sendAddressNotifications the sendAddressNotifications to set
+    */
+   public void setSendAddressNotifications(boolean sendAddressNotifications) {
+      this.sendAddressNotifications = sendAddressNotifications;
+   }
+
+   /**
+    * @return the management service captured in registered(), or null when the plugin is not registered
+    */
+   private ManagementService getManagementService() {
+      return this.managementService.get();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4795f7c6/docs/user-manual/en/broker-plugins.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/broker-plugins.md 
b/docs/user-manual/en/broker-plugins.md
index d0a7040..6fd03a8 100644
--- a/docs/user-manual/en/broker-plugins.md
+++ b/docs/user-manual/en/broker-plugins.md
@@ -131,3 +131,66 @@ Most events in the LoggingActiveMQServerPlugin follow a 
`beforeX` and `afterX` n
 At Log Level `INFO`, the LoggingActiveMQServerPlugin logs an entry when an 
`afterX` notification occurs. By setting the Logger
 "org.apache.activemq.artemis.core.server.plugin.impl" to `DEBUG` Level, log 
entries are generated for both `beforeX` and `afterX` notifications.
 Log Level `DEBUG` will also log more information for a notification when 
available.
+
+## Using the NotificationActiveMQServerPlugin
+
+The NotificationActiveMQServerPlugin can be configured to send extra 
notifications for specific broker events.
+
+You can select which notifications are sent by setting the following 
configuration properties to `true`.
+
+<table summary="NotificationActiveMQServerPlugin configuration" border="1">
+    <colgroup>
+        <col/>
+        <col/>
+    </colgroup>
+    <thead>
+    <tr>
+        <th>Property</th>
+        <th>Property Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+        <td>SEND_CONNECTION_NOTIFICATIONS</td>
+        <td>Sends a notification when a Connection is created/destroyed. Default `false`.</td>
+    </tr>
+    <tr>
+        <td>SEND_SESSION_NOTIFICATIONS</td>
+        <td>Sends a notification when a Session is created/closed. Default 
`false`.</td>
+    </tr>
+    <tr>
+        <td>SEND_ADDRESS_NOTIFICATIONS</td>
+        <td>Sends a notification when an Address is added/removed. Default 
`false`.</td>
+    </tr>
+    <tr>
+        <td>SEND_DELIVERED_NOTIFICATIONS</td>
+        <td>Sends a notification when a message is delivered to a consumer. Default `false`.</td>
+    </tr>
+    <tr>
+        <td>SEND_EXPIRED_NOTIFICATIONS</td>
+        <td>Sends a notification when a message has been expired by the broker. Default `false`.</td>
+    </tr>
+    </tbody>
+</table>
+
+By default the NotificationActiveMQServerPlugin will not send any 
notifications. The plugin is activated by setting one (or a selection)
+of the above configuration properties to `true`.
+
+To configure the plugin, you can add the following configuration to the broker. In the example below both connection
+and session notifications will be sent by the broker.
+
+```xml
+<configuration ...>
+
+...
+    <broker-plugins>
+        <broker-plugin 
class-name="org.apache.activemq.artemis.core.server.plugin.impl.NotificationActiveMQServerPlugin">
+            <property key="SEND_CONNECTION_NOTIFICATIONS" value="true" />
+            <property key="SEND_SESSION_NOTIFICATIONS" value="true" />
+        </broker-plugin>
+    </broker-plugins>
+...
+
+</configuration>
+```
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4795f7c6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java
index e0e6f4c..09aec0e 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java
@@ -18,9 +18,11 @@ package 
org.apache.activemq.artemis.tests.integration.management;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
@@ -28,16 +30,25 @@ import 
org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import 
org.apache.activemq.artemis.core.server.plugin.impl.NotificationActiveMQServerPlugin;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import static 
org.apache.activemq.artemis.api.core.management.CoreNotificationType.CONNECTION_CREATED;
+import static 
org.apache.activemq.artemis.api.core.management.CoreNotificationType.CONNECTION_DESTROYED;
+import static 
org.apache.activemq.artemis.api.core.management.CoreNotificationType.SESSION_CREATED;
+import static 
org.apache.activemq.artemis.api.core.management.CoreNotificationType.SESSION_CLOSED;
+import static 
org.apache.activemq.artemis.api.core.management.CoreNotificationType.ADDRESS_ADDED;
+import static 
org.apache.activemq.artemis.api.core.management.CoreNotificationType.ADDRESS_REMOVED;
 import static 
org.apache.activemq.artemis.api.core.management.CoreNotificationType.BINDING_ADDED;
 import static 
org.apache.activemq.artemis.api.core.management.CoreNotificationType.BINDING_REMOVED;
 import static 
org.apache.activemq.artemis.api.core.management.CoreNotificationType.CONSUMER_CLOSED;
 import static 
org.apache.activemq.artemis.api.core.management.CoreNotificationType.CONSUMER_CREATED;
+import static 
org.apache.activemq.artemis.api.core.management.CoreNotificationType.MESSAGE_DELIVERED;
+import static 
org.apache.activemq.artemis.api.core.management.CoreNotificationType.MESSAGE_EXPIRED;
 
 public class NotificationTest extends ActiveMQTestBase {
 
@@ -70,10 +81,11 @@ public class NotificationTest extends ActiveMQTestBase {
 
       session.createQueue(address, queue, durable);
 
-      ClientMessage[] notifications = NotificationTest.consumeMessages(1, 
notifConsumer);
-      Assert.assertEquals(BINDING_ADDED.toString(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
-      Assert.assertEquals(queue.toString(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME).toString());
-      Assert.assertEquals(address.toString(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
+      //the first message received will be for the address creation
+      ClientMessage[] notifications = NotificationTest.consumeMessages(2, 
notifConsumer);
+      Assert.assertEquals(BINDING_ADDED.toString(), 
notifications[1].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
+      Assert.assertEquals(queue.toString(), 
notifications[1].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME).toString());
+      Assert.assertEquals(address.toString(), 
notifications[1].getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
 
       session.deleteQueue(queue);
    }
@@ -110,8 +122,7 @@ public class NotificationTest extends ActiveMQTestBase {
       System.out.println(queue);
       notifConsumer.close();
       notifConsumer = session.createConsumer(notifQueue.toString(), 
ManagementHelper.HDR_ROUTING_NAME + " <> '" +
-         queue +
-         "'");
+         queue + "' AND " + ManagementHelper.HDR_ADDRESS + " <> '" + address + 
"'");
       NotificationTest.flush(notifConsumer);
 
       session.createQueue(address, queue, durable);
@@ -133,7 +144,8 @@ public class NotificationTest extends ActiveMQTestBase {
 
       session.deleteQueue(queue);
 
-      ClientMessage[] notifications = NotificationTest.consumeMessages(1, 
notifConsumer);
+      //There will be 2 notifications, first is for binding removal, second is 
for address removal
+      ClientMessage[] notifications = NotificationTest.consumeMessages(2, 
notifConsumer);
       Assert.assertEquals(BINDING_REMOVED.toString(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
       Assert.assertEquals(queue.toString(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME).toString());
       Assert.assertEquals(address.toString(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
@@ -201,6 +213,140 @@ public class NotificationTest extends ActiveMQTestBase {
       session.deleteQueue(queue);
    }
 
+   @Test // expects an ADDRESS_ADDED notification after session.createAddress()
+   public void testAddressAdded() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+
+      NotificationTest.flush(notifConsumer); // presumably discards notifications already queued -- confirm in flush()
+
+      session.createAddress(address, RoutingType.ANYCAST, true);
+
+      ClientMessage[] notifications = NotificationTest.consumeMessages(1, 
notifConsumer);
+      Assert.assertEquals(ADDRESS_ADDED.toString(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
+      Assert.assertEquals(RoutingType.ANYCAST.getType(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_TYPE)); // routing type is compared via getType(), not the enum constant
+      Assert.assertEquals(address.toString(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
+
+   }
+
+   @Test // expects an ADDRESS_REMOVED notification after the address is removed on the server
+   public void testAddressRemoved() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      session.createAddress(address, RoutingType.ANYCAST, true);
+      NotificationTest.flush(notifConsumer); // flushed after creation, presumably so the creation notification is not counted below
+
+      server.getPostOffice().removeAddressInfo(address); // removal goes through the server's post office, not the client session
+
+      ClientMessage[] notifications = NotificationTest.consumeMessages(1, 
notifConsumer);
+      Assert.assertEquals(ADDRESS_REMOVED.toString(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
+      Assert.assertEquals(RoutingType.ANYCAST.getType(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_TYPE));
+      Assert.assertEquals(address.toString(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
+   }
+
+   @Test // expects CONNECTION_CREATED then SESSION_CREATED on open, and SESSION_CLOSED then CONNECTION_DESTROYED on close
+   public void testConnectionCreatedAndDestroyed() throws Exception {
+      NotificationTest.flush(notifConsumer);
+
+      ClientSessionFactory sf = createSessionFactory(locator); // a separate factory, so this test controls its own connection
+      ClientSession mySession = sf.createSession("myUser", "myPassword", 
false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize());
+      mySession.start();
+
+      ClientMessage[] notifications = NotificationTest.consumeMessages(2, 
notifConsumer); // index 0 is asserted as the connection event, index 1 as the session event
+      Assert.assertEquals(CONNECTION_CREATED.toString(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
+      
Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME));
+      final String connectionId = 
notifications[0].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME).toString(); // kept to compare with the CONNECTION_DESTROYED notification below
+
+      Assert.assertEquals(SESSION_CREATED.toString(), 
notifications[1].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
+      
Assert.assertNotNull(notifications[1].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME));
+      
Assert.assertNotNull(notifications[1].getObjectProperty(ManagementHelper.HDR_SESSION_NAME));
+      Assert.assertEquals(SimpleString.toSimpleString("myUser"), 
notifications[1].getObjectProperty(ManagementHelper.HDR_USER)); // user is compared as a SimpleString, not a String
+
+      NotificationTest.flush(notifConsumer);
+      mySession.close();
+      sf.close(); // NOTE(review): presumably closing the factory is what tears down the connection -- confirm
+
+      notifications = NotificationTest.consumeMessages(2, notifConsumer);
+
+      Assert.assertEquals(SESSION_CLOSED.toString(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
+      
Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME));
+      
Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_SESSION_NAME));
+      Assert.assertEquals(SimpleString.toSimpleString("myUser"), 
notifications[0].getObjectProperty(ManagementHelper.HDR_USER));
+
+      Assert.assertEquals(CONNECTION_DESTROYED.toString(), 
notifications[1].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
+      
Assert.assertNotNull(notifications[1].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME));
+      Assert.assertEquals(connectionId, 
notifications[1].getObjectProperty(ManagementHelper.HDR_CONNECTION_NAME).toString()); // destroyed connection must carry the same name as the created one
+   }
+
+   @Test // expects a MESSAGE_DELIVERED notification once a sent message is received by a consumer
+   public void testMessageDelivered() throws Exception {
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession mySession = sf.createSession("myUser", "myPassword", 
false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize());
+
+      mySession.start();
+
+      SimpleString queue = RandomUtil.randomSimpleString();
+      SimpleString address = RandomUtil.randomSimpleString();
+      boolean durable = RandomUtil.randomBoolean();
+
+      session.createQueue(address, queue, durable); // the assertions below expect MULTICAST routing for a queue created this way
+
+      ClientConsumer consumer = mySession.createConsumer(queue);
+      ClientProducer producer = mySession.createProducer(address);
+
+      NotificationTest.flush(notifConsumer); // flushed after setup, presumably so setup notifications are not counted below
+
+      ClientMessage msg = session.createMessage(false); // note: created from the shared 'session' but sent by mySession's producer
+      msg.putStringProperty("someKey", "someValue");
+      producer.send(msg);
+      consumer.receive(1000); // NOTE(review): return value is not checked; only the notification is asserted
+
+      ClientMessage[] notifications = NotificationTest.consumeMessages(1, 
notifConsumer);
+      Assert.assertEquals(MESSAGE_DELIVERED.toString(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
+      
Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_MESSAGE_ID));
+      
Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_CONSUMER_NAME));
+      Assert.assertEquals(address, 
notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS));
+      Assert.assertEquals(queue, 
notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME));
+      Assert.assertEquals(RoutingType.MULTICAST.getType(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_TYPE));
+
+      consumer.close(); // NOTE(review): mySession and sf are not closed in this method -- confirm the test base cleans them up
+      session.deleteQueue(queue);
+   }
+
+   @Test // expects a MESSAGE_EXPIRED notification for a message sent with an expiration value of 1
+   public void testMessageExpired() throws Exception {
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession mySession = sf.createSession("myUser", "myPassword", 
false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize());
+
+      mySession.start();
+
+      SimpleString queue = RandomUtil.randomSimpleString();
+      SimpleString address = RandomUtil.randomSimpleString();
+      boolean durable = RandomUtil.randomBoolean();
+
+      session.createQueue(address, queue, durable);
+
+      ClientConsumer consumer = mySession.createConsumer(queue);
+      ClientProducer producer = mySession.createProducer(address);
+
+      NotificationTest.flush(notifConsumer); // flushed after setup, presumably so setup notifications are not counted below
+
+      ClientMessage msg = session.createMessage(false); // note: created from the shared 'session' but sent by mySession's producer
+      msg.putStringProperty("someKey", "someValue");
+      msg.setExpiration(1); // NOTE(review): presumably an absolute timestamp in ms, i.e. already in the past -- confirm
+      producer.send(msg);
+      Thread.sleep(500); // NOTE(review): fixed wait before receiving; timing-dependent
+      consumer.receive(500); // result ignored; presumably the receive attempt is what makes the broker expire the message -- confirm
+
+      ClientMessage[] notifications = NotificationTest.consumeMessages(1, 
notifConsumer);
+      Assert.assertEquals(MESSAGE_EXPIRED.toString(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
+      
Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_MESSAGE_ID));
+      Assert.assertEquals(address, 
notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS));
+      Assert.assertEquals(queue, 
notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME));
+      Assert.assertEquals(RoutingType.MULTICAST.getType(), 
notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_TYPE));
+
+      consumer.close(); // NOTE(review): mySession and sf are not closed in this method -- confirm the test base cleans them up
+      session.deleteQueue(queue);
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -211,6 +357,14 @@ public class NotificationTest extends ActiveMQTestBase {
       super.setUp();
 
       server = 
addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), false));
+      NotificationActiveMQServerPlugin notificationPlugin = new 
NotificationActiveMQServerPlugin();
+      notificationPlugin.setSendAddressNotifications(true);
+      notificationPlugin.setSendConnectionNotifications(true);
+      notificationPlugin.setSendSessionNotifications(true);
+      notificationPlugin.setSendDeliveredNotifications(true);
+      notificationPlugin.setSendExpiredNotifications(true);
+
+      server.registerBrokerPlugin(notificationPlugin);
       server.start();
 
       locator = createInVMNonHALocator();

Reply via email to