Ottomata has submitted this change and it was merged.

Change subject: Refactor MirrorMaker puppetization
......................................................................


Refactor MirrorMaker puppetization

This change makes MirrorMaker work with the latest 0.8.2.1-4 jessie-wikimedia
.deb release at http://apt.wikimedia.org/wikimedia/pool/main/k/kafka/

Adds a kafka::mirror::jmxtrans and kafka::mirror::monitoring defines.

Bug: T124077
Change-Id: I836e20a8f009ed89452630f95e570f02c522c837
---
M README.md
M manifests/defaults.pp
M manifests/mirror.pp
M manifests/mirror/consumer.pp
A manifests/mirror/init.pp
A manifests/mirror/jmxtrans.pp
A manifests/mirror/monitoring.pp
M manifests/server/jmxtrans.pp
M manifests/server/monitoring.pp
D templates/consumer.properties.erb
D templates/kafka-mirror.default.erb
M templates/log4j.properties.erb
A templates/mirror/consumer.properties.erb
A templates/mirror/kafka-mirror.default.erb
A templates/mirror/kafka-mirror.systemd.erb
A templates/mirror/producer.properties.erb
D templates/producer.properties.erb
17 files changed, 697 insertions(+), 309 deletions(-)

Approvals:
  Ottomata: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/README.md b/README.md
index 31e28e0..7543730 100644
--- a/README.md
+++ b/README.md
@@ -72,82 +72,36 @@
 `zookeeper_chroot` is optional, and allows you to specify a Znode under
 which Kafka will store its metadata in Zookeeper.  This is useful if you
 want to use a single Zookeeper cluster to manage multiple Kafka clusters.
-See below for information on how to create this Znode in Zookeeper.
 
-
-
-## Custom Zookeeper Chroot
-
-If Kafka will share a Zookeeper cluster with other users, you might want to
-create a Znode in zookeeper in which to store your Kafka cluster's data.
-You can set the `zookeeper_chroot` parameter on the `kafka` class to do this.
-
-First, you'll need to create the znode manually yourself.  You can use
-`zkCli.sh` that ships with Zookeeper, or you can use the kafka built in
-`zookeeper-shell`:
-
-
-```
-$ kafka zookeeper-shell <zookeeper_host>:2182
-Connecting to kraken-zookeeper
-Welcome to ZooKeeper!
-JLine support is enabled
-
-WATCHER::
-
-WatchedEvent state:SyncConnected type:None path:null
-[zk: kraken-zookeeper(CONNECTED) 0] create /my_kafka kafka
-Created /my_kafka
-```
-
-You can use whatever chroot znode path you like.  The second argument
-(`data`) is arbitrary.  I used 'kafka' here.
-
-Then:
-```puppet
-class { 'kafka::server':
-    brokers => {
-        'kafka-node01.example.com' => { 'id' => 1, 'port' => 12345 },
-        'kafka-node02.example.com' => { 'id' => 2 },
-    },
-    zookeeper_hosts => ['zk-node01:2181', 'zk-node02:2181', 'zk-node03:2181'],
-    # set zookeeper_chroot on the kafka class.
-    zookeeper_chroot => '/kafka/clusterA',
-}
-```
 
 ## Kafka Mirror
 
-Kafka MirrorMaker is usually used for inter data center Kafka cluster 
replication
-and aggregation.  You can consume from any number of source Kafka clusters, and
-produce to a single destination Kafka cluster.
+Kafka MirrorMaker will allow you to mirror data from multiple Kafka clusters
+into another.  This is useful for cross DC replication and for aggregation.
 
 ```puppet
-# Configure kafka-mirror to produce to Kafka Brokers which are
-# part of our kafka aggregator cluster.
-class { 'kafka::mirror':
-    destination_brokers => {
-        'kafka-aggregator01.example.com' => { 'id' => 11 },
-        'kafka-aggregator02.example.com' => { 'id' => 12 },
-    },
-    topic_whitelist => 'webrequest.*',
+# Mirror the 'main' and 'secondary' Kafka clusters
+# to the 'aggregate' Kafka cluster.
+kafka::mirror::consumer { 'main':
+    mirror_name   => 'aggregate',
+    zookeeper_url => 'zk:2181/kafka/main',
 }
-
-# Configure kafka-mirror to consume from both clusterA and clusterB
-kafka::mirror::consumer { 'clusterA':
-    zookeeper_hosts  => ['zk-node01:2181', 'zk-node02:2181', 'zk-node03:2181'],
-    zookeeper_chroot => ['/kafka/clusterA'],
+kafka::mirror::consumer { 'secondary':
+    mirror_name   => 'aggregate',
+    zookeeper_url => 'zk:2181/kafka/secondary',
 }
-kafka::mirror::consumer { 'clusterB':
-    zookeeper_hosts  => ['zk-node01:2181', 'zk-node02:2181', 'zk-node03:2181'],
-    zookeeper_chroot => ['/kafka/clusterB'],
+kafka::mirror { 'aggregate':
+    destination_brokers => ['ka01:9092','ka02:9092'],
+    whitelist           => 'these_topics_only.*',
 }
 ```
+Note that the kafka-mirror service does not subscribe to its config files.  If
+you make changes, you will have to restart the service manually.
 
 ## jmxtrans monitoring
 
-This module contains a class called `kafka::server::jmxtrans`.  It contains
-a useful jmxtrans JSON config object that can be used to tell jmxtrans to send
+`kafka::server::jmxtrans` and `kafka::mirror::jmxtrans`  configure
+useful jmxtrans JSON config objects that can be used to tell jmxtrans to send
 to any output writer (Ganglia, Graphite, etc.).  To you use this, you will need
 the [puppet-jmxtrans](https://github.com/wikimedia/puppet-jmxtrans) module.
 
@@ -157,8 +111,17 @@
     ganglia => 'ganglia.example.com:8649',
 }
 ```
-
-This will install jmxtrans and start render JSON config files for sending
+This will install jmxtrans and render JSON config files for sending
 JVM and Kafka Broker stats to Ganglia.
 See [kafka-jmxtrans.json.md](kafka-jmxtrans.json.md) for a fully
-rendered jmxtrans Kafka JSON config file.
+rendered jmxtrans Kafka Broker JSON config file.
+
+```
+# Declare this define on hosts where you run Kafka MirrorMaker.
+kafka::mirror::jmxtrans { 'aggregate':
+    statsd => 'statsd.example.org:8125'
+}
+
+```
+This will install jmxtrans and render JSON config files for sending JVM and
+Kafka MirrorMaker (consumers and producer) stats to statsd.
diff --git a/manifests/defaults.pp b/manifests/defaults.pp
index 3808562..1b657ff 100644
--- a/manifests/defaults.pp
+++ b/manifests/defaults.pp
@@ -23,9 +23,6 @@
 
     $kafka_log_file                      = '/var/log/kafka/kafka.log'
     $log_max_backup_index                = 4
-    $producer_type                       = 'async'
-    $producer_batch_num_messages         = 200
-    $consumer_group_id                   = 'test-consumer-group'
 
     # Broker Server settings
     $java_home                           = undef
@@ -70,24 +67,12 @@
     # Kafka package version.
     $version                             = 'installed'
 
-    # MirrorMaker settings
-    $topic_whitelist                     = '.*'
-    $topic_blacklist                     = undef
-    $num_streams                         = 1
-    $num_producers                       = 1
-    $queue_size                          = 10000
-
     # Default puppet paths to template config files.
     # This allows us to use custom template config files
     # if we want to override more settings than this
     # module yet supports.
-    $producer_properties_template        = 'kafka/producer.properties.erb'
-    $consumer_properties_template        = 'kafka/consumer.properties.erb'
     $log4j_properties_template           = 'kafka/log4j.properties.erb'
     $server_properties_template          = 'kafka/server.properties.erb'
     $server_systemd_override_template    = 
'kafka/kafka.systemd.override.conf.erb'
     $server_default_template             = 'kafka/kafka.default.erb'
-    $mirror_default_template             = 'kafka/kafka-mirror.default.erb'
-
-
 }
diff --git a/manifests/mirror.pp b/manifests/mirror.pp
index 7c3e5af..d9885d7 100644
--- a/manifests/mirror.pp
+++ b/manifests/mirror.pp
@@ -1,92 +1,186 @@
-# == Class kafka::mirror
-# Sets up a Kafka MirrorMaker and ensures that it is running.
+# == Define kafka::mirror
+# Sets up a Kafka MirrorMaker instance and ensures that it is running.
+# You must declare your kafka::mirror::consumers before this class.
 #
-# == Parameters:
-# $enabled                    - If false, Kafka Mirror Maker service will not 
be
-#                               started.  Default: true.
+# NOTE: This does not work without systemd.  Make sure you are including
+# this define on a node that supports systemd.
 #
-# $destination_brokers        - Hash of Kafka Broker to which you want to 
produce configs keyed by
-#                               fqdn of each kafka broker node.  This Hash
-#                               should be of the form:
-#                               { 'hostA' => { 'id' => 1, 'port' => 12345 }, 
'hostB' => { 'id' => 2 }, ... }
-#                               'port' is optional, and will default to 9092.
-
-# $jmx_port                    - Port on which to expose JMX metrics.  
Default: 9999
+# TODO: support ensure => 'absent'
 #
-class kafka::mirror(
-    $enabled                               = true,
-    $destination_brokers                   = $kafka::defaults::brokers,
+# == Usage
+#
+#   # Mirror the 'main' and 'secondary' Kafka clusters
+#   # to the 'aggregate' Kafka cluster.
+#   kafka::mirror::consumer { 'main':
+#       mirror_name   => 'aggregate',
+#       zookeeper_url => 'zk:2181/kafka/main',
+#   }
+#   kafka::mirror::consumer { 'secondary':
+#       mirror_name   => 'aggregate',
+#       zookeeper_url => 'zk:2181/kafka/secondary',
+#   }
+#
+#   kafka::mirror { 'aggregate':
+#       destination_brokers => ['ka01:9092','ka02:9092']
+#       ...,
+#   }
+#
+# == Parameters
+#
+# $destination_brokers         - Array of Kafka brokers hosts in your
+#                                destination cluster.  These brokers
+#                                will be used for bootstrapping the producers
+#                                configs and metadata.
+#
+# $enabled                     - If false, kafka mirror-maker service will not
+#                                be started.  Default: true.
+#
+# $whitelist                   - Java regex matching topics to mirror.
+#                                You must set either this or $topic_blacklist
+#                                Default: '.*'
+#
+# $blacklist                   - Java regex matching topics to not mirror.
+#                                Default: undef
+#                                You must set either this or $topic_whitelist
+#
+# $num_producers               - Number of producer threads. Default: 1
+#
+# $num_streams                 - Number of consumer threads. Default: 1
+#
+# $queue_size                  - Size of intermediate consumer -> producer
+#                                queue.  Note that this is different than
+#                                $queue_buffering_max_messages, which is the
+#                                queue size of messages in async producers.
+#                                Default: 10000
+#
+# $heap_opts                   - Heap options to pass to JVM on startup.
+#                                Default: undef
+#
+# $request_required_acks       - Required number of acks for a produce request.
+#                                Default: -1 (all replicas)
+# $producer_type               - sync or async.  Default: async
+#
+# $compression_codec           - none, gzip, or snappy.  Default: snappy
+#
+# $batch_num_messages          - If async producer, the number of messages
+#                                to batch together in a single produce request.
+#                                Default: 200
+#
+# queue_buffering_max_ms       - Maximum time to buffer data when using async
+#                                mode. For example a setting of 100 will try to
+#                                batch together 100ms of messages to send at
+#                                once. Default: 5000
+#
+# queue_buffering_max_messages - The maximum number of unsent messages that can
+#                                be queued up the producer when using async
+#                                mode before either the producer must be
+#                                blocked or data must be dropped.
+#                                Default: 10000
+#
+# queue_enqueue_timeout_ms     - The amount of time to block before dropping
+#                                messages when running in async mode and the
+#                                buffer has reached
+#                                queue.buffering.max.messages. If set to 0
+#                                events will be enqueued immediately or dropped
+#                                if the queue is full (the producer send call
+#                                will never block). If set to -1 the producer
+#                                will block indefinitely and never willingly
+#                                drop a send. Default: -1
+#
+# $jmx_port                    - Port on which to expose MirrorMaker
+#                                JMX metrics. Default: 9998
+#
+define kafka::mirror(
+    $destination_brokers,
+    $enabled                      = true,
 
-    $jmx_port                              = $kafka::defaults::jmx_port,
+    $whitelist                    = '.*',
+    $blacklist                    = undef,
 
-    $topic_whitelist                       = $kafka::defaults::topic_whitelist,
-    $topic_blacklist                       = $kafka::defaults::topic_blacklist,
+    $num_producers                = 1,
+    $num_streams                  = 1,
+    $queue_size                   = 10000,
+    $heap_opts                    = undef,
 
-    $producer_type                         = $kafka::defaults::producer_type,
-    $producer_compression_codec            = 
$kafka::defaults::producer_compression_codec,
-    $producer_serializer_class             = 
$kafka::defaults::producer_serializer_class,
-    $producer_queue_buffering_max_ms       = 
$kafka::defaults::producer_queue_buffering_max_ms,
-    $producer_queue_buffering_max_messages = 
$kafka::defaults::producer_queue_buffering_max_messages,
-    $producer_queue_enqueue_timeout_ms     = 
$kafka::defaults::producer_queue_enqueue_timeout_ms,
-    $producer_batch_num_messages           = 
$kafka::defaults::producer_batch_num_messages,
+    # Producer Settings
+    $request_required_acks        = -1,
+    $producer_type                = 'async',
+    $compression_codec            = 'snappy',
 
-    # TODO: check on names here, can we make these consistent?
-    $num_streams                           = $kafka::defaults::num_streams,
-    $num_producers                         = $kafka::defaults::num_producers,
-    $queue_size                            = $kafka::defaults::queue_size,
+    # Async Producer Settings
+    $batch_num_messages           = 200,
+    $queue_buffering_max_ms       = 5000,
+    $queue_buffering_max_messages = 10000,
+    $queue_enqueue_timeout_ms     = -1,
 
-    $producer_properties_template          = 
$kafka::defaults::producer_properties_template,
-    $default_template                      = 
$kafka::defaults::mirror_default_template
+    $jmx_port                     = 9998,
 
-) inherits kafka::defaults
+    $producer_properties_template = 'kafka/mirror/producer.properties.erb',
+    $systemd_service_template     = 'kafka/mirror/kafka-mirror.systemd.erb',
+    $default_template             = 'kafka/mirror/kafka-mirror.default.erb',
+    $log4j_properties_template    = 'kafka/log4j.properties.erb',
+)
 {
-    # Kafka class must be included before kafka::mirror.
-    # Using 'require' here rather than an explicit class dependency
-    # so that this class can be used without having to manually
-    # include the base kafka class.  This is for elegance only.
-    # You'd only need to manually include the base kafka class if
-    # you need to explicitly set the version of the Kafka package
-    # you want installed.
-    require ::kafka
-
-    package { 'kafka-mirror':
-        ensure => $::kafka::version
+    if (!$whitelist and !$blacklist) or ($whitelist and $blacklist) {
+        fail('Must set only one of $whitelist or $blacklist.')
     }
 
-    file { '/etc/default/kafka-mirror':
+    include kafka::mirror::init
+
+    $mirror_name = $title
+    file { "/etc/default/kafka-mirror-${mirror_name}":
         content => template($default_template),
         require => Package['kafka-mirror'],
     }
 
-    # Make sure /etc/kafka/mirror is a directory.
-    # MirrorMaker will read consumer and producer
-    # properties files out of this directory.
-    file { '/etc/kafka/mirror':
-        ensure => 'directory',
+    file { "/etc/kafka/mirror/${mirror_name}":
+        ensure  => 'directory',
+        recurse => true,
+        purge   => true,
+    }
+
+    # Log to custom log file for this MirrorMaker instance.
+    $kafka_log_file = "/var/log/kafka/kafka-mirror-${mirror_name}.log"
+    file { "/etc/kafka/mirror/${mirror_name}/log4j.properties":
+        content => template($log4j_properties_template),
+    }
+
+    file { "/etc/kafka/mirror/${mirror_name}/producer.properties":
+        content => template($producer_properties_template),
         require => Package['kafka-mirror'],
     }
 
-    # MirrorMaker will produce to this cluster
-    # of Kafka Brokers.
-    $brokers = $destination_brokers
-    file { '/etc/kafka/mirror/producer.properties':
-        content => template($producer_properties_template),
+    # Realize all consumer properties files for this MirrorMaker instance.
+    # --consumer.configs will be passed a wildcard matching all of the
+    # files in /etc/kafka/mirror/$mirror_name/consumer*.properties
+    File <| tag == "kafka-mirror-${mirror_name}-consumer" |>
+
+    # Render a systemd service unit file
+    file { "/lib/systemd/system/kafka-mirror-${mirror_name}.service":
+        content => template($systemd_service_template),
         require => Package['kafka-mirror'],
+    }
+
+    # systemd needs a reload to pick up changes to this file.
+    exec { "systemd-reload-for-kafka-mirror-${mirror_name}":
+        command     => '/bin/systemctl daemon-reload',
+        refreshonly => true,
+        subscribe   => 
File["/lib/systemd/system/kafka-mirror-${mirror_name}.service"],
     }
 
     # Start the Kafka MirrorMaker daemon.
     # We don't want to subscribe to the config files here.
     # It will be better to manually restart Kafka MirrorMaker
     # when the config files changes.
-    $kafka_mirror_ensure = $enabled ? {
+    $service_ensure = $enabled ? {
         false   => 'stopped',
         default => 'running',
     }
-    service { 'kafka-mirror':
-        ensure     => $kafka_mirror_ensure,
+    service { "kafka-mirror-${mirror_name}":
+        ensure     => $service_ensure,
         require    => [
-            File['/etc/kafka/mirror/producer.properties'],
-            File['/etc/default/kafka-mirror'],
+            File["/etc/kafka/mirror/${mirror_name}/producer.properties"],
+            Exec["systemd-reload-for-kafka-mirror-${mirror_name}"],
         ],
         hasrestart => true,
         hasstatus  => true,
diff --git a/manifests/mirror/consumer.pp b/manifests/mirror/consumer.pp
index faba482..51fa206 100644
--- a/manifests/mirror/consumer.pp
+++ b/manifests/mirror/consumer.pp
@@ -1,24 +1,17 @@
 # == Define kafka::mirror::consumer
-# Kafka MirrorMaker takes multiple consumer.property files.
-# Each one of these property files defines a Zookeeper URL
-# (hosts/chroot) from which information about a Kafka Broker
-# cluster may be read.  This is a define so that nay number
-# of source Kafka clusters may be configured.
+# kafka::mirror::consumers are grouped together by $mirror_name.
+# $mirror_name should match the title of the kafka::mirror instance
+# you want this consumer to be associated with.
 #
 # == Usage:
 #   kafka::mirror::consumer { 'clusterA':
-#       $zookeeper_hosts  => ['zk1.example.com', 'zk2.example.com', 
'zk3.example.com'],
-#       $zookeeper_chroot => '/kafka/clusterA',
+#       mirror_name   => 'aggregate',
+#       zookeeper_url => 'zk:2181/kafka/clusterA',
 #   }
 #
 # == Parameters:
-# $zookeeper_hosts                  - Array of zookeeper hostname/IP(:port)s.
-#
-# $zookeeper_chroot                 - Znode Path in zookeeper in which to keep 
Kafka data.
-#                                     Default: undef (the root znode).  Note 
that if you set
-#                                     this paramater, the Znode will not be 
created for you.
-#                                     You must do so manually yourself.  See 
the README
-#                                     for instructions on how to do so.
+# $zookeeper_url                    - URL to Kafka in zookeeper, including
+#                                     chroot.
 #
 # $zookeeper_connection_timeout_ms  - Timeout in ms for connecting to 
zookeeper.
 #                                     Default: 6000
@@ -26,17 +19,24 @@
 # $zookeeper_session_timeout_ms     - Timeout in ms for session to zookeeper.
 #                                     Default: 6000
 #
-# $consumer_group_id                - Consumer ID.  This will be used to save 
the
-#                                     consumed high water mark for this 
consumer
-#                                     in Zookeeper.
+# $auto_commit_enable               - If true, periodically commit to zookeeper
+#                                     the offset of messages already fetched by
+#                                     the consumer. Default: true
+# $auto_commit_interval_ms          - The frequency in ms that the consumer
+#                                     offsets are committed to zookeeper.
+#                                     Default: 6000
+# $auto_offset_reset                - smallest, largest, or throw exception.
+#                                     Default: largest
 #
 define kafka::mirror::consumer(
-    $zookeeper_hosts,
-    $zookeeper_chroot                   = undef,
+    $mirror_name,
+    $zookeeper_url,
     $zookeeper_connection_timeout_ms    = 6000,
     $zookeeper_session_timeout_ms       = 6000,
-    $consumer_group_id                  = "mirror${zookeeper_chroot}",
-    $consumer_properties_template       = 'kafka/consumer.properties.erb'
+    $auto_commit_enable                 = true,
+    $auto_commit_interval_ms            = 6000,
+    $auto_offset_reset                  = 'largest',
+    $consumer_properties_template       = 
'kafka/mirror/consumer.properties.erb'
 )
 {
     # Kafka class must be included before kafka::mirror::consumer.
@@ -48,8 +48,9 @@
     # you want installed.
     require ::kafka
 
-    file { "/etc/kafka/mirror/consumer.${name}.properties":
+    @file { "/etc/kafka/mirror/${$mirror_name}/consumer.${title}.properties":
         content => template($consumer_properties_template),
-        before  => Service['kafka-mirror'],
+        tag     => ["kafka-mirror-${mirror_name}-consumer"],
+        before  => Service["kafka-mirror-${mirror_name}"]
     }
-}
\ No newline at end of file
+}
diff --git a/manifests/mirror/init.pp b/manifests/mirror/init.pp
new file mode 100644
index 0000000..d0ecf2c
--- /dev/null
+++ b/manifests/mirror/init.pp
@@ -0,0 +1,38 @@
+# == Class kafka::mirror::init
+# This class only exists because kafka::mirror is a define.  These
+# resources are common to every kafka mirror define and we don't
+# want to declare them more than once.
+#
+class kafka::mirror::init {
+    # Kafka class must be included before kafka::mirror.
+    # Using 'require' here rather than an explicit class dependency
+    # so that this class can be used without having to manually
+    # include the base kafka class.  This is for elegance only.
+    # You'd only need to manually include the base kafka class if
+    # you need to explicitly set the version of the Kafka package
+    # you want installed.
+    require ::kafka
+
+    package { 'kafka-mirror':
+        ensure => $::kafka::version
+    }
+
+    # Remove the kafka-mirror .deb provided kafka-mirror files.
+    # This define will install instance specific ones.
+    file { [
+            '/etc/init.d/kafka-mirror',
+            '/lib/systemd/system/kafka-mirror.service',
+            '/etc/default/kafka-mirror',
+        ]:
+        ensure => 'absent'
+    }
+
+    # Have puppet manage totally manage this directory.
+    # Anything it doesn't know about will be removed.
+    file { '/etc/kafka/mirror':
+        ensure  => 'directory',
+        recurse => true,
+        purge   => true,
+        force   => true,
+    }
+}
diff --git a/manifests/mirror/jmxtrans.pp b/manifests/mirror/jmxtrans.pp
new file mode 100644
index 0000000..062cff4
--- /dev/null
+++ b/manifests/mirror/jmxtrans.pp
@@ -0,0 +1,259 @@
+# == Class kafka::mirror::jmxtrans
+# Sets up a jmxtrans instance for a Kafka MirrorMaker instance
+# running on the current host.
+# Note: This requires the jmxtrans puppet module found at
+# https://github.com/wikimedia/puppet-jmxtrans.
+#
+# == Parameters
+# $title         - Should be the same as the kafka::mirror instance you want
+#                  to monitor.
+# $jmx_port      - Kafka MirrorMaker JMX port
+# $ganglia       - Ganglia host:port
+# $graphite      - Graphite host:port
+# $statsd        - statsd host:port
+# $outfile       - outfile to which Kafka stats will be written.
+# $objects       - objects parameter to pass to jmxtrans::metrics.  Only use
+#                  this if you need to override the default ones that this
+#                  class provides.
+# group_prefix   - If set, this will be prefixed to all metrics.  Default: 
undef
+# $run_interval  - How often jmxtrans should run.        Default: 15
+# $log_level     - level at which jmxtrans should log.   Default: info
+#
+# == Usage
+# kafka::mirror::jmxtrans { 'aggregate':
+#     statsd => 'statsd.example.org:8125'
+# }
+#
+define kafka::mirror::jmxtrans(
+    $jmx_port       = 9998,
+    $ganglia        = undef,
+    $graphite       = undef,
+    $statsd         = undef,
+    $outfile        = undef,
+    $group_prefix   = undef,
+    $objects        = undef,
+    $run_interval   = 15,
+    $log_level      = 'info',
+)
+{
+    # NOTE: $title should match title of kafka::mirror
+    # instance (AKA $mirror_name).
+    $client_id = "kafka-mirror-${title}"
+    $jmx = "${::fqdn}:${jmx_port}"
+
+    # query for metrics from Kafka's JVM
+    jmxtrans::metrics::jvm { $jmx:
+        ganglia              => $ganglia,
+        graphite             => $graphite,
+        statsd               => $statsd,
+        outfile              => $outfile,
+        group_prefix         => $group_prefix,
+    }
+
+    # DRY up some often used JMX attributes.
+    $kafka_rate_jmx_attrs = {
+        'Count'             => { 'slope' => 'positive', 'bucketType' => 'g' },
+        'FifteenMinuteRate' => { 'slope' => 'both',     'bucketType' => 'g' },
+        'FiveMinuteRate'    => { 'slope' => 'both',     'bucketType' => 'g' },
+        'OneMinuteRate'     => { 'slope' => 'both',     'bucketType' => 'g' },
+        'MeanRate'          => { 'slope' => 'both',     'bucketType' => 'g' },
+    }
+    $kafka_timing_jmx_attrs = {
+        '50thPercentile'     => { 'slope' => 'both',     'bucketType' => 'g' },
+        '75ththPercentile'   => { 'slope' => 'both',     'bucketType' => 'g' },
+        '95thPercentile'     => { 'slope' => 'both',     'bucketType' => 'g' },
+        '98thPercentile'     => { 'slope' => 'both',     'bucketType' => 'g' },
+        '99thPercentile'     => { 'slope' => 'both',     'bucketType' => 'g' },
+        '999thPercentile'    => { 'slope' => 'both',     'bucketType' => 'g' },
+        'Count'              => { 'slope' => 'positive', 'bucketType' => 'g' },
+        'Max'                => { 'slope' => 'both',     'bucketType' => 'g' },
+        'Mean'               => { 'slope' => 'both',     'bucketType' => 'g' },
+        'Min'                => { 'slope' => 'both',     'bucketType' => 'g' },
+        'StdDev'             => { 'slope' => 'both',     'bucketType' => 'g' },
+    }
+    $kafka_value_jmx_attrs = {
+        'Value'             => { 'slope' => 'both',     'bucketType' => 'g' },
+    }
+
+    $kafka_objects = $objects ? {
+        # if $objects was not set, then use this as the
+        # default set of Kafka JMX MBean objects to query.
+        undef   => [
+            #
+            # DataChannel Metrics
+            #
+            {
+                'name'          => 
'kafka.tools:type=DataChannel,name=MirrorMaker-DataChannel-Size',
+                'resultAlias'   => 'kafka.tools.MirrorMaker.DataChannel.Size',
+                'attrs'         => $kafka_timing_jmx_attrs,
+            },
+            {
+                'name'          => 
'kafka.tools:type=DataChannel,name=MirrorMaker-DataChannel-WaitOnPut',
+                'resultAlias'   => 
'kafka.tools.MirrorMaker.DataChannel.WaitOnPut',
+                'attrs'         => $kafka_rate_jmx_attrs,
+            },
+            {
+                'name'          => 
'kafka.tools:type=DataChannel,name=MirrorMaker-DataChannel-WaitOnTake',
+                'resultAlias'   => 
'kafka.tools.MirrorMaker.DataChannel.WaitOnTake',
+                'attrs'         => $kafka_rate_jmx_attrs,
+            },
+
+            #
+            # Consumer Metrics
+            #
+
+            # ConsumerFetcheManager (MaxLag, MinFetchRate)
+            {
+                'name'          => 
"kafka.consumer:type=ConsumerFetcherManager,name=*,clientId=${client_id}",
+                'resultAlias'   => "kafka.consumer.ConsumerFetcherManager",
+                'typeNames'     => ['name'],
+                'attrs'         => $kafka_value_jmx_attrs,
+            },
+
+            # All Topic Consumer Metrics
+            {
+                'name'          => 
"kafka.consumer:type=ConsumerTopicMetrics,name=*,clientId=${client_id}",
+                'resultAlias'   => 
'kafka.consumer.ConsumerTopicMetrics-AllTopics',
+                'typeNames'     => ['name'],
+                'attrs'         => $kafka_rate_jmx_attrs,
+            },
+            # Per Topic Consumer Metrics
+            {
+                'name'          => 
"kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=${client_id},topic=*",
+                'resultAlias'   => 
'kafka.consumer.ConsumerTopicMetrics.BytesPerSec',
+                'typeNames'     => ['topic'],
+                'attrs'         => $kafka_rate_jmx_attrs,
+            },
+            {
+                'name'          => 
"kafka.consumer:type=ConsumerTopicMetrics,name=MessagesPerSec,clientId=${client_id},topic=*",
+                'resultAlias'   => 
'kafka.consumer.ConsumerTopicMetrics.MessagesPerSec',
+                'typeNames'     => ['topic'],
+                'attrs'         => $kafka_rate_jmx_attrs,
+            },
+
+            # Overall Consumer Fetch Request and Response Metrics
+            {
+                'name'          => 
"kafka.consumer:type=FetchRequestAndResponseMetrics,name=*,clientId=${client_id}",
+                'resultAlias'   => 
'kafka.consumer.FetchRequestAndResponseMetrics',
+                'typeNames'     => ['name'],
+                'attrs'         => $kafka_timing_jmx_attrs,
+            },
+            # Per Broker Fetch Request and Response Metrics
+            {
+                'name'          => 
"kafka.consumer:type=FetchRequestAndResponseMetrics,name=FetchRequestRateAndTimeMs,clientId=${client_id},brokerHost=*,brokerPort=*",
+                'resultAlias'   => 
'kafka.consumer.FetchRequestAndResponseMetrics.FetchRequestRateAndTimeMs',
+                'typeNames'     => ['brokerHost', 'brokerPort'],
+                'attrs'         => $kafka_timing_jmx_attrs,
+            },
+            {
+                'name'          => 
"kafka.consumer:type=FetchRequestAndResponseMetrics,name=FetchResponseSize,clientId=${client_id},brokerHost=*,brokerPort=*",
+                'resultAlias'   => 
'kafka.consumer.FetchRequestAndResponseMetrics.FetchResponseSize',
+                'attrs'         => $kafka_timing_jmx_attrs,
+                'attrs'         => $kafka_timing_jmx_attrs,
+            },
+
+            # Consumer Lag
+            {
+                'name'          => 
"kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=${client_id},topic=*,partition=*",
+                'resultAlias'   => 
'kafka.server.FetcherLagMetrics.ConsumerLag',
+                'typeNames'     => ['topic', 'partition'],
+                'attrs'         => $kafka_value_jmx_attrs,
+            },
+            # Fetcher Stats
+            {
+                'name'          => 
"kafka.server:type=FetcherStats,name=BytesPerSec,clientId=${client_id},brokerHost=*,brokerPort=*",
+                'resultAlias'   => 'kafka.server.FetcherStats.BytesPerSec',
+                'typeNames'     => ['brokerHost', 'brokerPort'],
+                'attrs'         => $kafka_rate_jmx_attrs,
+            },
+            {
+                'name'          => 
"kafka.server:type=FetcherStats,name=RequestsPerSec,clientId=${client_id},brokerHost=*,brokerPort=*",
+                'resultAlias'   => 'kafka.server.FetcherStats.RequestsPerSec',
+                'typeNames'     => ['brokerHost', 'brokerPort'],
+                'attrs'         => $kafka_rate_jmx_attrs,
+            },
+
+            # Commit Metrics
+            {
+                'name'          => 
"kafka.consumer:type=ZookeeperConsumerConnector,name=KafkaCommitsPerSec,clientId=${client_id}",
+                'resultAlias'   => 
'kafka.consumer.ZookeeperConsumerConnector.KafkaCommitsPerSec',
+                'attrs'         => $kafka_rate_jmx_attrs,
+            },
+            {
+                'name'          => 
"kafka.consumer:type=ZookeeperConsumerConnector,name=ZooKeeperCommitsPerSec,clientId=${client_id}",
+                'resultAlias'   => 
'kafka.consumer.ZookeeperConsumerConnector.ZooKeeperCommitsPerSec',
+                'attrs'         => $kafka_rate_jmx_attrs,
+            },
+
+
+            #
+            # Producer Topic Metrics
+            #
+
+            # Producer client.ids are suffixed with the producer number.
+            # This means that we can't clean up the resultAlias well for
+            # producer metrics.
+
+            # All Topic Producer Metrics
+            {
+                'name'          => 
'kafka.producer:type=ProducerTopicMetrics,name=BytesPerSec,clientId=*',
+                'resultAlias'   => 
'kafka.producer.ProducerTopicMetrics-AllTopics.BytesPerSec',
+                'typeNames'     => ['clientId'],
+                'attrs'         => $kafka_rate_jmx_attrs,
+            },
+            {
+                'name'          => 
'kafka.producer:type=ProducerTopicMetrics,name=DroppedMessagesPerSec,clientId=*',
+                'resultAlias'   => 
'kafka.producer.ProducerTopicMetrics-AllTopics.DroppedMessagesPerSec',
+                'typeNames'     => ['clientId'],
+                'attrs'         => $kafka_rate_jmx_attrs,
+            },
+            {
+                'name'          => 
'kafka.producer:type=ProducerTopicMetrics,name=MessagesPerSec,clientId=*',
+                'resultAlias'   => 
'kafka.producer.ProducerTopicMetrics-AllTopics.MessagesPerSec',
+                'typeNames'     => ['clientId'],
+                'attrs'         => $kafka_rate_jmx_attrs,
+            },
+            # Per Topic Producer Metrics
+            {
+                'name'          => 
'kafka.producer:type=ProducerTopicMetrics,name=BytesPerSec,clientId=*',
+                'resultAlias'   => 
'kafka.producer.ProducerTopicMetrics.BytesPerSec',
+                'typeNames'     => ['clientId', 'topic'],
+                'attrs'         => $kafka_rate_jmx_attrs,
+            },
+            {
+                'name'          => 
'kafka.producer:type=ProducerTopicMetrics,name=DroppedMessagesPerSec,clientId=*,topic=*',
+                'resultAlias'   => 
'kafka.producer.ProducerTopicMetrics.DroppedMessagesPerSec',
+                'typeNames'     => ['clientId', 'topic'],
+                'attrs'         => $kafka_rate_jmx_attrs,
+            },
+            {
+                'name'          => 
'kafka.producer:type=ProducerTopicMetrics,name=MessagesPerSec,clientId=*,topic=*',
+                'resultAlias'   => 
'kafka.producer.ProducerTopicMetrics.MessagesPerSec',
+                'typeNames'     => ['clientId', 'topic'],
+                'attrs'         => $kafka_rate_jmx_attrs,
+            },
+
+            # Async Producer Metrics
+            {
+                'name'          => 
'kafka.producer.async:type=ProducerSendThread,name=ProducerQueueSize,clientId=*',
+                'resultAlias'   => 
'kafka.producer.ProducerSendThread.ProducerQueueSize',
+                'typeNames'     => ['clientId'],
+                'attrs'         => $kafka_value_jmx_attrs,
+            },
+        ],
+        default => $objects,
+    }
+
+    # query kafka for jmx metrics
+    jmxtrans::metrics { "kafka-mirror-${client_id}-${jmx_port}":
+        jmx                  => $jmx,
+        outfile              => $outfile,
+        ganglia              => $ganglia,
+        ganglia_group_name   => "${group_prefix}kafka-mirror",
+        graphite             => $graphite,
+        graphite_root_prefix => "${group_prefix}kafka-mirror",
+        statsd               => $statsd,
+        statsd_root_prefix   => "${group_prefix}kafka-mirror",
+        objects              => $kafka_objects,
+    }
+}
diff --git a/manifests/mirror/monitoring.pp b/manifests/mirror/monitoring.pp
new file mode 100644
index 0000000..3acdfba
--- /dev/null
+++ b/manifests/mirror/monitoring.pp
@@ -0,0 +1,38 @@
+# == Define kafka::mirror::monitoring
+# Wikimedia Foundation specific monitoring class.
+# Only include this if you are using this kafka puppet module
+# with the Wikimedia puppet repository from
+# github.com/wikimedia/operations-puppet.
+#
+# == Parameters
+# $title               - Should be the same as the kafka::mirror instance
+#                        you want.
+#                        to monitor.
+# $jmx_port            - Default: 9998
+# $nagios_servicegroup - Nagios Service group to use for alerts.  Default: 
undef
+# $group_prefix        - $group_prefix passed to kafka::mirror::jmxtrans.
+#                        This will be used for graphite based alerts.
+#                        Default: undef
+#
+define kafka::mirror::monitoring(
+    $jmx_port            = 9998,
+    $nagios_servicegroup = undef,
+    $group_prefix        = undef,
+) {
+    # Generate icinga alert if Kafka Server is not running.
+    nrpe::monitor_service { "kafka-mirror-${title}":
+        description  => "Kafka MirrorMaker ${title}",
+        nrpe_command => "/usr/lib/nagios/plugins/check_procs -c 1:1 -C java  
--ereg-argument-array 
'kafka.tools.MirrorMaker.+/etc/kafka/mirror/${title}/producer\.properties'",
+        require      => Class['::kafka::server'],
+        critical     => true,
+    }
+
+    if !defined(Nrpe::Monitor_service['jmxtrans']) {
+        # Generate icinga alert if this jmxtrans instance is not running.
+        nrpe::monitor_service { 'jmxtrans':
+            description  => 'jmxtrans',
+            nrpe_command => '/usr/lib/nagios/plugins/check_procs -c 1:1 -C 
java --ereg-argument-array "-jar.+jmxtrans-all.jar"',
+            require      => Service['jmxtrans'],
+        }
+    }
+}
diff --git a/manifests/server/jmxtrans.pp b/manifests/server/jmxtrans.pp
index 5d247c0..7da01c4 100644
--- a/manifests/server/jmxtrans.pp
+++ b/manifests/server/jmxtrans.pp
@@ -13,6 +13,7 @@
 # $objects       - objects parameter to pass to jmxtrans::metrics.  Only use
 #                  this if you need to override the default ones that this
 #                  class provides.
+# group_prefix   - If set, this will be prefixed to all metrics.  Default: 
undef
 # $run_interval  - How often jmxtrans should run.        Default: 15
 # $log_level     - level at which jmxtrans should log.   Default: info
 # $legacy_jmx    - JMX Bean names changed in 0.8.2.  If this is true
diff --git a/manifests/server/monitoring.pp b/manifests/server/monitoring.pp
index 959a428..6210ef6 100644
--- a/manifests/server/monitoring.pp
+++ b/manifests/server/monitoring.pp
@@ -12,9 +12,9 @@
 #                        Default: undef
 #
 class kafka::server::monitoring(
-    $jmx_port = 9999,
+    $jmx_port            = 9999,
     $nagios_servicegroup = undef,
-    $group_prefix = undef,
+    $group_prefix        = undef,
 ) {
     # Generate icinga alert if Kafka Server is not running.
     nrpe::monitor_service { 'kafka':
@@ -24,15 +24,17 @@
         critical     => true,
     }
 
-    # Generate icinga alert if this jmxtrans instance is not running.
-    nrpe::monitor_service { 'jmxtrans':
-        description  => 'jmxtrans',
-        nrpe_command => '/usr/lib/nagios/plugins/check_procs -c 1:1 -C java 
--ereg-argument-array "-jar.+jmxtrans-all.jar"',
-        require      => Class['::kafka::server::jmxtrans'],
+    if !defined(Nrpe::Monitor_service['jmxtrans']) {
+        # Generate icinga alert if this jmxtrans instance is not running.
+        nrpe::monitor_service { 'jmxtrans':
+            description  => 'jmxtrans',
+            nrpe_command => '/usr/lib/nagios/plugins/check_procs -c 1:1 -C 
java --ereg-argument-array "-jar.+jmxtrans-all.jar"',
+            require      => Service['jmxtrans'],
+        }
     }
 
-    # jmxtrans statsd writer emits Kafka Broker fqdns in keys
-    # by substiting '.' with '_' and suffixing the Broker port.
+    # jmxtrans statsd writer emits fqdns in keys
+    # by substiting '.' with '_' and suffixing the jmx port.
     $graphite_broker_key = regsubst("${::fqdn}_${jmx_port}", '\.', '_', 'G')
 
     # Alert if any Kafka has under replicated partitions.
diff --git a/templates/consumer.properties.erb 
b/templates/consumer.properties.erb
deleted file mode 100644
index 346aa4c..0000000
--- a/templates/consumer.properties.erb
+++ /dev/null
@@ -1,23 +0,0 @@
-# Note: This file is managed by Puppet.
-
-#
-# see kafka.consumer.ConsumerConfig for more details
-#
-
-# Zookeeper connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zookeeper.connect=<%= @zookeeper_hosts.sort.join(',') %><%= @zookeeper_chroot 
if @zookeeper_chroot %>
-
-# The max time that the client waits while establishing a connection to 
Zookeeper.
-zookeeper.connection.timeout.ms=<%= @zookeeper_connection_timeout_ms %>
-
-# ZooKeeper session timeout. If the consumer fails to heartbeat to Zookeeper
-# for this period of time it is considered dead and a rebalance will occur.
-zookeeper.session.timeout.ms=<%= @zookeeper_session_timeout_ms %>
-
-#consumer group id
-group.id=<%= @consumer_group_id %>
-
-#consumer timeout
-#consumer.timeout.ms=5000
diff --git a/templates/kafka-mirror.default.erb 
b/templates/kafka-mirror.default.erb
deleted file mode 100644
index cb47a43..0000000
--- a/templates/kafka-mirror.default.erb
+++ /dev/null
@@ -1,48 +0,0 @@
-# Note: This file is managed by Puppet.
-
-# whether to allow init.d script to start a Kafka MirrorMaker daemon ("yes", 
"no")
-KAFKA_MIRROR_START=yes
-
-# User and group to run as
-KAFKA_USER=kafka
-KAFKA_GROUP=kafka
-KAFKA_CONFIG=/etc/kafka
-
-# The default JMX_PORT for Kafka MirrorMaker is 9993.
-# Set JMX_PORT to something else to override this.
-JMX_PORT=<%= @jmx_port %>
-<% if @jmx_opts -%>
-KAFKA_JMX_OPTS=${KAFKA_JMX_OPTS:="<%= @jmx_opts %>"}
-<% else -%>
-#KAFKA_JMX_OPTS=${KAFKA_JMX_OPTS:="-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.ssl=false"}
-<% end -%>
-
-# Memory sizes, and logging configuration
-# Memory sizes, and logging configuration
-<% if @heap_opts -%>
-KAFKA_HEAP_OPTS=<%= @heap_opts %>
-<% else -%>
-#KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
-<% end -%>
-#KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC 
-XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled 
-XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC"
-#KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${KAFKA_CONFIG}/log4j.properties"
-#KAFKA_OPTS=""
-
-#
-# MirrorMaker options:
-#
-
-# Set this to a space separated list of consumer.properties files.
-# By default, /etc/init.d/kafka-mirror will use any files that match
-# /etc/kafka/mirror/consumer.*
-#KAFKA_MIRROR_CONSUMER_CONFIGS=
-
-#KAFKA_MIRROR_PRODUCER_CONFIG=/etc/kafka/mirror/producer.properties
-
-# Only set one of the following.
-#KAFKA_MIRROR_WHITELIST='.*'
-#KAFKA_MIRROR_BLACKLIST=''
-
-#KAFKA_MIRROR_NUM_STREAMS=1
-#KAFKA_MIRROR_NUM_PRODUCERS=1
-#KAFKA_MIRROR_QUEUE_SIZE=10000
diff --git a/templates/log4j.properties.erb b/templates/log4j.properties.erb
index 40d7cfd..53460e6 100644
--- a/templates/log4j.properties.erb
+++ b/templates/log4j.properties.erb
@@ -10,7 +10,7 @@
 log4j.appender.fileAppender=org.apache.log4j.RollingFileAppender
 log4j.appender.fileAppender.File=<%= @kafka_log_file %>
 log4j.appender.fileAppender.MaxFileSize=256MB
-log4j.appender.fileAppender.MaxBackupIndex=<%= @log_max_backup_index %>
+log4j.appender.fileAppender.MaxBackupIndex=<%= @log_max_backup_index ? 
@log_max_backup_index : '4' %>
 log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.fileAppender.layout.ConversionPattern=[%d] %-4r [%t] %-5p %c %x 
- %m%n
 log4j.appender.fileAppender.Threshold=INFO
diff --git a/templates/mirror/consumer.properties.erb 
b/templates/mirror/consumer.properties.erb
new file mode 100644
index 0000000..b81ff63
--- /dev/null
+++ b/templates/mirror/consumer.properties.erb
@@ -0,0 +1,36 @@
+# Note: This file is managed by Puppet.
+
+#
+# see kafka.consumer.ConsumerConfig for more details
+#
+
+# Kafka Consumer group id
+group.id=kafka-mirror-<%= @mirror_name %>
+
+# Zookeeper connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zookeeper.connect=<%= @zookeeper_url %>
+
+# The max time that the client waits while establishing a connection
+# to Zookeeper.
+zookeeper.connection.timeout.ms=<%= @zookeeper_connection_timeout_ms %>
+
+# ZooKeeper session timeout. If the consumer fails to heartbeat to Zookeeper
+# for this period of time it is considered dead and a rebalance will occur.
+zookeeper.session.timeout.ms=<%= @zookeeper_session_timeout_ms %>
+
+# If true, periodically commit to zookeeper the offset of messages already
+# fetched by the consumer. This committed offset will be used when the process
+# fails as the position from which the new consumer will begin.
+auto.commit.enable=<%= @auto_commit_enable %>
+
+# The frequency in ms that the consumer offsets are committed to zookeeper.
+auto.commit.interval.ms=<%= @auto_commit_interval_ms %>
+
+# What to do when there is no initial offset in Zookeeper or if an offset
+# is out of range:
+# * smallest: automatically reset the offset to the smallest offset
+# * largest: automatically reset the offset to the largest offset
+# * anything else: throw exception to the consumer.
+auto.offset.reset=<%= auto_offset_reset %>
diff --git a/templates/mirror/kafka-mirror.default.erb 
b/templates/mirror/kafka-mirror.default.erb
new file mode 100644
index 0000000..12dace6
--- /dev/null
+++ b/templates/mirror/kafka-mirror.default.erb
@@ -0,0 +1,28 @@
+# Note: This file is managed by Puppet.
+
+# whether to allow init.d script to start a Kafka MirrorMaker daemon ("yes", 
"no")
+KAFKA_MIRROR_START=yes
+
+# User and group to run as
+KAFKA_USER=kafka
+KAFKA_GROUP=kafka
+
+# The default JMX_PORT for Kafka MirrorMaker is 9998.
+# Set JMX_PORT to something else to override this.
+JMX_PORT=<%= @jmx_port %>
+<% if @jmx_opts -%>
+KAFKA_JMX_OPTS=${KAFKA_JMX_OPTS:="<%= @jmx_opts %>"}
+<% else -%>
+#KAFKA_JMX_OPTS=${KAFKA_JMX_OPTS:="-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.ssl=false"}
+<% end -%>
+
+# Memory sizes, and logging configuration
+# Memory sizes, and logging configuration
+<% if @heap_opts -%>
+KAFKA_HEAP_OPTS=<%= @heap_opts %>
+<% else -%>
+#KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
+<% end -%>
+#KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC 
-XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled 
-XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC"
+KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/etc/kafka/mirror/<%= 
@mirror_name %>/log4j.properties"
+#KAFKA_OPTS=""
diff --git a/templates/mirror/kafka-mirror.systemd.erb 
b/templates/mirror/kafka-mirror.systemd.erb
new file mode 100644
index 0000000..e37a605
--- /dev/null
+++ b/templates/mirror/kafka-mirror.systemd.erb
@@ -0,0 +1,28 @@
+[Unit]
+Description=Kafka MirrorMaker
+
+[Service]
+User=kafka
+Group=kafka
+
+# Set java.awt.headless=true if JAVA_OPTS is not set so the
+# Xalan XSL transformer can work without X11 display on JDK 1.4+
+Environment="JAVA_OPTS=-Djava.awt.headless=true"
+
+# Load any environment overrides from this file.
+EnvironmentFile=-/etc/default/kafka-mirror-<%= @mirror_name %>
+
+ExecStart=/usr/bin/kafka mirror-maker \
+<% if @whitelist -%>
+--whitelist '<%= @whitelist %>' \
+<% else -%>
+--blacklist '<%= @blacklist %>' \
+<% end -%>
+--num.producers <%= @num_producers %> \
+--num.streams <%= @num_streams %> \
+--queue.size <%= @queue_size %> \
+--producer.config /etc/kafka/mirror/<%= @mirror_name %>/producer.properties \
+--consumer.configs /etc/kafka/mirror/<%= @mirror_name %>/consumer*.properties
+
+[Install]
+WantedBy=multi-user.target
diff --git a/templates/mirror/producer.properties.erb 
b/templates/mirror/producer.properties.erb
new file mode 100644
index 0000000..ace250a
--- /dev/null
+++ b/templates/mirror/producer.properties.erb
@@ -0,0 +1,50 @@
+# Note: This file is managed by Puppet.
+
+#
+# see kafka.producer.ProducerConfig for more details
+#
+
+############################# Producer Basics #############################
+
+
+# list of brokers used for bootstrapping
+# format: host1:port1,host2:port2 ...
+metadata.broker.list=<%= Array(@destination_brokers).join(',') %>
+
+# Logically identifies the application making the request.
+client.id=kafka-mirror-<%= @mirror_name %>
+
+# specifies whether the messages are sent asynchronously (async) or 
synchronously (sync)
+producer.type=<%= @producer_type %>
+
+# Required number of acks
+request.required.acks=<%= @request_required_acks %>
+
+# specify the compression codec for all data generated: none , gzip, snappy.
+# the old config values work as well: 0, 1, 2 for none, gzip, snappy, 
respectivally
+compression.codec=<%= compression_codec %>
+
+<% if @producer_type == 'async' -%>
+############################# Async Producer #############################
+
+# Number of messages batched at the producer.  Messages are grouped together
+# and produced to partitions in batches of this number.
+batch.num.messages=<%= @batch_num_messages %>
+
+# Maximum time, in milliseconds, for buffering data on the producer queue
+queue.buffering.max.ms=<%= @queue_buffering_max_ms %>
+
+# Maximum size of the blocking queue for buffering on the producer
+# This is the size of the producer's queue.  If this fills up
+# and queue.enqueue.timeout.ms is positive, messages in this queue
+# will be dropped.  If queue.enqueue.timeout.ms is negative,
+# the producer will block until the queue is reduced.
+queue.buffering.max.messages=<%= @queue_buffering_max_messages %>
+
+# Timeout for event enqueue:
+# 0: events will be enqueued immediately or dropped if the queue is full
+# -ve: enqueue will block indefinitely if the queue is full
+# +ve: enqueue will block up to this many milliseconds if the queue is full
+queue.enqueue.timeout.ms=<%= @queue_enqueue_timeout_ms %>
+
+<% end -%>
diff --git a/templates/producer.properties.erb 
b/templates/producer.properties.erb
deleted file mode 100644
index b4e5618..0000000
--- a/templates/producer.properties.erb
+++ /dev/null
@@ -1,64 +0,0 @@
-# Note: This file is managed by Puppet.
-<%
-# @hosts is a hash of the form
-# {
-#   'hostA' => { 'id' => 1, 'port' => 12345 },
-#   'hostB' => { 'id' => 2 }
-# }
-# 'port' is not required to be specified in the hash,
-# so we fetch it with a default of 9092
-#
-# Sort hosts and loop through the resulting array
-# to get a list of ['hostname1:port1', 'hostname2:port2', ...]
-# for the broker.list setting.
-broker_list=[]
[email protected] do |broker_config|
-    hostname = broker_config[0]
-    port     = broker_config[1].fetch('port', 9092).to_s
-    broker_list += ["#{hostname}:#{port}"]
-end
--%>
-
-#
-# see kafka.producer.ProducerConfig for more details
-#
-
-############################# Producer Basics #############################
-
-
-# list of brokers used for bootstrapping
-# format: host1:port1,host2:port2 ...
-metadata.broker.list=<%= broker_list.join(',') %>
-
-# specifies whether the messages are sent asynchronously (async) or 
synchronously (sync)
-producer.type=<%= @producer_type %>
-
-# name of the partitioner class for partitioning events; default partition 
spreads data randomly
-#partitioner.class=
-
-# specify the compression codec for all data generated: none , gzip, snappy.
-# the old config values work as well: 0, 1, 2 for none, gzip, snappy, 
respectivally
-compression.codec=none
-
-# message encoder
-serializer.class=kafka.serializer.StringEncoder
-
-# allow topic level compression
-#compressed.topics=
-
-############################# Async Producer #############################
-
-# maximum time, in milliseconds, for buffering data on the producer queue
-#queue.buffering.max.ms=
-
-# the maximum size of the blocking queue for buffering on the producer
-#queue.buffering.max.messages=
-
-# Timeout for event enqueue:
-# 0: events will be enqueued immediately or dropped if the queue is full
-# -ve: enqueue will block indefinitely if the queue is full
-# +ve: enqueue will block up to this many milliseconds if the queue is full
-#queue.enqueue.timeout.ms=
-
-# the number of messages batched at the producer
-batch.num.messages=<%= @producer_batch_num_messages %>

-- 
To view, visit https://gerrit.wikimedia.org/r/265789
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I836e20a8f009ed89452630f95e570f02c522c837
Gerrit-PatchSet: 5
Gerrit-Project: operations/puppet/kafka
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to