Repository: incubator-eagle
Updated Branches:
  refs/heads/master dcfeae759 -> 2ac5985f9


EAGLE-140 Add eagle pipeline, single kafka monitor and hadoop metric monitor 
scripts

https://issues.apache.org/jira/browse/EAGLE-140

Author: @hao <h...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/2ac5985f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/2ac5985f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/2ac5985f

Branch: refs/heads/master
Commit: 2ac5985f963ed06624bdfc679432f5e86e8efb25
Parents: dcfeae7
Author: Hao Chen <h...@apache.org>
Authored: Fri Jan 29 11:30:22 2016 +0800
Committer: Hao Chen <h...@apache.org>
Committed: Fri Jan 29 11:30:22 2016 +0800

----------------------------------------------------------------------
 eagle-assembly/src/main/bin/eagle-ambari.sh     |   0
 eagle-assembly/src/main/bin/eagle-env.sh        |   5 +
 eagle-assembly/src/main/bin/eagle-policy.sh     |   0
 .../src/main/bin/eagle-service-init.sh          |   0
 .../src/main/bin/eagle-topology-init.sh         |  29 +++++
 .../src/main/bin/hadoop-metric-monitor.sh       |  48 ++++++++
 .../main/bin/hbase-securitylog-schema-create.sh |   0
 .../bin/hdfs-securitylog-metadata-create.sh     |   0
 .../src/main/bin/kafka-stream-monitor.sh        |  61 ++++++++++
 eagle-assembly/src/main/bin/pipeline-runner.sh  |  52 +++++++++
 eagle-assembly/src/main/conf/pipeline.conf      |  40 +++++++
 .../main/conf/sandbox-hadoopjmx-pipeline.conf   |  49 +++++++++
 .../main/conf/sandbox-hadoopjmx-topology.conf   |  68 ++++++++++++
 .../eagle-alert/eagle-alert-process/pom.xml     |   6 +-
 .../eagle-stream-pipeline/pom.xml               |  53 +++++----
 .../pipeline/extension/ModuleManager.scala      |  20 ++--
 .../src/test/resources/application.conf         |   2 +-
 .../src/test/resources/pipeline_4.conf          |   2 +-
 .../src/test/resources/pipeline_5.conf          | 110 +++++++++++++++++++
 .../eagle/stream/pipeline/PipelineSpec.scala    |   4 +
 .../storm/StormTopologyExecutorImpl.scala       |  31 +++++-
 .../eagle-embed/eagle-embed-hbase/pom.xml       |   8 +-
 .../eagle-embed/eagle-embed-server/pom.xml      |   8 +-
 eagle-core/eagle-query/pom.xml                  |   4 -
 eagle-external/eagle-log4jkafka/pom.xml         |   8 +-
 eagle-external/hadoop_jmx_collector/config.json |   2 +-
 .../hadoop_jmx_collector/util_func.py           |   2 +-
 .../src/assembly/eagle-topology-assembly.xml    |   2 +
 pom.xml                                         |  23 ++--
 29 files changed, 565 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/eagle-ambari.sh
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/eagle-ambari.sh 
b/eagle-assembly/src/main/bin/eagle-ambari.sh
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/eagle-env.sh
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/eagle-env.sh 
b/eagle-assembly/src/main/bin/eagle-env.sh
index 30aff25..79ff5fa 100755
--- a/eagle-assembly/src/main/bin/eagle-env.sh
+++ b/eagle-assembly/src/main/bin/eagle-env.sh
@@ -37,3 +37,8 @@ export EAGLE_SERVICE_USER=admin
 # EAGLE_SERVICE_PASSWORD
 export EAGLE_SERVICE_PASSWD=secret
 
+# Classpath shared by the Eagle command line tools: the conf directory first,
+# followed by every entry found under lib/share.
+export EAGLE_CLASSPATH="$EAGLE_HOME/conf"
+# Add eagle shared library jars
+for file in "$EAGLE_HOME"/lib/share/*; do
+    # When nothing matches, the glob stays as the literal pattern; skip it.
+    [ -e "$file" ] || continue
+    EAGLE_CLASSPATH="$EAGLE_CLASSPATH:$file"
+done

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/eagle-policy.sh
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/eagle-policy.sh 
b/eagle-assembly/src/main/bin/eagle-policy.sh
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/eagle-service-init.sh
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/eagle-service-init.sh 
b/eagle-assembly/src/main/bin/eagle-service-init.sh
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/eagle-topology-init.sh
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/eagle-topology-init.sh 
b/eagle-assembly/src/main/bin/eagle-topology-init.sh
old mode 100644
new mode 100755
index 88b756a..f30749f
--- a/eagle-assembly/src/main/bin/eagle-topology-init.sh
+++ b/eagle-assembly/src/main/bin/eagle-topology-init.sh
@@ -90,6 +90,35 @@ echo "Importing AlertStreamService for USERPROFILE"
 curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 
"Content-Type: application/json"  
"http://$EAGLE_SERVICE_HOST:$EAGLE_SERVICE_PORT/eagle-service/rest/entities?serviceName=AlertStreamService";
 \
      -d '[ { "prefix": "alertStream", "tags": { "streamName": "userActivity", 
"site":"sandbox", "dataSource":"userProfile" }, "alertExecutorIdList": [ 
"userProfileAnomalyDetectionExecutor" ] } ]'
 
+#####################################################################
+#            Import stream metadata for HADOOP METRIC
+#####################################################################
+
+## AlertDataSource: data sources bound to sites
+echo "Importing AlertDataSourceService for HADOOP METRIC ... "
+
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 
'Content-Type:application/json' 
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService";
 -d '[{"prefix":"alertDataSource","tags":{"site" : "sandbox", 
"dataSource":"hadoop"}, "enabled": "true", "config" : "", "desc":"HADOOP"}]'
+
+
+## AlertStreamService: alert streams generated from data source
+echo ""
+echo "Importing AlertStreamService for HADOOP METRIC ... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST \
+-H 'Content-Type:application/json' 
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService";
 \
+-d 
'[{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopJmxMetric"},"desc":"hadoop
 jmx metric 
stream"},{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopSysMetric"},"desc":"hadoop
 system metric stream"}]'
+
+## AlertExecutorService: what alert streams are consumed by alert executor
+echo ""
+echo "Importing AlertExecutorService for HADOOP METRIC ... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 
'Content-Type:application/json' 
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService";
 -d 
'[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopJmxMetricExecutor","streamName":"hadoopJmxMetric"},"desc":"alert
 executor for hadoop jmx stream"}]'
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 
'Content-Type:application/json' 
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService";
 -d 
'[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopSysMetricExecutor","streamName":"hadoopSysMetric"},"desc":"alert
 executor for hadoop system stream"}]'
+
+## AlertStreamSchemaService: schema for event from alert stream
+echo ""
+echo "Importing AlertStreamSchemaService for HADOOP METRIC ... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 
'Content-Type:application/json' 
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService";
 -d 
'[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric
 
host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric
 
site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopJmxMetric"},"attrType":"long","attrDescription":"metric
 
timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopJmxMetric"},"attrType":"double","attrDescription":"metric
 
value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"
 service 
component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric
 name","attrDisplayName":"metric"}]'
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 
'Content-Type:application/json' 
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService";
 -d 
'[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric
 
host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric
 
site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopSysMetric"},"attrType":"long","attrDescription":"metric
 
timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopSysMetric"},"attrType":"double","attrDescription":"metric
 
value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"
 service 
component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric
 name","attrDisplayName":"metric"}]'
+
 ## Finished
 echo ""
 echo "Finished initialization for eagle topology"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh 
b/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh
new file mode 100644
index 0000000..706154a
--- /dev/null
+++ b/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh
@@ -0,0 +1,48 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+source $(dirname $0)/eagle-env.sh
+
+#####################################################################
+#            Import stream metadata for HADOOP METRIC
+#####################################################################
+## AlertDataSource: data sources bound to sites
+echo "Importing AlertDataSourceService for HADOOP METRIC ... "
+
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 
'Content-Type:application/json' 
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService";
 -d '[{"prefix":"alertDataSource","tags":{"site" : "sandbox", 
"dataSource":"hadoop"}, "enabled": "true", "config" : "", "desc":"HADOOP"}]'
+
+
+## AlertStreamService: alert streams generated from data source
+echo ""
+echo "Importing AlertStreamService for HADOOP METRIC ... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST \
+-H 'Content-Type:application/json' 
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService";
 \
+-d 
'[{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopJmxMetric"},"desc":"hadoop
 jmx metric 
stream"},{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopSysMetric"},"desc":"hadoop
 system metric stream"}]'
+
+## AlertExecutorService: what alert streams are consumed by alert executor
+echo ""
+echo "Importing AlertExecutorService for HADOOP METRIC ... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 
'Content-Type:application/json' 
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService";
 -d 
'[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopJmxMetricExecutor","streamName":"hadoopJmxMetric"},"desc":"alert
 executor for hadoop jmx stream"}]'
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 
'Content-Type:application/json' 
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService";
 -d 
'[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopSysMetricExecutor","streamName":"hadoopSysMetric"},"desc":"alert
 executor for hadoop system stream"}]'
+
+## AlertStreamSchemaService: schema for event from alert stream
+echo ""
+echo "Importing AlertStreamSchemaService for HADOOP METRIC ... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 
'Content-Type:application/json' 
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService";
 -d 
'[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric
 
host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric
 
site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopJmxMetric"},"attrType":"long","attrDescription":"metric
 
timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopJmxMetric"},"attrType":"double","attrDescription":"metric
 
value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"
 service 
component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric
 name","attrDisplayName":"metric"}]'
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 
'Content-Type:application/json' 
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService";
 -d 
'[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric
 
host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric
 
site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopSysMetric"},"attrType":"long","attrDescription":"metric
 
timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopSysMetric"},"attrType":"double","attrDescription":"metric
 
value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"
 service 
component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric
 name","attrDisplayName":"metric"}]'
+
+# Start the Kafka stream monitor for the hadoopJmxMetric stream. The launcher
+# is resolved relative to this script (the same directory eagle-env.sh is
+# sourced from) and run through bash so it works even if the file is not
+# marked executable.
+bash "$(dirname "$0")/kafka-stream-monitor.sh" hadoopJmxMetric hadoopJmxMetricExecutor "$EAGLE_HOME/conf/sandbox-hadoopjmx-topology.conf"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/hbase-securitylog-schema-create.sh
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/hbase-securitylog-schema-create.sh 
b/eagle-assembly/src/main/bin/hbase-securitylog-schema-create.sh
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/hdfs-securitylog-metadata-create.sh
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/hdfs-securitylog-metadata-create.sh 
b/eagle-assembly/src/main/bin/hdfs-securitylog-metadata-create.sh
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/kafka-stream-monitor.sh
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/kafka-stream-monitor.sh 
b/eagle-assembly/src/main/bin/kafka-stream-monitor.sh
new file mode 100644
index 0000000..bf9181b
--- /dev/null
+++ b/eagle-assembly/src/main/bin/kafka-stream-monitor.sh
@@ -0,0 +1,61 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Launches org.apache.eagle.datastream.storm.KafkaStreamMonitor for one alert
+# stream.
+#
+# Usage:
+#   kafka-stream-monitor.sh alert_stream alert_executor config_file
+#   kafka-stream-monitor.sh alert_stream config_file
+#
+# With two arguments the executor id defaults to "<alert_stream>Executor".
+# EAGLE_CLASSPATH comes from eagle-env.sh; EAGLE_HOME is assumed to be set
+# there as well.
+source "$(dirname "$0")/eagle-env.sh"
+
+# Expand the glob into an array: `[ -e <glob> ]` errors out when several jars
+# match, and parsing `ls` output is fragile. An unmatched glob stays literal,
+# so the -e test below also covers the "no jar" case.
+topology_jars=("${EAGLE_HOME}"/lib/topology/eagle-topology-*-assembly.jar)
+if [ -e "${topology_jars[0]}" ]; then
+    if [ "${#topology_jars[@]}" -gt 1 ]; then
+        echo "WARN: multiple topology assembly jars found, using ${topology_jars[0]}" >&2
+    fi
+    export jar_name="${topology_jars[0]}"
+else
+    echo "ERROR: ${EAGLE_HOME}/lib/topology/eagle-topology-*-assembly.jar not found"
+    exit 1
+fi
+
+export main_class=org.apache.eagle.datastream.storm.KafkaStreamMonitor
+
+if [ "$#" -lt 2 ]; then
+    echo "ERROR: parameter required"
+    echo "Usage: kafka-stream-monitor.sh [alert_stream alert_executor config_file] or [alert_stream config_file]"
+    echo ""
+    exit 1
+fi
+
+export alert_stream=$1
+export alert_executor=$2
+export config_file=$3
+# Two-argument form: the second argument is the config file and the executor
+# id is derived from the stream name.
+if [ "$#" -eq 2 ]; then
+    config_file=$2
+    alert_executor="${alert_stream}Executor"
+fi
+
+# `command -v` is the portable way to probe for an executable.
+if ! command -v storm >/dev/null 2>&1; then
+    echo "ERROR: storm not found"
+    exit 1
+fi
+export EAGLE_CLASSPATH="${EAGLE_CLASSPATH}:${jar_name}:$(storm classpath)"
+
+# Keep the command as an array so paths containing whitespace survive as
+# single arguments when it is executed.
+cmd=(java -cp "$EAGLE_CLASSPATH" "$main_class"
+    -D "eagle.stream.name=$alert_stream"
+    -D "eagle.stream.executor=$alert_executor"
+    -D "config.file=$config_file"
+    -D "envContextConfig.jarFile=$jar_name")
+
+echo "=========="
+echo "Alert Stream: $alert_stream"
+echo "Alert Executor: $alert_executor"
+echo "Alert Config: $config_file"
+echo "=========="
+echo "Run Cmd: ${cmd[*]}"
+echo "${cmd[*]}"
+"${cmd[@]}"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/pipeline-runner.sh
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/pipeline-runner.sh 
b/eagle-assembly/src/main/bin/pipeline-runner.sh
new file mode 100755
index 0000000..65bf3f3
--- /dev/null
+++ b/eagle-assembly/src/main/bin/pipeline-runner.sh
@@ -0,0 +1,52 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Runs org.apache.eagle.stream.pipeline.Pipeline against one pipeline
+# definition.
+#
+# Usage: pipeline-runner.sh pipeline-config
+#
+# EAGLE_CLASSPATH comes from eagle-env.sh; EAGLE_HOME is assumed to be set
+# there as well.
+source "$(dirname "$0")/eagle-env.sh"
+
+# Expand the glob into an array: `[ -e <glob> ]` errors out when several jars
+# match, and parsing `ls` output is fragile. An unmatched glob stays literal,
+# so the -e test below also covers the "no jar" case.
+topology_jars=("${EAGLE_HOME}"/lib/topology/eagle-topology-*-assembly.jar)
+if [ -e "${topology_jars[0]}" ]; then
+    if [ "${#topology_jars[@]}" -gt 1 ]; then
+        echo "WARN: multiple topology assembly jars found, using ${topology_jars[0]}" >&2
+    fi
+    export jar_name="${topology_jars[0]}"
+else
+    echo "ERROR: ${EAGLE_HOME}/lib/topology/eagle-topology-*-assembly.jar not found"
+    exit 1
+fi
+
+# The shared pipeline.conf is optional; without it only a warning is printed.
+if [ -e "${EAGLE_HOME}/conf/pipeline.conf" ]; then
+    export common_conf="${EAGLE_HOME}/conf/pipeline.conf"
+else
+    echo "WARN: ${EAGLE_HOME}/conf/pipeline.conf not found"
+fi
+
+main_class=org.apache.eagle.stream.pipeline.Pipeline
+
+if [ "$#" -eq 0 ]; then
+    echo "ERROR: parameter required"
+    echo "Usage: $0 [pipeline-config]"
+
+    echo ""
+    exit 1
+fi
+
+# `command -v` is the portable way to probe for an executable.
+if ! command -v storm >/dev/null 2>&1; then
+    echo "ERROR: storm not found"
+    exit 1
+fi
+export EAGLE_CLASSPATH="${EAGLE_CLASSPATH}:${jar_name}:$(storm classpath)"
+
+# Keep the command as an array so paths containing whitespace survive as
+# single arguments when it is executed.
+cmd=(java -cp "$EAGLE_CLASSPATH" "$main_class")
+# Only pass --conf when a common config is known; a bare "--conf" would
+# otherwise consume "--pipeline" as its value.
+if [ -n "${common_conf:-}" ]; then
+    cmd+=(--conf "$common_conf")
+fi
+cmd+=(--pipeline "$1" -D "envContextConfig.jarFile=$jar_name")
+
+echo "${cmd[*]}"
+"${cmd[@]}"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/conf/pipeline.conf
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/conf/pipeline.conf 
b/eagle-assembly/src/main/conf/pipeline.conf
new file mode 100644
index 0000000..c1c7758
--- /dev/null
+++ b/eagle-assembly/src/main/conf/pipeline.conf
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Common settings for the pipeline runner: pipeline-runner.sh passes this file
+# to the Pipeline main class through --conf when it exists under conf/.
+{
+       # Execution environment: Storm in cluster mode plus the nimbus endpoint.
+       "envContextConfig": {
+               "env" : "storm"
+               "mode" : "cluster"
+               "nimbusHost":"sandbox.hortonworks.com",
+               "nimbusThriftPort":6627
+       }
+       # Eagle properties: polling interval, mail settings and the Eagle
+       # service endpoint with its credentials.
+       "eagleProps" : {
+               "dataJoinPollIntervalSec" : 30
+               "mailHost" : "smtp.server.host"
+               "mailSmtpPort":"25"
+               "mailDebug" : "true"
+               "eagleService": {
+                       "host": "localhost"
+                       "port": 9099
+                       "username": "admin"
+                       "password": "secret"
+               }
+       }
+       # Dynamic config source switch and its delays in milliseconds.
+       # NOTE(review): presumably controls periodic config reloading - confirm
+       # against the consumer of these keys.
+       "dynamicConfigSource" : {
+               "enabled" : true
+               "initDelayMillis" : 0
+               "delayMillis" : 30000
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/conf/sandbox-hadoopjmx-pipeline.conf
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/conf/sandbox-hadoopjmx-pipeline.conf 
b/eagle-assembly/src/main/conf/sandbox-hadoopjmx-pipeline.conf
new file mode 100644
index 0000000..2a75c59
--- /dev/null
+++ b/eagle-assembly/src/main/conf/sandbox-hadoopjmx-pipeline.conf
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Sandbox pipeline definition: one Kafka source stream (hadoopNNJmxStream)
+# connected to one alert executor (hadoopNNJmxStreamAlertExecutor).
+{
+       # Per-pipeline settings: topology name plus the site / data source tags.
+       config {
+               envContextConfig {
+                       "topologyName" : "sandbox-hadoopjmx-pipeline"
+               }
+               eagleProps {
+                       "site" : "sandbox"
+                       "dataSource": "HADOOP"
+               }
+       }
+
+       dataflow {
+               # Kafka source reading topic nn_jmx_metric_sandbox via the sandbox
+               # ZooKeeper; messages are decoded with JsonMessageDeserializer.
+               # NOTE(review): the key is spelled "parallism" here - confirm that
+               # this is the spelling the pipeline parser expects.
+               KafkaSource.hadoopNNJmxStream {
+                       parallism = 1000
+                       topic = "nn_jmx_metric_sandbox"
+                       zkConnection = "sandbox.hortonworks.com:2181"
+                       zkConnectionTimeoutMS = 15000
+                       consumerGroupId = "Consumer"
+                       fetchSize = 1048586
+                       transactionZKServers = "sandbox.hortonworks.com"
+                       transactionZKPort = 2181
+                       transactionZKRoot = "/consumers"
+                       transactionStateUpdateMS = 2000
+                       deserializerClass = 
"org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+               }
+
+               # Alert executor whose upstream is hadoopNNJmxStream.
+               Alert.hadoopNNJmxStreamAlertExecutor {
+                       upStreamNames = [hadoopNNJmxStream]
+                       alertExecutorId = hadoopNNJmxStreamAlertExecutor
+               }
+
+               # Connection from the source stream to the alert executor.
+               hadoopNNJmxStream -> hadoopNNJmxStreamAlertExecutor{}
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/conf/sandbox-hadoopjmx-topology.conf
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/conf/sandbox-hadoopjmx-topology.conf 
b/eagle-assembly/src/main/conf/sandbox-hadoopjmx-topology.conf
new file mode 100644
index 0000000..4787de2
--- /dev/null
+++ b/eagle-assembly/src/main/conf/sandbox-hadoopjmx-topology.conf
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Topology settings used by hadoop-metric-monitor.sh when it starts the Kafka
+# stream monitor for the hadoopJmxMetric stream.
+{
+  # Storm execution environment and per-component parallelism.
+  "envContextConfig" : {
+    "env" : "storm",
+    "mode" : "cluster",
+    "topologyName" : "sandbox-hadoopjmx-topology",
+    "stormConfigFile" : "security-auditlog-storm.yaml",
+    "parallelismConfig" : {
+      "kafkaMsgConsumer" : 1,
+      "hadoopJmxMetricExecutor*" : 1
+    },
+    "nimbusHost":"sandbox.hortonworks.com",
+    "nimbusThriftPort":6627
+  },
+  # Kafka source for the JMX metric topic.
+  "dataSourceConfig": {
+    "topic" : "nn_jmx_metric_sandbox",
+    "zkConnection" : "sandbox.hortonworks.com:2181",
+    "zkConnectionTimeoutMS" : 15000,
+    # Declared exactly once: a repeated key is silently overridden by its
+    # last occurrence.
+    "consumerGroupId" : "eagle.hadoop.consumer",
+    "fetchSize" : 1048586,
+    # Same deserializer that sandbox-hadoopjmx-pipeline.conf uses for this
+    # topic; the HBase audit-log deserializer does not apply to JMX metrics.
+    "deserializerClass" : "org.apache.eagle.datastream.storm.JsonMessageDeserializer",
+    "transactionZKServers" : "sandbox.hortonworks.com",
+    "transactionZKPort" : 2181,
+    "transactionZKRoot" : "/consumers",
+    "transactionStateUpdateMS" : 2000
+  },
+  # Alert executor matching the id passed by hadoop-metric-monitor.sh.
+  "alertExecutorConfigs" : {
+     "hadoopJmxMetricExecutor" : {
+       "parallelism" : 1,
+       "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner",
+       "needValidation" : "true"
+     }
+  },
+  # Site / data source tags, mail settings and the Eagle service endpoint.
+  "eagleProps" : {
+    "site" : "sandbox",
+    "dataSource": "hadoop",
+    "dataJoinPollIntervalSec" : 30,
+    "mailHost" : "mailHost.com",
+    "mailSmtpPort":"25",
+    "mailDebug" : "true",
+    "eagleService": {
+      "host": "localhost",
+      "port": 9099,
+      "username": "admin",
+      "password": "secret"
+    }
+  },
+  "dynamicConfigSource" : {
+    "enabled" : true,
+    "initDelayMillis" : 0,
+    "delayMillis" : 30000
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-alert/eagle-alert-process/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/pom.xml 
b/eagle-core/eagle-alert/eagle-alert-process/pom.xml
index 4161904..012cc64 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/pom.xml
+++ b/eagle-core/eagle-alert/eagle-alert-process/pom.xml
@@ -95,7 +95,7 @@
                     <artifactId>jdk.tools</artifactId>
                 </exclusion>
             </exclusions>
-               </dependency>           
+               </dependency>
                <dependency>
                        <groupId>org.wso2.siddhi</groupId>
                        <artifactId>siddhi-extension-string</artifactId>
@@ -103,10 +103,6 @@
                <dependency>
                <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
-        </dependency>        
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
         </dependency>
        <dependency>
                        <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml 
b/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
index e8965f5..0c2732b 100644
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
@@ -26,10 +26,10 @@
     <modelVersion>4.0.0</modelVersion>
     <artifactId>eagle-stream-pipeline</artifactId>
     <dependencies>
-        <dependency>
-            <groupId>org.reflections</groupId>
-            <artifactId>reflections</artifactId>
-        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>org.reflections</groupId>-->
+            <!--<artifactId>reflections</artifactId>-->
+        <!--</dependency>-->
         <dependency>
             <groupId>eagle</groupId>
             <artifactId>eagle-service-base</artifactId>
@@ -59,6 +59,10 @@
                     <artifactId>servlet-api</artifactId>
                     <groupId>javax.servlet</groupId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -67,14 +71,19 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-reflect</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-compiler</artifactId>
-            <version>${scala.version}.0</version>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
         </dependency>
+        <!--<dependency>-->
+            <!--<groupId>org.scala-lang</groupId>-->
+            <!--<artifactId>scala-reflect</artifactId>-->
+        <!--</dependency>-->
+        <!--<dependency>-->
+            <!--<groupId>org.scala-lang</groupId>-->
+            <!--<artifactId>scala-compiler</artifactId>-->
+            <!--<version>${scala.version}.0</version>-->
+        <!--</dependency>-->
         <dependency>
             <groupId>org.scalatest</groupId>
             <artifactId>scalatest_${scala.version}</artifactId>
@@ -85,22 +94,12 @@
             <artifactId>eagle-stream-process-api</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>com.typesafe.akka</groupId>
-            <artifactId>akka-actor_${scala.version}</artifactId>
-            <version>${akka.actor.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.typesafe.akka</groupId>
-            <artifactId>akka-testkit_${scala.version}</artifactId>
-            <version>${akka.actor.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <scope>compile</scope>
-        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>com.typesafe.akka</groupId>-->
+            <!--<artifactId>akka-testkit_${scala.version}</artifactId>-->
+            <!--<version>${akka.actor.version}</version>-->
+            <!--<scope>test</scope>-->
+        <!--</dependency>-->
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
index 9c3e9d5..2174560 100644
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
@@ -9,7 +9,7 @@ import org.apache.eagle.datastream.core._
 import org.apache.eagle.partition.PartitionStrategy
 import org.apache.eagle.stream.pipeline.parser.Processor
 import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
-import org.slf4j.LoggerFactory
+//import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConverters._
 
@@ -106,11 +106,16 @@ object AggregatorProducer extends ModuleMapper{
   }
 }
 
+object KafkaSinkExecutor{
+//  val LOG = LoggerFactory.getLogger(classOf[KafkaSinkExecutor])
+}
+
 /**
   * @todo currently support single topic now, should support topic selector
   * @param config
   */
 case class KafkaSinkExecutor(config:Map[String,AnyRef]) extends ((AnyRef) => Unit) with Serializable{
+
   val TOPIC_KEY = "topic"
   def getDefaultProps = {
     val props = new Properties()
@@ -132,19 +137,17 @@ case class KafkaSinkExecutor(config:Map[String,AnyRef]) 
extends ((AnyRef) => Uni
   @transient var topic:String = null
   @transient var timeoutMs:Long = 3000
 
-  val LOG = LoggerFactory.getLogger(classOf[KafkaSinkExecutor])
-
   private def init():Unit = {
     if(this.initialized != null && this.initialized.get()){
-      LOG.info("Already initialized, skip")
+//      LOG.info("Already initialized, skip")
       return
     }
     this.initialized = new AtomicBoolean(false)
     if (producer != null) {
-      LOG.info(s"Closing $producer")
+//      LOG.info(s"Closing $producer")
       producer.close()
     }
-    LOG.info("Initializing and creating Kafka Producer")
+//    LOG.info("Initializing and creating Kafka Producer")
     if (config.contains(TOPIC_KEY)) {
       this.topic = config.get(TOPIC_KEY).get.asInstanceOf[String]
     } else {
@@ -153,7 +156,7 @@ case class KafkaSinkExecutor(config:Map[String,AnyRef]) 
extends ((AnyRef) => Uni
     val props = getDefaultProps
     props.putAll((config - TOPIC_KEY).asJava)
     producer = new KafkaProducer[String, AnyRef](props)
-    LOG.info(s"Created new KafkaProducer: $producer")
+//    LOG.info(s"Created new KafkaProducer: $producer")
     initialized.set(true)
   }
 
@@ -174,7 +177,8 @@ case class KafkaSinkExecutor(config:Map[String,AnyRef]) 
extends ((AnyRef) => Uni
     producer.send(record,new Callback(){
       override def onCompletion(metadata: RecordMetadata, exception: 
Exception): Unit = {
         if(exception!=null){
-          LOG.error(s"Failed to send record $value to topic: $topic",exception)
+//          LOG.error(s"Failed to send record $value to topic: $topic",exception)
+          throw new IllegalStateException(s"Failed to send record $value to topic: $topic",exception)
         }
       }
     })

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
 
b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
index d285a6f..3e8f69c 100644
--- 
a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
+++ 
b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
@@ -21,7 +21,7 @@
                "mailDebug" : "true"
                "eagleService": {
                        "host": "localhost"
-                       "port": 38080
+                       "port": 9099
                        "username": "admin"
                        "password": "secret"
                }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
 
b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
index 609e905..9054f8f 100644
--- 
a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
+++ 
b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
@@ -21,7 +21,7 @@
                        "topologyName" : "dsl-topology"
                        "parallelismConfig" : {
                                "kafkaMsgConsumer" : 1
-                       }
+                       },
                }
                alertExecutorConfigs {
                        defaultAlertExecutor  {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf
 
b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf
new file mode 100644
index 0000000..d3e5f1d
--- /dev/null
+++ 
b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing software
+# distributed under the License is distributed on an "AS IS" BASIS
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+       config {
+               envContextConfig {
+                       "env" : "storm"
+                       "mode" : "cluster"
+                       "topologyName" : "dynamical-topology-5"
+                       "parallelismConfig" : {
+                               "kafkaMsgConsumer" : 1
+                       },
+                       "nimbusHost":"sandbox.hortonworks.com",
+                       "nimbusThriftPort":6627
+               }
+               alertExecutorConfigs {
+                       defaultAlertExecutor  {
+                               "parallelism" : 1
+                               "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+                               "needValidation" : "true"
+                       }
+               }
+               eagleProps {
+                       "site" : "sandbox"
+                       "dataSource": "HADOOP"
+               }
+       }
+       
+       dataflow {
+               KafkaSource.JmxStreamOne {
+                       parallism = 1000
+                       topic = "metric_event_1"
+                       zkConnection = "sandbox.hortonworks.com:2181"
+                       zkConnectionTimeoutMS = 15000
+                       consumerGroupId = "Consumer"
+                       fetchSize = 1048586
+                       transactionZKServers = "sandbox.hortonworks.com"
+                       transactionZKPort = 2181
+                       transactionZKRoot = "/consumers"
+                       transactionStateUpdateMS = 2000
+                       deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+               }
+
+               KafkaSource.JmxStreamTwo {
+                       parallism = 1000
+                       topic = "metric_event_2"
+                       zkConnection = "sandbox.hortonworks.com:2181"
+                       zkConnectionTimeoutMS = 15000
+                       consumerGroupId = "Consumer"
+                       fetchSize = 1048586
+                       transactionZKServers = "sandbox.hortonworks.com"
+                       transactionZKPort = 2181
+                       transactionZKRoot = "/consumers"
+                       transactionStateUpdateMS = 2000
+                       deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+               }
+
+               KafkaSource.JmxStreamThree{
+                       parallism = 1000
+                       topic = "metric_event_3"
+                       zkConnection = "sandbox.hortonworks.com:2181"
+                       zkConnectionTimeoutMS = 15000
+                       consumerGroupId = "Consumer"
+                       fetchSize = 1048586
+                       transactionZKServers = "sandbox.hortonworks.com"
+                       transactionZKPort = 2181
+                       transactionZKRoot = "/consumers"
+                       transactionStateUpdateMS = 2000
+                       deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+               }
+
+               Console.printer {
+                       format = "%s"
+               }
+
+               KafkaSink.metricStore {
+                       "topic" = "metric_event_persist"
+                       "bootstrap.servers" = "sandbox.hortonworks.com:6667"
+               }
+
+               Alert.defaultAlertExecutor {
+                       // upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
+                       // alertExecutorId = defaultAlertExecutor
+               }
+
+               JmxStreamOne|JmxStreamTwo|JmxStreamThree -> defaultAlertExecutor {
+                       grouping = shuffle
+               }
+
+               JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
+                       grouping = shuffle
+               }
+
+               JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
+                       grouping = shuffle
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
 
b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
index c87475b..5e2007d 100644
--- 
a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
+++ 
b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
@@ -52,6 +52,10 @@ object PipelineSpec_4 extends App {
   Pipeline(args).submit[storm]("pipeline_4.conf")
 }
 
+object PipelineSpec_5 extends App {
+  Pipeline(args).submit[storm]("pipeline_5.conf")
+}
+
 object PipelineCLISpec extends App{
   Pipeline.main(args)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
index 1f753d1..5d64c4c 100644
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
+++ 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
@@ -23,6 +23,7 @@ import backtype.storm.generated.StormTopology
 import backtype.storm.utils.Utils
 import backtype.storm.{Config, LocalCluster, StormSubmitter}
 import org.apache.eagle.datastream.core.AbstractTopologyExecutor
+import org.apache.thrift7.transport.TTransportException
 import org.slf4j.LoggerFactory
 import org.yaml.snakeyaml.Yaml
 
@@ -37,6 +38,7 @@ case class StormTopologyExecutorImpl(topology: StormTopology, 
config: com.typesa
     conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32))
     conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384))
     conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384))
+    conf.put(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000))
 
     if(config.hasPath("envContextConfig.stormConfigFile")) {
       val file = new File(config.getString("envContextConfig.stormConfigFile"))
@@ -59,8 +61,32 @@ case class StormTopologyExecutorImpl(topology: 
StormTopology, config: com.typesa
 
     val topologyName = config.getString("envContextConfig.topologyName")
     if (!localMode) {
+      if(config.hasPath("envContextConfig.nimbusHost")) {
+        LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_HOST} as ${config.getString("envContextConfig.nimbusHost")}")
+        conf.put(backtype.storm.Config.NIMBUS_HOST, config.getString("envContextConfig.nimbusHost"))
+      }
+
+      if(config.hasPath("envContextConfig.nimbusThriftPort")) {
+        LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_THRIFT_PORT} as ${config.getString("envContextConfig.nimbusThriftPort")}")
+        conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, config.getNumber("envContextConfig.nimbusThriftPort"))
+      }
+
+      if(config.hasPath("envContextConfig.jarFile")){
+        LOG.info(s"Setting storm.jar as ${config.getString("envContextConfig.jarFile")}")
+        System.setProperty("storm.jar",config.getString("envContextConfig.jarFile"))
+      }
+
       LOG.info("Submitting as cluster mode")
-      StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology)
+      try {
+        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology)
+      } catch {
+        case e:TTransportException =>
+          LOG.error(s"Got thrift exception, type: ${e.getType}")
+          throw e
+      }
+      finally {
+        System.clearProperty("storm.jar")
+      }
     } else {
       LOG.info("Submitting as local mode")
       val cluster: LocalCluster = new LocalCluster
@@ -68,8 +94,7 @@ case class StormTopologyExecutorImpl(topology: StormTopology, 
config: com.typesa
       while(true) {
         try {
           Utils.sleep(Integer.MAX_VALUE)
-        }
-        catch {
+        } catch {
           case _: Throwable => () // Do nothing
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-embed/eagle-embed-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/pom.xml 
b/eagle-core/eagle-embed/eagle-embed-hbase/pom.xml
index 5325fe6..ab9a253 100644
--- a/eagle-core/eagle-embed/eagle-embed-hbase/pom.xml
+++ b/eagle-core/eagle-embed/eagle-embed-hbase/pom.xml
@@ -43,10 +43,10 @@
             <groupId>log4j</groupId>
             <artifactId>log4j</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>org.slf4j</groupId>-->
+            <!--<artifactId>slf4j-log4j12</artifactId>-->
+        <!--</dependency>-->
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>log4j-over-slf4j</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-embed/eagle-embed-server/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-server/pom.xml 
b/eagle-core/eagle-embed/eagle-embed-server/pom.xml
index e16f852..d5b5e50 100644
--- a/eagle-core/eagle-embed/eagle-embed-server/pom.xml
+++ b/eagle-core/eagle-embed/eagle-embed-server/pom.xml
@@ -57,10 +57,10 @@
                <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
         </dependency>        
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>org.slf4j</groupId>-->
+            <!--<artifactId>slf4j-log4j12</artifactId>-->
+        <!--</dependency>-->
        <dependency>
                        <groupId>org.slf4j</groupId>
                        <artifactId>log4j-over-slf4j</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-query/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/pom.xml b/eagle-core/eagle-query/pom.xml
index af9a26e..b6068ee 100644
--- a/eagle-core/eagle-query/pom.xml
+++ b/eagle-core/eagle-query/pom.xml
@@ -54,10 +54,6 @@
                </dependency>
                <dependency>
                        <groupId>org.slf4j</groupId>
-                       <artifactId>slf4j-log4j12</artifactId>
-               </dependency>
-               <dependency>
-                       <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-api</artifactId>
                </dependency>
                <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-external/eagle-log4jkafka/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-external/eagle-log4jkafka/pom.xml 
b/eagle-external/eagle-log4jkafka/pom.xml
index 30567ac..2c4871b 100644
--- a/eagle-external/eagle-log4jkafka/pom.xml
+++ b/eagle-external/eagle-log4jkafka/pom.xml
@@ -34,10 +34,10 @@
             <groupId>log4j</groupId>
             <artifactId>log4j</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>org.slf4j</groupId>-->
+            <!--<artifactId>slf4j-log4j12</artifactId>-->
+        <!--</dependency>-->
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-external/hadoop_jmx_collector/config.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/config.json 
b/eagle-external/hadoop_jmx_collector/config.json
index 7ca926e..1f5cb7e 100644
--- a/eagle-external/hadoop_jmx_collector/config.json
+++ b/eagle-external/hadoop_jmx_collector/config.json
@@ -13,7 +13,7 @@
    "output": {
      "kafka": {
        "topic": "nn_jmx_metric_sandbox",
-       "brokerList": ["localhost:9092"]
+       "brokerList": ["sandbox.hortonworks.com:6667"]
      }
    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-external/hadoop_jmx_collector/util_func.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/util_func.py 
b/eagle-external/hadoop_jmx_collector/util_func.py
index bebc2ed..c02abbd 100644
--- a/eagle-external/hadoop_jmx_collector/util_func.py
+++ b/eagle-external/hadoop_jmx_collector/util_func.py
@@ -41,7 +41,7 @@ def kafka_close(kafka, producer):
 def send_output_message(producer, topic, kafka_dict, metric, value):
     kafka_dict["timestamp"] = int(round(time.time() * 1000))
     kafka_dict["metric"] = metric.lower()
-    kafka_dict["value"] = str(value)
+    kafka_dict["value"] = float(str(value))
     kafka_json = json.dumps(kafka_dict)
 
     if producer != None:

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-topology-assembly/src/assembly/eagle-topology-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/src/assembly/eagle-topology-assembly.xml 
b/eagle-topology-assembly/src/assembly/eagle-topology-assembly.xml
index e15e77d..e397efa 100644
--- a/eagle-topology-assembly/src/assembly/eagle-topology-assembly.xml
+++ b/eagle-topology-assembly/src/assembly/eagle-topology-assembly.xml
@@ -46,6 +46,8 @@
                 <exclude>log4j:log4j</exclude>
                 <exclude>asm:asm</exclude>
                 <exclude>org.apache.log4j.wso2:log4j</exclude>
+                <exclude>log4j:apache-log4j-extras</exclude>
+                <exclude>org.apache.zookeeper:zookeeper</exclude>
             </excludes>
         </dependencySet>
     </dependencySets>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9d13f74..65b852a 100755
--- a/pom.xml
+++ b/pom.xml
@@ -193,9 +193,10 @@
         <!-- Utility -->
         <joda-time.version>2.7</joda-time.version>
         <joda-convert.version>1.7</joda-convert.version>
+        <!--<log4j.version>1.2.17</log4j.version>-->
         <log4j.version>1.2.17</log4j.version>
-        <slf4j.version>1.7.5</slf4j.version>
-        <log4j-over-slf4j.version>1.7.2</log4j-over-slf4j.version>
+        <slf4j.version>1.6.5</slf4j.version>
+        <log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version>
         <quartz.version>2.2.1</quartz.version>
         <scopt.version>3.3.0</scopt.version>
         <akka.actor.version>2.3.14</akka.actor.version>
@@ -335,17 +336,22 @@
                 <version>${slf4j.version}</version>
                 <scope>compile</scope>
             </dependency>
+            <!--<dependency>-->
+                <!--<groupId>org.slf4j</groupId>-->
+                <!--<artifactId>slf4j-log4j12</artifactId>-->
+                <!--<version>${slf4j.version}</version>-->
+                <!--<scope>compile</scope>-->
+            <!--</dependency>-->
             <dependency>
                 <groupId>org.slf4j</groupId>
-                <artifactId>slf4j-log4j12</artifactId>
-                <version>${slf4j.version}</version>
-                <scope>compile</scope>
+                <artifactId>log4j-over-slf4j</artifactId>
+                <version>${log4j-over-slf4j.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.slf4j</groupId>
-                <artifactId>log4j-over-slf4j</artifactId>
+                <artifactId>slf4j-simple</artifactId>
                 <version>${slf4j.version}</version>
-                <scope>compile</scope>
+                <scope>test</scope>
             </dependency>
             <dependency>
                 <groupId>log4j</groupId>
@@ -910,8 +916,7 @@
                                 <exclude>**/dev-supports/**/*.json</exclude>
                                 
<exclude>**/dev-supports/**/useractivity-agg-json.txt</exclude>
                                 
<exclude>**/conf/sandbox-userprofile-topology.conf</exclude>
-                                <exclude>**/kafka-python/**</exclude>
-                                <exclude>**/six/**</exclude>
+                                <exclude>**/hadoop_jmx_collector/**</exclude>
                                 <!-- Fonts and Images -->
                                 <exclude>**/fonts/**</exclude>
                                 <exclude>**/images/**</exclude>

Reply via email to