METRON-1445: Update performance tuning guide with more explicit parameter instructions (mmiklavc via mmiklavc) closes apache/metron#988


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/e0949142
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/e0949142
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/e0949142

Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: e0949142dd682a84e59fea09066de8024f106f13
Parents: daf543b
Author: mmiklavc <michael.miklav...@gmail.com>
Authored: Tue Apr 17 12:31:37 2018 -0600
Committer: Michael Miklavcic <michael.miklav...@gmail.com>
Committed: Tue Apr 17 12:31:37 2018 -0600

----------------------------------------------------------------------
 metron-platform/Performance-tuning-guide.md     | 244 +++++++++++-
 metron-platform/metron-common/README.md         |  32 ++
 .../src/main/scripts/cluster_info.py            | 389 +++++++++++++++++++
 3 files changed, 659 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/e0949142/metron-platform/Performance-tuning-guide.md
----------------------------------------------------------------------
diff --git a/metron-platform/Performance-tuning-guide.md b/metron-platform/Performance-tuning-guide.md
index 7d79ace..e2d1ae2 100644
--- a/metron-platform/Performance-tuning-guide.md
+++ b/metron-platform/Performance-tuning-guide.md
@@ -17,6 +17,14 @@ limitations under the License.
 -->
 # Metron Performance Tuning Guide
 
+- [Overview](#overview)
+- [General Tuning Suggestions](#general-tuning-suggestions)
+- [Component Tuning Levers](#component-tuning-levers)
+- [Use Case Specific Tuning Suggestions](#use-case-specific-tuning-suggestions)
+- [Debugging](#debugging)
+- [Issues](#issues)
+- [Reference](#reference)
+
 ## Overview
 
This document provides guidance from our experiences tuning the Apache Metron Storm topologies for maximum performance. You'll find
@@ -31,20 +39,33 @@ pipe, and the majority of these options assist in tweaking the various pipe widt
 
 ## General Tuning Suggestions
 
+### Storm Executors vs. Tasks
+
Note that there is currently no method for specifying the number of tasks independently from the number of executors in Flux topologies (enrichment,
 indexing). By default, the number of tasks will equal the number of executors. Logically, setting the number of tasks equal to the number
of executors is sensible. Storm enforces num executors <= num tasks. The reason you might set the number of tasks higher than the number of
executors is to allow for future performance tuning and rebalancing without the need to bring down your topologies. The number of tasks is fixed
at topology startup time, whereas the number of executors can be increased up to a maximum value equal to the number of tasks.
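If you over-provision tasks up front, you can later scale a running topology in place with Storm's rebalance command. A minimal sketch (the topology name, wait time, and counts here are illustrative):

```
# Hypothetical: spread the topology across 4 workers and raise the spout to
# 8 executors (this must stay <= the spout's fixed task count), waiting 10
# seconds for in-flight tuples before rebalancing.
storm rebalance bro_parser -w 10 -n 4 -e kafkaSpout=8
```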
 
-When configuring Storm Kafka spouts, we found that the default values for poll.timeout.ms, offset.commit.period.ms, and max.uncommitted.offsets worked well in nearly all cases.
-As a general rule, it was optimal to set spout parallelism equal to the number of partitions used in your Kafka topic. Any greater
+### Kafka Spout Configuration
+
+When configuring Storm Kafka spouts, we found that the default values for
+
+- `poll.timeout.ms`
+- `offset.commit.period.ms`
+- `max.uncommitted.offsets`
+
+worked well in nearly all cases. As a general rule, it was optimal to set spout parallelism equal to the number of partitions used in your Kafka topic. Any greater
 parallelism will leave you with idle consumers since Kafka limits the max number of consumers to the number of partitions. This is
 important because Kafka has certain ordering guarantees for message delivery per partition that would not be possible if more than
 one consumer in a given consumer group were able to read from that partition.
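Before setting spout parallelism, you can check a topic's partition count with the standard Kafka tooling (assuming the `$KAFKA_HOME` and `$ZOOKEEPER` variables defined in the Debugging section below, with the `enrichments` topic as an example):

```
# The PartitionCount field is the ceiling for useful spout parallelism
${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --topic enrichments --describe
```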
 
 ## Component Tuning Levers
 
+### High Level Overview
+
+There are a number of levers that can be set while tuning a Metron cluster. The main services to interact with for performance tuning are Kafka, Storm, HDFS, and indexing (Elasticsearch or Solr). For each service, here is a high-level breakdown of the major knobs and levers that can be modified while tuning your cluster.
+
 - Kafka
    - Number of partitions
 - Storm
@@ -70,12 +91,15 @@ for more details.
 
 ### Storm Tuning
 
+#### Overview
+
There are quite a few options you will be confronted with when tuning your Storm topologies, and this is largely a matter of trial and error. As a general rule of thumb,
we recommend starting with the defaults and smaller numbers in terms of parallelism, while iteratively working up until the desired performance is achieved.
You will find the offset lag tool indispensable while verifying your settings.

We won't go into a full discussion about Storm's architecture - see the references section for more info - but there are some general rules of thumb that should be
followed. It's first important to understand the ways you can impact parallelism in a Storm topology.
+
 - num tasks
 - num executors (parallelism hint)
 - num workers
@@ -83,10 +107,10 @@ followed. It's first important to understand the ways you can impact parallelism
 Tasks are instances of a given spout or bolt, executors are threads in a process, and workers are jvm processes. You'll want the number of tasks as a multiple of the number of executors,
 the number of executors as a multiple of the number of workers, and the number of workers as a multiple of the number of machines. The main reason for this approach is
 that it will give a uniform distribution of work to each machine and jvm process. More often than not, your number of tasks will be equal to the number of executors, which
- is the default in Storm. Flux does not actually provide a way to independently set number of tasks, so for enrichments and indexing which use Flux, num tasks will always equal
+ is the default in Storm. Flux does not actually provide a way to independently set the number of tasks, so for enrichments and indexing, which use Flux, num tasks will always equal
 num executors.
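As a hypothetical sizing sketch for a 4-machine cluster, keeping each level a multiple of the one below:

```
# 4 machines   -> 8 workers    (topology.workers: 8, i.e. 2 workers per machine)
# 8 workers    -> 48 executors (6 executors per worker)
# 48 executors -> 48 tasks     (Storm default: num tasks == num executors)
```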
 
-You can change the number of workers via the property `topology.workers`
+You can change the number of workers via the Storm property `topology.workers`.
 
 __Other Storm Settings__
 
@@ -96,12 +120,15 @@ topology.max.spout.pending
This is the maximum number of tuples that can be in flight (i.e., not yet acked) at any given time within your topology. You set this as a form of backpressure to ensure you don't flood your topology.
 
+
 ```
topology.acker.executors
 ```
+
+This specifies how many threads should be dedicated to tuple acking. We found that setting this equal to the number of partitions in your inbound Kafka topic worked well.
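For parser topologies, both of these settings can be supplied in the JSON file passed via `-e,--extra_topology_options` (see the parser table below). A sketch, using values similar to the tuning exercise later in this guide:

```
{
    "topology.max.spout.pending" : 2000,
    "topology.acker.executors" : 24
}
```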
 
 __spout-config.json__
+
 ```
 {
     ...
@@ -111,15 +138,146 @@ __spout-config.json__
 }
 ```
 
-These are the spout recommended defaults from Storm and are currently the defaults provided in the Kafka spout itself. In fact, if you find the recommended defaults work fine for you,
+Above is a snippet for configuring parsers. These are the recommended spout defaults from Storm and are currently the defaults provided in the Kafka spout itself. In fact, if you find the recommended defaults work fine for you,
 then you can omit these settings altogether.
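Spelled out explicitly rather than elided, such a file would look like the following sketch (the values shown are the spout defaults referenced in the parser table below):

```
{
    "spout.pollTimeoutMs" : 200,
    "spout.offsetCommitPeriodMs" : 30000,
    "spout.maxUncommittedOffsets" : 10000000
}
```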
 
+#### Where to Find Tuning Properties
+
+**Important:** The parser topologies are deployed via a builder pattern that takes parameters from the CLI as set via Ambari. The enrichment and indexing topologies are configured using a Storm Flux file, a configuration properties file, and Ambari. Here is a setting materialization summary for each of the topology types:
+
+- Parsers
+       - Management UI -> parser json config and CLI -> Storm
+- Enrichment
+       - Ambari UI -> properties file -> Flux -> Storm
+- Indexing
+       - Ambari UI -> properties file -> Flux -> Storm
+
+**Parsers**
+
+This is a mapping of the various performance tuning properties for parsers and how they are materialized.
+
+See more detail on starting parsers [here](https://github.com/apache/metron/blob/master/metron-platform/metron-parsers/README.md#starting-the-parser-topology).
+
+| Category              | Management UI Property Name | JSON Config File Property Name | CLI Option                                                                                     | Storm Property Name        | Notes                                                                        |
+|-----------------------|-----------------------------|--------------------------------|------------------------------------------------------------------------------------------------|----------------------------|------------------------------------------------------------------------------|
+| Storm topology config | Num Workers                 | n/a                            | -nw,--num_workers <NUM_WORKERS>                                                                | topology.workers           |                                                                              |
+|                       | Num Ackers                  | n/a                            | -na,--num_ackers <NUM_ACKERS>                                                                  | topology.acker.executors   |                                                                              |
+|                       | Storm Config                | topology.max.spout.pending     | -e,--extra_topology_options <JSON_FILE>, e.g. { "topology.max.spout.pending" : NUM }           | topology.max.spout.pending | Put property in JSON format in a file named `storm-<MY_PARSER>-config.json` |
+| Kafka spout           | Spout Parallelism           | n/a                            | -sp,--spout_p <SPOUT_PARALLELISM_HINT>                                                         | n/a                        |                                                                              |
+|                       | Spout Num Tasks             | n/a                            | -snt,--spout_num_tasks <NUM_TASKS>                                                             | n/a                        |                                                                              |
+|                       | Spout Config                | spout.pollTimeoutMs            | -esc,--extra_kafka_spout_config <JSON_FILE>, e.g. { "spout.pollTimeoutMs" : 200 }              | n/a                        | Put property in JSON format in a file named `spout-<MY_PARSER>-config.json` |
+|                       | Spout Config                | spout.maxUncommittedOffsets    | -esc,--extra_kafka_spout_config <JSON_FILE>, e.g. { "spout.maxUncommittedOffsets" : 10000000 } | n/a                        | Put property in JSON format in a file named `spout-<MY_PARSER>-config.json` |
+|                       | Spout Config                | spout.offsetCommitPeriodMs     | -esc,--extra_kafka_spout_config <JSON_FILE>, e.g. { "spout.offsetCommitPeriodMs" : 30000 }     | n/a                        | Put property in JSON format in a file named `spout-<MY_PARSER>-config.json` |
+| Parser bolt           | Parser Num Tasks            | n/a                            | -pnt,--parser_num_tasks <NUM_TASKS>                                                            | n/a                        |                                                                              |
+|                       | Parser Parallelism          | n/a                            | -pp,--parser_p <PARALLELISM_HINT>                                                              | n/a                        |                                                                              |
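Putting the CLI options together, a parser start command exercising these knobs might look like the following sketch (the `bro` sensor, parallelism values, and file paths are illustrative):

```
$METRON_HOME/bin/start_parser_topology.sh \
    -k $BROKERLIST -z $ZOOKEEPER -s bro \
    -nw 1 -na 24 -sp 24 -snt 24 -pp 24 -pnt 24 \
    -e ~/storm-bro-config.json \
    -esc ~/spout-bro-config.json
```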
+
+**Enrichment**
+
+This is a mapping of the various performance tuning properties for enrichments and how they are materialized.
+
+Flux file can be found here - `$METRON_HOME/flux/enrichment/remote.yaml`
+
+_Note 1:_ Changes to Flux file properties that are managed by Ambari will render Ambari unable to further manage the property.
+
+_Note 2:_ Many of these settings will be irrelevant in the alternate non-split-join topology.
+
+| Category              | Ambari Property Name                  | enrichment.properties property | Flux Property              | Flux Section Location              | Storm Property Name        | Notes                          |
+|-----------------------|---------------------------------------|--------------------------------|----------------------------|------------------------------------|----------------------------|--------------------------------|
+| Storm topology config | enrichment_workers                    | enrichment.workers             | topology.workers           | line 18, config                    | topology.workers           |                                |
+|                       | enrichment_acker_executors            | enrichment.acker.executors     | topology.acker.executors   | line 18, config                    | topology.acker.executors   |                                |
+|                       | enrichment_topology_max_spout_pending | topology.max.spout.pending     | topology.max.spout.pending | line 18, config                    | topology.max.spout.pending |                                |
+| Kafka spout           | enrichment_kafka_spout_parallelism    | kafka.spout.parallelism        | parallelism                | line 245, id: kafkaSpout           | n/a                        |                                |
+|                       | n/a                                   | session.timeout.ms             | session.timeout.ms         | line 201, id: kafkaProps           | n/a                        | Kafka consumer client property |
+|                       | n/a                                   | enable.auto.commit             | enable.auto.commit         | line 201, id: kafkaProps           | n/a                        | Kafka consumer client property |
+|                       | n/a                                   | n/a                            | setPollTimeoutMs           | line 230, id: kafkaConfig          | n/a                        | Kafka consumer client property |
+|                       | n/a                                   | n/a                            | setMaxUncommittedOffsets   | line 230, id: kafkaConfig          | n/a                        | Kafka consumer client property |
+|                       | n/a                                   | n/a                            | setOffsetCommitPeriodMs    | line 230, id: kafkaConfig          | n/a                        | Kafka consumer client property |
+| Enrichment splitter   | enrichment_split_parallelism          | enrichment.split.parallelism   | parallelism                | line 253, id: enrichmentSplitBolt  | n/a                        |                                |
+| Enrichment joiner     | enrichment_join_parallelism           | enrichment.join.parallelism    | parallelism                | line 316, id: enrichmentJoinBolt   | n/a                        |                                |
+| Threat intel splitter | threat_intel_split_parallelism        | threat.intel.split.parallelism | parallelism                | line 338, id: threatIntelSplitBolt | n/a                        |                                |
+| Threat intel joiner   | threat_intel_join_parallelism         | threat.intel.join.parallelism  | parallelism                | line 376, id: threatIntelJoinBolt  | n/a                        |                                |
+| Output bolt           | kafka_writer_parallelism              | kafka.writer.parallelism       | parallelism                | line 397, id: outputBolt           | n/a                        |                                |
+
+When adding Kafka spout properties, there are three ways to do this.
+
+1. Ambari: If they are properties managed by Ambari (noted in the table under 'Ambari Property Name'), look for the setting in Ambari.
+
+1. Flux -> kafkaProps: add a new key/value to the kafkaProps section HashMap on line 201. For example, if you want to set the Kafka Spout consumer's session.timeout.ms to 30 seconds, you would add the following:
+
+    ```
+           -   name: "put"
+               args:
+                   - "session.timeout.ms"
+                   - 30000
+    ```
+
+1. Flux -> kafkaConfig: add a new setter to the kafkaConfig section on line 230. For example, if you want to set the Kafka Spout consumer's poll timeout to 200 milliseconds, you would add the following under `configMethods`:
+
+    ```
+             -   name: "setPollTimeoutMs"
+                 args:
+                     - 200
+    ```
+
+**Indexing (Batch)**
+
+This is a mapping of the various performance tuning properties for indexing and how they are materialized.
+
+Flux file can be found here - `$METRON_HOME/flux/indexing/batch/remote.yaml`
+
+Note: Changes to Flux file properties that are managed by Ambari will render Ambari unable to further manage the property.
+
+| Category              | Ambari Property Name                   | hdfs.properties property        | Flux Property                    | Flux Section Location           | Storm Property Name        | Notes                                   |
+|-----------------------|----------------------------------------|---------------------------------|----------------------------------|---------------------------------|----------------------------|-----------------------------------------|
+| Storm topology config | enrichment_workers                     | enrichment.workers              | topology.workers                 | line 19, config                 | topology.workers           |                                         |
+|                       | enrichment_acker_executors             | enrichment.acker.executors      | topology.acker.executors         | line 19, config                 | topology.acker.executors   |                                         |
+|                       | enrichment_topology_max_spout_pending  | topology.max.spout.pending      | topology.max.spout.pending       | line 19, config                 | topology.max.spout.pending |                                         |
+| Kafka spout           | batch_indexing_kafka_spout_parallelism | kafka.spout.parallelism         | parallelism                      | line 123, id: kafkaSpout        | n/a                        |                                         |
+|                       | n/a                                    | session.timeout.ms              | session.timeout.ms               | line 80, id: kafkaProps         | n/a                        | Kafka consumer client property          |
+|                       | n/a                                    | enable.auto.commit              | enable.auto.commit               | line 80, id: kafkaProps         | n/a                        | Kafka consumer client property          |
+|                       | n/a                                    | n/a                             | setPollTimeoutMs                 | line 108, id: kafkaConfig       | n/a                        | Kafka consumer client property          |
+|                       | n/a                                    | n/a                             | setMaxUncommittedOffsets         | line 108, id: kafkaConfig       | n/a                        | Kafka consumer client property          |
+|                       | n/a                                    | n/a                             | setOffsetCommitPeriodMs          | line 108, id: kafkaConfig       | n/a                        | Kafka consumer client property          |
+| Output bolt           | hdfs_writer_parallelism                | hdfs.writer.parallelism         | parallelism                      | line 133, id: hdfsIndexingBolt  | n/a                        |                                         |
+|                       | n/a                                    | n/a                             | hdfsSyncPolicy (see notes below) | line 47, id: hdfsWriter         | n/a                        | See notes below about adding this prop  |
+|                       | bolt_hdfs_rotation_policy_units        | bolt.hdfs.rotation.policy.units | constructorArgs                  | line 41, id: hdfsRotationPolicy | n/a                        |                                         |
+|                       | bolt_hdfs_rotation_policy_count        | bolt.hdfs.rotation.policy.count | constructorArgs                  | line 41, id: hdfsRotationPolicy | n/a                        |                                         |
+
+_Note_: HDFS sync policy is not currently managed via Ambari. You will need to modify the Flux file directly to accommodate this setting, e.g.:
+
+Add a new setter to the hdfsWriter around line 56. Lines 53-55 are provided for context.
+
+```
+ 53             -   name: "withRotationPolicy"
+ 54                 args:
+ 55                     - ref: "hdfsRotationPolicy"
+ 56             -   name: "withSyncPolicy"
+ 57                 args:
+ 58                     - ref: "hdfsSyncPolicy"
+```
+
+Add an hdfsSyncPolicy after the hdfsRotationPolicy that appears on line 41, e.g.:
+
+```
+ 41     -   id: "hdfsRotationPolicy"
+...
+ 45           - "${bolt.hdfs.rotation.policy.units}"
+ 46
+ 47     -   id: "hdfsSyncPolicy"
+ 48         className: "org.apache.storm.hdfs.bolt.sync.CountSyncPolicy"
+ 49         constructorArgs:
+ 50           -  100000
+```
+
 ## Use Case Specific Tuning Suggestions
 
The below discussion outlines a specific tuning exercise we went through for driving 1 Gbps of traffic through a Metron cluster running with 4 Kafka brokers and 4 Storm Supervisors.
 
 General machine specs
+
 - 10 Gb network cards
 - 256 GB memory
 - 12 disks
@@ -174,6 +332,7 @@ ${KAFKA_HOME}/bin/kafka-consumer-groups.sh \
 
This will return a table with the following output depicting offsets for all partitions and consumers associated with the specified consumer group:
+
 ```
GROUP                          TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
enrichments                    enrichments        9          29746066        29746067        1               consumer-2_/xxx.xxx.xxx.xxx
@@ -212,6 +371,7 @@ We started with a single partition for the inbound Kafka topics and eventually w
 The default is 'null', which would result in no limit.
 
 __storm-bro.config__
+
 ```
 {
     ...
@@ -223,6 +383,7 @@ __storm-bro.config__
And the following default spout settings. Again, this can be omitted entirely since we are using the defaults.
 
 __spout-bro.config__
+
 ```
 {
     ...
@@ -252,6 +413,7 @@ though you could certainly do so if necessary. Notice that we only needed 1 work
 ```

 From the usage docs, here are the options we've used. The full reference can be found [here](../metron-platform/metron-parsers/README.md#Starting_the_Parser_Topology).
+
 ```
 usage: start_parser_topology.sh
 -e,--extra_topology_options <JSON_FILE>               Extra options in the form
@@ -290,6 +452,7 @@ Note that the main Metron-specific option we've changed to accommodate the desire
 More information on Flux can be found here - http://storm.apache.org/releases/1.0.1/flux.html
 
 __General storm settings__
+
 ```
 topology.workers: 8
 topology.acker.executors: 48
@@ -297,6 +460,7 @@ topology.max.spout.pending: 2000
 ```
 
 __Spout and Bolt Settings__
+
 ```
 kafkaSpout
     parallelism=48
@@ -341,6 +505,7 @@ cat ${METRON_HOME}/config/zookeeper/indexing/bro.json
 And here are the settings we used for the indexing topology
 
 __General storm settings__
+
 ```
 topology.workers: 4
 topology.acker.executors: 24
@@ -348,6 +513,7 @@ topology.max.spout.pending: 2000
 ```
 
 __Spout and Bolt Settings__
+
 ```
 hdfsSyncPolicy
     org.apache.storm.hdfs.bolt.sync.CountSyncPolicy
@@ -372,12 +538,14 @@ PCAP is a specialized topology that is a Spout-only topology. Both Kafka topic c
 avoid the additional network hop required if using an additional bolt.
 
 __General Storm topology properties__
+
 ```
 topology.workers=16
topology.acker.executors: 0
 ```
 
 __Spout and Bolt properties__
+
 ```
 kafkaSpout
     parallelism: 128
@@ -403,6 +571,69 @@ writerConfig
         dfs.blocksize=1073741824
 ```
 
+## Debugging
+
+Set the following env vars accordingly for your cluster. This is how we would configure it for the Metron full dev development environment.
+
+```
+export HDP_HOME=/usr/hdp/current
+export KAFKA_HOME=$HDP_HOME/kafka-broker
+export STORM_UI=http://node1:8744
+export ELASTIC=http://node1:9200
+export ZOOKEEPER=node1:2181
+export METRON_VERSION=0.4.3
+export METRON_HOME=/usr/metron/${METRON_VERSION}
+```
+
+Note that the output from Storm will be a flattened blob of JSON. In order to pretty-print it for readability, you can pipe it through a JSON formatter, e.g.
+
+```
+[some Storm curl command] | python -m json.tool
+```
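For example, to pretty-print the cluster summary from the first command below:

```
curl -XGET ${STORM_UI}'/api/v1/cluster/summary' | python -m json.tool
```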
+
+**Getting Storm Configuration Details**
+
+Storm has a useful REST API you can use to get full details about your running topologies. This is generally more convenient and complete for troubleshooting performance problems than going to the Storm UI alone. See Storm's [REST API docs](http://storm.apache.org/releases/1.1.0/STORM-UI-REST-API.html) for more details.
+
+```
+# get Storm cluster summary info including version
+curl -XGET ${STORM_UI}'/api/v1/cluster/summary'
+```
+
+```
+# get overall Storm cluster configuration
+curl -XGET ${STORM_UI}'/api/v1/cluster/configuration'
+```
+
+```
+# get list of topologies and brief summary detail
+curl -XGET ${STORM_UI}'/api/v1/topology/summary'
+```
+
+```
+# get all topology runtime settings. Plug in the ID for your topology, which you can get from the topology summary command or from the Storm UI. Passing sys=1 will also return system stats.
+curl -XGET ${STORM_UI}'/api/v1/topology/:id?sys=1'
+```
+
+**Getting Kafka Configuration Details**
+
+```
+# Get list of Kafka topics
+${HDP_HOME}/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --list
+```
+
+```
+# Get Kafka topic details - plug in the desired topic name in place of "enrichments"
+${HDP_HOME}/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --topic enrichments --describe
+```
+
+**Getting Metron Topology Zookeeper Configuration**
+
+```
+# Provides a full listing of all Metron parser, enrichment, and indexing topology configuration
+$METRON_HOME/bin/zk_load_configs.sh -m DUMP -z $ZOOKEEPER
+```
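If you only need a subset, `zk_load_configs.sh` can also restrict the dump by config type and sensor name. A sketch (assuming the `bro` sensor; flag support may vary by Metron version):

```
# Dump only the bro parser config
$METRON_HOME/bin/zk_load_configs.sh -m DUMP -z $ZOOKEEPER -c PARSER -n bro
```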
+
 ## Issues
 
 __Error__
@@ -423,11 +654,12 @@ modifying the options outlined above, increasing the poll timeout, or both.
 ## Reference
 
 * [Enrichment Performance](metron-enrichment/Performance.md)
-* http://storm.apache.org/releases/1.0.1/flux.html
+* http://storm.apache.org/releases/1.1.0/flux.html
 * https://stackoverflow.com/questions/17257448/what-is-the-task-in-storm-parallelism
 * http://storm.apache.org/releases/current/Understanding-the-parallelism-of-a-Storm-topology.html
 * http://www.malinga.me/reading-and-understanding-the-storm-ui-storm-ui-explained/
 * https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/
 * https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_storm-component-guide/content/storm-kafkaspout-perf.html
+* http://storm.apache.org/releases/1.1.0/STORM-UI-REST-API.html
 
 

http://git-wip-us.apache.org/repos/asf/metron/blob/e0949142/metron-platform/metron-common/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/README.md b/metron-platform/metron-common/README.md
index ab90a66..b25fbc8 100644
--- a/metron-platform/metron-common/README.md
+++ b/metron-platform/metron-common/README.md
@@ -23,6 +23,7 @@ limitations under the License.
 * [Management Utility](#management-utility)
 * [Topology Errors](#topology-errors)
 * [Performance Logging](#performance-logging)
+* [Metron Debugging](#metron-debugging)
 
 # Stellar Language
 
@@ -400,3 +401,34 @@ __Side Effects__
 Calling the mark() method multiple times simply resets the start time to the current nano time. Calling log() with a non-existent mark name will log 0 ns elapsed time with a warning indicating that log has been invoked for a mark name that does not exist.
 The class is not thread-safe and makes no attempt at keeping multiple threads from modifying the same markers.
 
+# Metron Debugging
+
+A Python script is provided for gathering information useful in debugging your Metron cluster. Run it from a node that has Metron installed. All options listed below are required.
+
+_Note:_ Be aware that no anonymization/scrubbing is performed on the captured configuration details.
+
+```
+# $METRON_HOME/bin/cluster_info.py -h
+Usage: cluster_info.py [options]
+
+Options:
+  -h, --help            show this help message and exit
+  -a HOST:PORT, --ambari-host=HOST:PORT
+                        Connect to Ambari via the supplied host:port
+  -c NAME, --cluster-name=NAME
+                        Name of cluster in Ambari to retrieve info for
+  -o DIRECTORY, --out-dir=DIRECTORY
+                        Write debugging data to specified root directory
+  -s HOST:PORT, --storm-host=HOST:PORT
+                        Connect to Storm via the supplied host:port
+  -b HOST1:PORT,HOST2:PORT, --broker_list=HOST1:PORT,HOST2:PORT
+                        Connect to Kafka via the supplied comma-delimited
+                        host:port list
+  -z HOST1:PORT,HOST2:PORT, --zookeeper_quorum=HOST1:PORT,HOST2:PORT
+                        Connect to Zookeeper via the supplied comma-delimited
+                        host:port quorum list
+  -m DIRECTORY, --metron_home=DIRECTORY
+                        Metron home directory
+  -p DIRECTORY, --hdp_home=DIRECTORY
+                        HDP home directory
+```
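A typical invocation, using values matching the full dev defaults baked into the script (adjust hosts and paths for your own cluster):

```
$METRON_HOME/bin/cluster_info.py \
    -a node1:8080 \
    -c metron_cluster \
    -o ~/metron-debug \
    -s node1:8744 \
    -b node1:6667 \
    -z node1:2181 \
    -m $METRON_HOME \
    -p /usr/hdp/current
```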

http://git-wip-us.apache.org/repos/asf/metron/blob/e0949142/metron-platform/metron-common/src/main/scripts/cluster_info.py
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/scripts/cluster_info.py b/metron-platform/metron-common/src/main/scripts/cluster_info.py
new file mode 100755
index 0000000..6e853c0
--- /dev/null
+++ b/metron-platform/metron-common/src/main/scripts/cluster_info.py
@@ -0,0 +1,389 @@
+#!/usr/bin/python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from optparse import OptionParser
+from requests.auth import HTTPBasicAuth
+from contextlib import closing
+import datetime
+import errno
+import getpass
+import json
+import os
+import os.path
+import requests
+import shutil
+import subprocess
+import sys
+import tarfile
+import zlib
+
+INDENT_SIZE = 2
+
+class UserPrompt(object):
+    
+    def __init__(self, prompt):
+        self.prompt = prompt
+
+    def get_hidden(self):
+        return getpass.getpass(self.prompt)
+
+class FileWriter(object):
+
+    def write(self, path, content):
+        print "Writing config to " + path
+        if not os.path.exists(os.path.dirname(path)):
+            try:
+                os.makedirs(os.path.dirname(path))
+            except OSError as exc: # Guard against race condition
+                if exc.errno != errno.EEXIST:
+                    raise
+        with open(path, 'w') as outfile:
+            outfile.write(content)
+        print "...done"
+
+class ShellHandler(object):
+
+    def __init__(self):
+        pass
+
+    # returns the process exit code; stdout is not captured (see ret_output for that)
+    def call(self, command):
+        try:
+            return subprocess.call(command)
+        except OSError as e:
+            print >> sys.stderr, "Execution failed:", e
+    
+    # partly hijacked from Python 2.7+ check_output for use in 2.6
+    def ret_output(self, cmd):
+        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
+        output, unused_err = process.communicate()
+        retcode = process.poll()
+        if retcode:
+            raise subprocess.CalledProcessError(retcode, cmd, output=output)
+        return output
+
+class InfoGatherer(object):
+
+    def __init__(self, name):
+        self.name = name
+
+class AmbariInfo(InfoGatherer):
+
+    def __init__(self, host_info, cluster_name):
+        super(AmbariInfo, self).__init__('Ambari')
+        self.cluster_name = cluster_name
+        self.ambari_config_url = 'http://{0}/api/v1/clusters/{1}/configurations/service_config_versions'.format(host_info, cluster_name)
+        self.params_payload = { 'is_current' : 'true' }
+
+    def collect(self, out_dir):
+        print "Ambari request URL: " + self.ambari_config_url
+        ambari_user = UserPrompt('Ambari username: ').get_hidden()
+        ambari_pass = UserPrompt('Ambari password: ').get_hidden()
+        self.get_cluster_config(out_dir, ambari_user, ambari_pass)
+
+    def get_cluster_config(self, out_dir, ambari_user, ambari_pass):
+        # set encoding to 'identity' to keep Ambari from passing back gzipped content for large requests
+        headers = {
+                    'X-Requested-By' : 'ambari',
+                    'Authorization' : 'Basic',
+                    'Accept-Encoding': 'identity'
+                  }
+        # Retrieving Ambari config detail
+        response = requests.get(self.ambari_config_url, headers=headers, params=self.params_payload, stream=True, auth=HTTPBasicAuth(ambari_user, ambari_pass))
+        if response.status_code == 200:
+            file_name = 'ambari-cluster-config.json'
+            full_out_path = os.path.join(out_dir, self.name.lower(), file_name)
+            FileWriter().write(full_out_path, response.text)
+        else:
+            print "Request failed with status code: " + str(response.status_code)
+
+class StormInfo(InfoGatherer):
+
+    def __init__(self, host_info):
+        super(StormInfo, self).__init__('Storm')
+        url_base = 'http://{0}/api/v1'.format(host_info)
+        self.url_cluster_summary = url_base + '/cluster/summary'
+        self.url_cluster_configuration = url_base + '/cluster/configuration'
+        self.url_topology_summary = url_base + '/topology/summary'
+        self.url_topology_stats_summary = url_base + '/topology/{0}?sys=1'
+
+    def collect(self, out_dir):
+        self.get_cluster_summary(out_dir)
+        self.get_cluster_configuration(out_dir)
+        self.get_topology_summary(out_dir)
+        self.get_topology_stats_summary(out_dir)
+
+    def get_cluster_summary(self, out_dir):
+        response = requests.get(self.url_cluster_summary)
+        if response.status_code == 200:
+            file_name = 'cluster-summary.json'
+            full_out_path = os.path.join(out_dir, self.name.lower(), file_name)
+            FileWriter().write(full_out_path, json.dumps(response.json(), indent=INDENT_SIZE))
+        else:
+            print "Request failed with status code: " + str(response.status_code)
+
+    def get_cluster_configuration(self, out_dir):
+        response = requests.get(self.url_cluster_configuration)
+        if response.status_code == 200:
+            file_name = 'cluster-configuration.json'
+            full_out_path = os.path.join(out_dir, self.name.lower(), file_name)
+            FileWriter().write(full_out_path, json.dumps(response.json(), indent=INDENT_SIZE))
+        else:
+            print "Request failed with status code: " + str(response.status_code)
+
+    def get_topology_summary(self, out_dir):
+        response = requests.get(self.url_topology_summary)
+        if response.status_code == 200:
+            file_name = 'topology-summary.json'
+            full_out_path = os.path.join(out_dir, self.name.lower(), file_name)
+            FileWriter().write(full_out_path, json.dumps(response.json(), indent=INDENT_SIZE))
+        else:
+            print "Request failed with status code: " + str(response.status_code)
+
+    def get_topology_stats_summary(self, out_dir):
+        summary_response = requests.get(self.url_topology_summary)
+        if summary_response.status_code == 200:
+            for feature, value in summary_response.json().iteritems():
+                if feature == 'topologies':
+                    for topology in value:
+                        for k, v in topology.iteritems():
+                            if k == 'id':
+                                print "Retrieving Storm topology stats summary for topology-id " + v
+                                response = requests.get(self.url_topology_stats_summary.format(v))
+                                if response.status_code == 200:
+                                    file_name = 'topology-{0}-stats-summary.json'.format(v)
+                                    full_out_path = os.path.join(out_dir, self.name.lower(), 'stats-summaries', file_name)
+                                    FileWriter().write(full_out_path, json.dumps(response.json(), indent=INDENT_SIZE))
+                                else:
+                                    print "Request failed with status code: " + str(response.status_code)
+        else:
+            print "Topology listing request failed with status code: " + str(summary_response.status_code)
+
+class KafkaInfo(InfoGatherer):
+
+    def __init__(self, broker_list, zookeeper_quorum, hdp_home):
+        super(KafkaInfo, self).__init__('Kafka')
+        self.broker_list = broker_list
+        self.zookeeper_quorum = zookeeper_quorum
+        self.hdp_home = hdp_home
+        # note, need to escape the last single quote with the trim command so the string literal works
+        self.cmd_broker_id = '''{0}/kafka-broker/bin/zookeeper-shell.sh {1} <<< "ls /brokers/ids" | grep -e '\[.*\]' | tr -d [] | tr , ' \''''.format(self.hdp_home, self.zookeeper_quorum)
+        # broker id is dynamic and replaced later
+        self.cmd_broker_info = '''echo "get /brokers/ids/{0}" | {1}/kafka-broker/bin/zookeeper-shell.sh {2} 2>&1'''.format('{0}', self.hdp_home, self.zookeeper_quorum)
+        self.cmd_kafka_topics = '''{0}/kafka-broker/bin/kafka-topics.sh --zookeeper {1} --list'''.format(self.hdp_home, self.zookeeper_quorum)
+        self.cmd_topic_detail = '''{0}/kafka-broker/bin/kafka-topics.sh --zookeeper {1} --topic {2} --describe'''.format(self.hdp_home, self.zookeeper_quorum, '{0}')
+
+    def collect(self, out_dir):
+        print "Retrieving Kafka detail"
+        self.get_broker_info(out_dir)
+        self.get_kafka_topics(out_dir)
+        self.get_topic_detail(out_dir)
+
+    def get_broker_info(self, out_dir):
+        print "Retrieving Kafka broker info"
+        broker_ids = ShellHandler().ret_output(self.cmd_broker_id)
+        for broker in broker_ids.strip().split(','):
+            file_name = 'kafka-broker-{0}-info.txt'.format(broker)
+            full_out_path = os.path.join(out_dir, self.name.lower(), 'broker-info', file_name)
+            broker_data = ShellHandler().ret_output(self.cmd_broker_info.format(broker))
+            FileWriter().write(full_out_path, broker_data)
+
+    def get_kafka_topics(self, out_dir):
+        file_name = 'kafka-topics.txt'
+        full_out_path = os.path.join(out_dir, self.name.lower(), file_name)
+        topic_list = ShellHandler().ret_output(self.cmd_kafka_topics)
+        FileWriter().write(full_out_path, topic_list)
+
+    def get_topic_detail(self, out_dir):
+        file_name = 'kafka-enrichments-topic.txt'
+        full_out_path = os.path.join(out_dir, self.name.lower(), file_name)
+        enrichment_topic_detail = ShellHandler().ret_output(self.cmd_topic_detail.format('enrichments'))
+        FileWriter().write(full_out_path, enrichment_topic_detail)
+
+        file_name = 'kafka-indexing-topic.txt'
+        full_out_path = os.path.join(out_dir, self.name.lower(), file_name)
+        indexing_topic_detail = ShellHandler().ret_output(self.cmd_topic_detail.format('indexing'))
+        FileWriter().write(full_out_path, indexing_topic_detail)
+
+class MetronInfo(InfoGatherer):
+
+    def __init__(self, metron_home, zookeeper_quorum):
+        super(MetronInfo, self).__init__('Metron')
+        self.metron_home = metron_home
+        self.zookeeper_quorum = zookeeper_quorum
+        self.cmd_zk_load_configs = '''{0}/bin/zk_load_configs.sh -m DUMP -z {1}'''.format(self.metron_home, self.zookeeper_quorum)
+        self.cmd_metron_lib_list = '''ls -al {0}/lib'''.format(self.metron_home)
+
+    def collect(self, out_dir):
+        self.get_metron_config(out_dir)
+        self.get_metron_flux(out_dir)
+        self.get_metron_zk_config(out_dir)
+        self.get_lib_listing(out_dir)
+        self.get_rpm_listing(out_dir)
+    
+    def get_metron_config(self, out_dir):
+        print 'Copying ' + self.metron_home + '/config'
+        full_out_path = os.path.join(out_dir, self.name.lower(), 'config')
+        shutil.copytree(self.metron_home + '/config', full_out_path)
+
+    def get_metron_flux(self, out_dir):
+        print 'Copying ' + self.metron_home + '/flux'
+        full_out_path = os.path.join(out_dir, self.name.lower(), 'flux')
+        shutil.copytree(self.metron_home + '/flux', full_out_path)
+
+    def get_metron_zk_config(self, out_dir):
+        zk_config_dump = ShellHandler().ret_output(self.cmd_zk_load_configs)
+        full_out_path = os.path.join(out_dir, self.name.lower(), 'zk-configs.txt')
+        FileWriter().write(full_out_path, zk_config_dump)
+
+    def get_lib_listing(self, out_dir):
+        metron_lib_list = ShellHandler().ret_output(self.cmd_metron_lib_list)
+        full_out_path = os.path.join(out_dir, self.name.lower(), 'metron-libs-dir.txt')
+        FileWriter().write(full_out_path, metron_lib_list)
+
+    def get_rpm_listing(self, out_dir):
+        metron_rpm_list = ShellHandler().ret_output('''rpm -qa | grep 'metron\|elasticsearch\|kibana\'''')
+        full_out_path = os.path.join(out_dir, self.name.lower(), 'metron-rpm-list.txt')
+        FileWriter().write(full_out_path, metron_rpm_list)
+
+class HdpInfo(InfoGatherer):
+
+    def __init__(self, hdp_home):
+        super(HdpInfo, self).__init__('HDP')
+        self.hdp_home = hdp_home
+
+    def collect(self, out_dir):
+        hadoop_version_info = ShellHandler().ret_output('hadoop version')
+        full_out_path = os.path.join(out_dir, self.name.lower(), 'version-info.txt')
+        FileWriter().write(full_out_path, hadoop_version_info)
+
+class ClusterInfo:
+
+    def __init__(self):
+        pass
+
+    def main(self):
+        (options, args) = self.get_cli_args()
+        self.collect_data(options.out_dir,
+                          options.ambari_host,
+                          options.cluster_name,
+                          options.storm_host,
+                          options.broker_list,
+                          options.zookeeper_quorum,
+                          options.metron_home,
+                          options.hdp_home)
+
+    def get_cli_args(self):
+        parser = OptionParser()
+        parser.add_option("-a", "--ambari-host", 
+                          action="store",
+                          type="string",
+                          dest="ambari_host",
+                          help="Connect to Ambari via the supplied host:port",
+                          default="node1:8080",
+                          metavar="HOST:PORT")
+        parser.add_option("-c", "--cluster-name", 
+                          action="store",
+                          type="string",
+                          dest="cluster_name",
+                          help="Name of cluster in Ambari to retrieve info 
for",
+                          default="metron_cluster",
+                          metavar="NAME")
+        parser.add_option("-o", "--out-dir", 
+                          action="store",
+                          type="string",
+                          dest="out_dir",
+                          help="Write debugging data to specified root 
directory",
+                          default=".",
+                          metavar="DIRECTORY")
+        parser.add_option("-s", "--storm-host", 
+                          action="store",
+                          type="string",
+                          dest="storm_host",
+                          help="Connect to Storm via the supplied host:port",
+                          default="node1:8744",
+                          metavar="HOST:PORT")
+        parser.add_option("-b", "--broker_list", 
+                          action="store",
+                          type="string",
+                          dest="broker_list",
+                          help="Connect to Kafka via the supplied 
comma-delimited host:port list",
+                          default="node1:6667",
+                          metavar="HOST1:PORT,HOST2:PORT")
+        parser.add_option("-z", "--zookeeper_quorum", 
+                          action="store",
+                          type="string",
+                          dest="zookeeper_quorum",
+                          help="Connect to Zookeeper via the supplied 
comma-delimited host:port quorum list",
+                          default="node1:2181",
+                          metavar="HOST1:PORT,HOST2:PORT")
+        parser.add_option("-m", "--metron_home", 
+                          action="store",
+                          type="string",
+                          dest="metron_home",
+                          help="Metron home directory",
+                          default="/usr/metron/0.4.3",
+                          metavar="DIRECTORY")
+        parser.add_option("-p", "--hdp_home", 
+                          action="store",
+                          type="string",
+                          dest="hdp_home",
+                          help="HDP home directory",
+                          default="/usr/hdp/current",
+                          metavar="DIRECTORY")
+
+        return parser.parse_args()
+    
+    def collect_data(self, 
+                     out_dir_base,
+                     ambari_host,
+                     cluster_name,
+                     storm_host,
+                     broker_list,
+                     zookeeper_quorum,
+                     metron_home,
+                     hdp_home):
+        out_dir = self.get_out_dirname(out_dir_base)
+        info_getters = [
+                AmbariInfo(ambari_host, cluster_name),
+                StormInfo(storm_host),
+                KafkaInfo(broker_list, zookeeper_quorum, hdp_home),
+                MetronInfo(metron_home, zookeeper_quorum),
+                HdpInfo(hdp_home)
+        ]
+        for getter in info_getters:
+            getter.collect(out_dir)
+        self.compress_files(out_dir)
+        print "Finished gathering debug info"
+
+    # creates dir w/timestamp to drop all configs
+    # e.g. metron-debug-2018-03-24_06-50-34
+    def get_out_dirname(self, out_dir_base):
+        return os.path.join(out_dir_base, 'metron-debug-' + datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S'))
+
+    def compress_files(self, out_dir):
+        tarball_name = out_dir + '.tgz'
+        print "Creating tarfile bundle with all configs: 
'{0}'".format(tarball_name)
+        with closing(tarfile.open(tarball_name, 'w:gz')) as tar:
+            tar.add(out_dir, arcname=os.path.basename(out_dir))
+        print "...done"
+
+if __name__ == "__main__":
+    ClusterInfo().main()
+
