Ottomata has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/265789

Change subject: Refactor MirrorMaker puppetization
......................................................................

Refactor MirrorMaker puppetization

This change makes MirrorMaker work with the latest 0.8.2.1-4 jessie-wikimedia
.deb release at http://apt.wikimedia.org/wikimedia/pool/main/k/kafka/

Bug: T124077
Change-Id: I836e20a8f009ed89452630f95e570f02c522c837
---
M README.md
M manifests/defaults.pp
M manifests/mirror.pp
M manifests/mirror/consumer.pp
D templates/consumer.properties.erb
M templates/log4j.properties.erb
A templates/mirror/consumer.properties.erb
R templates/mirror/kafka-mirror.default.erb
A templates/mirror/producer.properties.erb
D templates/producer.properties.erb
10 files changed, 293 insertions(+), 233 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/puppet/kafka refs/changes/89/265789/1

diff --git a/README.md b/README.md
index 31e28e0..fcf32ed 100644
--- a/README.md
+++ b/README.md
@@ -72,52 +72,12 @@
 `zookeeper_chroot` is optional, and allows you to specify a Znode under
 which Kafka will store its metadata in Zookeeper.  This is useful if you
 want to use a single Zookeeper cluster to manage multiple Kafka clusters.
-See below for information on how to create this Znode in Zookeeper.
 
-
-
-## Custom Zookeeper Chroot
-
-If Kafka will share a Zookeeper cluster with other users, you might want to
-create a Znode in zookeeper in which to store your Kafka cluster's data.
-You can set the `zookeeper_chroot` parameter on the `kafka` class to do this.
-
-First, you'll need to create the znode manually yourself.  You can use
-`zkCli.sh` that ships with Zookeeper, or you can use the kafka built in
-`zookeeper-shell`:
-
-
-```
-$ kafka zookeeper-shell <zookeeper_host>:2182
-Connecting to kraken-zookeeper
-Welcome to ZooKeeper!
-JLine support is enabled
-
-WATCHER::
-
-WatchedEvent state:SyncConnected type:None path:null
-[zk: kraken-zookeeper(CONNECTED) 0] create /my_kafka kafka
-Created /my_kafka
-```
-
-You can use whatever chroot znode path you like.  The second argument
-(`data`) is arbitrary.  I used 'kafka' here.
-
-Then:
-```puppet
-class { 'kafka::server':
-    brokers => {
-        'kafka-node01.example.com' => { 'id' => 1, 'port' => 12345 },
-        'kafka-node02.example.com' => { 'id' => 2 },
-    },
-    zookeeper_hosts => ['zk-node01:2181', 'zk-node02:2181', 'zk-node03:2181'],
-    # set zookeeper_chroot on the kafka class.
-    zookeeper_chroot => '/kafka/clusterA',
-}
-```
 
 ## Kafka Mirror
 
+TODO: UPDATE THIS XXX
+
 Kafka MirrorMaker is usually used for inter data center Kafka cluster replication
 and aggregation.  You can consume from any number of source Kafka clusters, and
 produce to a single destination Kafka cluster.
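
Since the "Kafka Mirror" README section above is still marked TODO, a sketch of the refactored usage may help reviewers. This is assembled directly from the docblocks in manifests/mirror.pp and manifests/mirror/consumer.pp below; the cluster and broker names are the docblocks' own placeholders:

```puppet
# Declare one kafka::mirror::consumer per source cluster, grouped by
# mirror_name, then the kafka::mirror instance that produces to the
# destination cluster.  Names here are placeholder examples.
kafka::mirror::consumer { 'main':
    mirror_name   => 'aggregate',
    zookeeper_url => 'zk:2181/kafka/main',
}
kafka::mirror::consumer { 'secondary':
    mirror_name   => 'aggregate',
    zookeeper_url => 'zk:2181/kafka/secondary',
}

kafka::mirror { 'aggregate':
    destination_brokers => {
        'aggregateA' => { 'id' => 10 },
        'aggregateB' => { 'id' => 11 },
    },
}
```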
diff --git a/manifests/defaults.pp b/manifests/defaults.pp
index 54f168a..feb7ae9 100644
--- a/manifests/defaults.pp
+++ b/manifests/defaults.pp
@@ -23,9 +23,6 @@
 
     $kafka_log_file                      = '/var/log/kafka/kafka.log'
     $log_max_backup_index                = 4
-    $producer_type                       = 'async'
-    $producer_batch_num_messages         = 200
-    $consumer_group_id                   = 'test-consumer-group'
 
     # Broker Server settings
     $java_home                           = undef
@@ -69,24 +66,12 @@
     # Kafka package version.
     $version                             = 'installed'
 
-    # MirrorMaker settings
-    $topic_whitelist                     = '.*'
-    $topic_blacklist                     = undef
-    $num_streams                         = 1
-    $num_producers                       = 1
-    $queue_size                          = 10000
-
     # Default puppet paths to template config files.
     # This allows us to use custom template config files
     # if we want to override more settings than this
     # module yet supports.
-    $producer_properties_template        = 'kafka/producer.properties.erb'
-    $consumer_properties_template        = 'kafka/consumer.properties.erb'
     $log4j_properties_template           = 'kafka/log4j.properties.erb'
     $server_properties_template          = 'kafka/server.properties.erb'
     $server_systemd_override_template    = 'kafka/kafka.systemd.override.conf.erb'
     $server_default_template             = 'kafka/kafka.default.erb'
-    $mirror_default_template             = 'kafka/kafka-mirror.default.erb'
-
-
 }
diff --git a/manifests/mirror.pp b/manifests/mirror.pp
index 7c3e5af..a23c1c6 100644
--- a/manifests/mirror.pp
+++ b/manifests/mirror.pp
@@ -1,44 +1,131 @@
 # == Class kafka::mirror
-# Sets up a Kafka MirrorMaker and ensures that it is running.
+# Sets up a Kafka MirrorMaker instance and ensures that it is running.
+# You must declare your kafka::mirror::consumers before this class.
 #
-# == Parameters:
+# NOTE: This does not work without systemd.  Make sure you are including
+# this define on a node that supports systemd.
+#
+# == Usage
+#
+#   # Mirror the 'main' and 'secondary' Kafka clusters
+#   # to the 'aggregate' Kafka cluster.
+#   kafka::mirror::consumer { 'main':
+#       mirror_name   => 'aggregate',
+#       zookeeper_url => 'zk:2181/kafka/main',
+#   }
+#   kafka::mirror::consumer { 'secondary':
+#       mirror_name   => 'aggregate',
+#       zookeeper_url => 'zk:2181/kafka/secondary',
+#   }
+#
+#   kafka::mirror { 'aggregate':
+#       destination_brokers => {
+#           'aggregateA' => { 'id' => 10 },
+#           'aggregateB' => { 'id' => 11 },
+#       },
+#       ...,
+#   }
+#
+# == Parameters
+#
+# $destination_brokers        - Hash of configs for the Kafka Brokers you
+#                               want to produce to, keyed by the fqdn of each
+#                               broker node.  This Hash should be of the form:
+#                               {
+#                                   'hostA' => { 'id' => 1, 'port' => 12345 },
+#                                   'hostB' => { 'id' => 2 },
+#                                   ...
+#                               }
+#                               'port' is optional, and will default to 9092.
+#
 # $enabled                    - If false, Kafka Mirror Maker service will not be
 #                               started.  Default: true.
 #
-# $destination_brokers        - Hash of Kafka Broker to which you want to produce configs keyed by
-#                               fqdn of each kafka broker node.  This Hash
-#                               should be of the form:
-#                               { 'hostA' => { 'id' => 1, 'port' => 12345 }, 'hostB' => { 'id' => 2 }, ... }
-#                               'port' is optional, and will default to 9092.
-
-# $jmx_port                    - Port on which to expose JMX metrics.  Default: 9999
+# $topic_whitelist            - Java regex matching topics to mirror.
+#                               You must set either this or $topic_blacklist
+#                               Default: '.*'
 #
-class kafka::mirror(
-    $enabled                               = true,
-    $destination_brokers                   = $kafka::defaults::brokers,
+# $topic_blacklist            - Java regex matching topics to not mirror.
+#                               Default: undef
+#                               You must set either this or $topic_whitelist
+#
+# $num_producers               - Number of producer threads. Default: 1
+#
+# $num_streams                 - Number of consumer threads. Default: 1
+#
+# $queue_size                  - Size of intermediate consumer -> producer
+#                                queue.  Note that this is different than
+#                                $queue_buffering_max_messages, which is the
+#                                queue size of messages in async producers.
+#                                Default: 10000
+#
+# $heap_opts                   - Heap options to pass to JVM on startup.
+#                                Default: undef
+#
+# $request_required_acks       - Required number of acks for a produce request.
+#                                Default: -1 (all replicas)
+# $producer_type               - sync or async.  Default: async
+#
+# $compression_codec           - none, gzip, or snappy.  Default: snappy
+#
+# $batch_num_messages          - If async producer, the number of messages
+#                                to batch together in a single produce request.
+#                                Default: 200
+#
+# $queue_buffering_max_ms      - Maximum time to buffer data when using async
+#                                mode. For example a setting of 100 will try to
+#                                batch together 100ms of messages to send at
+#                                once. Default: 5000
+#
+# $queue_buffering_max_messages - The maximum number of unsent messages that can
+#                                be queued up the producer when using async
+#                                mode before either the producer must be
+#                                blocked or data must be dropped.
+#                                Default: 10000
+#
+# $queue_enqueue_timeout_ms    - The amount of time to block before dropping
+#                                messages when running in async mode and the
+#                                buffer has reached
+#                                queue.buffering.max.messages. If set to 0
+#                                events will be enqueued immediately or dropped
+#                                if the queue is full (the producer send call
+#                                will never block). If set to -1 the producer
+#                                will block indefinitely and never willingly
+#                                drop a send. Default: -1
+#
+# $jmx_port                    - Port on which to expose MirrorMaker
+#                                JMX metrics. Default: 9998
+#
+define kafka::mirror(
+    $destination_brokers,
+    $enabled                      = true,
 
-    $jmx_port                              = $kafka::defaults::jmx_port,
+    $topic_whitelist              = '.*',
+    $topic_blacklist              = undef,
 
-    $topic_whitelist                       = $kafka::defaults::topic_whitelist,
-    $topic_blacklist                       = $kafka::defaults::topic_blacklist,
+    $num_producers                = 1,
+    $num_streams                  = 1,
+    $queue_size                   = 10000,
+    $heap_opts                    = undef,
 
-    $producer_type                         = $kafka::defaults::producer_type,
-    $producer_compression_codec            = $kafka::defaults::producer_compression_codec,
-    $producer_serializer_class             = $kafka::defaults::producer_serializer_class,
-    $producer_queue_buffering_max_ms       = $kafka::defaults::producer_queue_buffering_max_ms,
-    $producer_queue_buffering_max_messages = $kafka::defaults::producer_queue_buffering_max_messages,
-    $producer_queue_enqueue_timeout_ms     = $kafka::defaults::producer_queue_enqueue_timeout_ms,
-    $producer_batch_num_messages           = $kafka::defaults::producer_batch_num_messages,
+    # Producer Settings
+    $request_required_acks        = -1,
+    $producer_type                = 'async',
+    $compression_codec            = 'snappy',
 
-    # TODO: check on names here, can we make these consistent?
-    $num_streams                           = $kafka::defaults::num_streams,
-    $num_producers                         = $kafka::defaults::num_producers,
-    $queue_size                            = $kafka::defaults::queue_size,
+    # Async Producer Settings
+    $batch_num_messages           = 200,
+    $queue_buffering_max_ms       = 5000,
+    $queue_buffering_max_messages = 10000,
+    $queue_enqueue_timeout_ms     = -1,
 
-    $producer_properties_template          = $kafka::defaults::producer_properties_template,
-    $default_template                      = $kafka::defaults::mirror_default_template
+    $jmx_port                     = 9998,
 
-) inherits kafka::defaults
+    $producer_properties_template = 'kafka/mirror/producer.properties.erb',
+    $systemd_service_template     = 'kafka/mirror/kafka-mirror.systemd.erb',
+    $default_template             = 'kafka/mirror/kafka-mirror.default.erb',
+    $log4j_properties_template    = 'kafka/log4j.properties.erb',
+)
 {
     # Kafka class must be included before kafka::mirror.
     # Using 'require' here rather than an explicit class dependency
@@ -53,40 +140,68 @@
         ensure => $::kafka::version
     }
 
-    file { '/etc/default/kafka-mirror':
+    # Remove the kafka-mirror files installed by the .deb package.
+    # This define will install instance-specific ones instead.
+    file { [
+        '/lib/systemd/system/kafka-mirror.service',
+        '/etc/default/kafka-mirror',
+        '/etc/kafka/mirror/log4j.properties',
+        ]:
+        ensure => 'absent'
+    }
+
+    $mirror_name = $title
+    file { "/etc/default/kafka-mirror-${mirror_name}":
         content => template($default_template),
         require => Package['kafka-mirror'],
     }
 
-    # Make sure /etc/kafka/mirror is a directory.
-    # MirrorMaker will read consumer and producer
-    # properties files out of this directory.
-    file { '/etc/kafka/mirror':
+    file { "/etc/kafka/mirror/${mirror_name}":
         ensure => 'directory',
+    }
+
+    # Log to custom log file for this MirrorMaker instance.
+    $kafka_log_file = "/var/log/kafka/kafka-mirror-${mirror_name}.log"
+    file { "/etc/kafka/mirror/${mirror_name}/log4j.properties":
+        content => template($log4j_properties_template),
+    }
+
+    file { "/etc/kafka/mirror/${mirror_name}/producer.properties":
+        content => template($producer_properties_template),
         require => Package['kafka-mirror'],
     }
 
-    # MirrorMaker will produce to this cluster
-    # of Kafka Brokers.
-    $brokers = $destination_brokers
-    file { '/etc/kafka/mirror/producer.properties':
-        content => template($producer_properties_template),
+    # Realize all consumer properties files for this MirrorMaker instance.
+    # --consumer.configs will be passed a wildcard matching all of the
+    # files in /etc/kafka/mirror/$mirror_name/consumer*.properties
+    File <| tag == "kafka-mirror-${mirror_name}-consumer" |>
+
+    # Render a systemd service unit file
+    file { "/lib/systemd/system/kafka-mirror-${mirror_name}.service":
+        content => template($systemd_service_template),
         require => Package['kafka-mirror'],
+    }
+
+    # systemd needs a reload to pick up changes to this file.
+    exec { "systemd-reload-for-kafka-mirror-${mirror_name}":
+        command     => '/bin/systemctl daemon-reload',
+        refreshonly => true,
+        subscribe   => File["/lib/systemd/system/kafka-mirror-${mirror_name}.service"],
     }
 
     # Start the Kafka MirrorMaker daemon.
     # We don't want to subscribe to the config files here.
     # It will be better to manually restart Kafka MirrorMaker
     # when the config files change.
-    $kafka_mirror_ensure = $enabled ? {
+    $service_ensure = $enabled ? {
         false   => 'stopped',
         default => 'running',
     }
-    service { 'kafka-mirror':
-        ensure     => $kafka_mirror_ensure,
+    service { "kafka-mirror-${mirror_name}":
+        ensure     => $service_ensure,
         require    => [
-            File['/etc/kafka/mirror/producer.properties'],
-            File['/etc/default/kafka-mirror'],
+            File["/etc/kafka/mirror/${mirror_name}/producer.properties"],
+            Exec["systemd-reload-for-kafka-mirror-${mirror_name}"],
         ],
         hasrestart => true,
         hasstatus  => true,
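
Because kafka::mirror is now a define rather than a class, one node can run several MirrorMaker instances side by side, each with its own default file, config directory, log file, and systemd unit. A minimal sketch under that assumption (instance and broker names below are hypothetical):

```puppet
# Two independent MirrorMaker instances on one node.  Each renders its own
# /etc/default/kafka-mirror-<name>, /etc/kafka/mirror/<name>/ directory,
# /var/log/kafka/kafka-mirror-<name>.log, and kafka-mirror-<name>.service.
kafka::mirror { 'aggregate-main':
    destination_brokers => { 'brokerA' => { 'id' => 1 } },
    jmx_port            => 9998,
}
kafka::mirror { 'aggregate-secondary':
    destination_brokers => { 'brokerB' => { 'id' => 2 } },
    jmx_port            => 9999,  # each instance needs a distinct JMX port
}
```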
diff --git a/manifests/mirror/consumer.pp b/manifests/mirror/consumer.pp
index faba482..51fa206 100644
--- a/manifests/mirror/consumer.pp
+++ b/manifests/mirror/consumer.pp
@@ -1,24 +1,17 @@
 # == Define kafka::mirror::consumer
-# Kafka MirrorMaker takes multiple consumer.property files.
-# Each one of these property files defines a Zookeeper URL
-# (hosts/chroot) from which information about a Kafka Broker
-# cluster may be read.  This is a define so that any number
-# of source Kafka clusters may be configured.
+# kafka::mirror::consumers are grouped together by $mirror_name.
+# $mirror_name should match the title of the kafka::mirror instance
+# you want this consumer to be associated with.
 #
 # == Usage:
 #   kafka::mirror::consumer { 'clusterA':
-#       $zookeeper_hosts  => ['zk1.example.com', 'zk2.example.com', 'zk3.example.com'],
-#       $zookeeper_chroot => '/kafka/clusterA',
+#       mirror_name   => 'aggregate',
+#       zookeeper_url => 'zk:2181/kafka/clusterA',
 #   }
 #
 # == Parameters:
-# $zookeeper_hosts                  - Array of zookeeper hostname/IP(:port)s.
-#
-# $zookeeper_chroot                 - Znode Path in zookeeper in which to keep Kafka data.
-#                                     Default: undef (the root znode).  Note that if you set
-#                                     this parameter, the Znode will not be created for you.
-#                                     You must do so manually yourself.  See the README
-#                                     for instructions on how to do so.
+# $zookeeper_url                    - URL to Kafka in zookeeper, including
+#                                     chroot.
 #
 # $zookeeper_connection_timeout_ms  - Timeout in ms for connecting to zookeeper.
 #                                     Default: 6000
@@ -26,17 +19,24 @@
 # $zookeeper_session_timeout_ms     - Timeout in ms for session to zookeeper.
 #                                     Default: 6000
 #
-# $consumer_group_id                - Consumer ID.  This will be used to save the
-#                                     consumed high water mark for this consumer
-#                                     in Zookeeper.
+# $auto_commit_enable               - If true, periodically commit to zookeeper
+#                                     the offset of messages already fetched by
+#                                     the consumer. Default: true
+# $auto_commit_interval_ms          - The frequency in ms that the consumer
+#                                     offsets are committed to zookeeper.
+#                                     Default: 6000
+# $auto_offset_reset                - smallest, largest, or throw exception.
+#                                     Default: largest
 #
 define kafka::mirror::consumer(
-    $zookeeper_hosts,
-    $zookeeper_chroot                   = undef,
+    $mirror_name,
+    $zookeeper_url,
     $zookeeper_connection_timeout_ms    = 6000,
     $zookeeper_session_timeout_ms       = 6000,
-    $consumer_group_id                  = "mirror${zookeeper_chroot}",
-    $consumer_properties_template       = 'kafka/consumer.properties.erb'
+    $auto_commit_enable                 = true,
+    $auto_commit_interval_ms            = 6000,
+    $auto_offset_reset                  = 'largest',
+    $consumer_properties_template       = 'kafka/mirror/consumer.properties.erb'
 )
 {
     # Kafka class must be included before kafka::mirror::consumer.
@@ -48,8 +48,9 @@
     # you want installed.
     require ::kafka
 
-    file { "/etc/kafka/mirror/consumer.${name}.properties":
+    @file { "/etc/kafka/mirror/${mirror_name}/consumer.${title}.properties":
         content => template($consumer_properties_template),
-        before  => Service['kafka-mirror'],
+        tag     => ["kafka-mirror-${mirror_name}-consumer"],
+        before  => Service["kafka-mirror-${mirror_name}"]
     }
-}
\ No newline at end of file
+}
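
The consumer properties file is now declared as a virtual resource (`@file`) tagged with its mirror instance, and kafka::mirror realizes it with a collector. A standalone sketch of that virtual-resource/collector pattern (the path, content, and tag value are illustrative):

```puppet
# Declared virtual: not applied until something collects it.
@file { '/etc/kafka/mirror/aggregate/consumer.main.properties':
    content => "group.id=kafka-mirror-aggregate\n",
    tag     => ['kafka-mirror-aggregate-consumer'],
}

# Collector in kafka::mirror: realizes every File resource carrying the
# matching tag, so consumers declared elsewhere in the catalog are
# materialized alongside their MirrorMaker instance.
File <| tag == 'kafka-mirror-aggregate-consumer' |>
```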
diff --git a/templates/consumer.properties.erb b/templates/consumer.properties.erb
deleted file mode 100644
index 346aa4c..0000000
--- a/templates/consumer.properties.erb
+++ /dev/null
@@ -1,23 +0,0 @@
-# Note: This file is managed by Puppet.
-
-#
-# see kafka.consumer.ConsumerConfig for more details
-#
-
-# Zookeeper connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zookeeper.connect=<%= @zookeeper_hosts.sort.join(',') %><%= @zookeeper_chroot if @zookeeper_chroot %>
-
-# The max time that the client waits while establishing a connection to Zookeeper.
-zookeeper.connection.timeout.ms=<%= @zookeeper_connection_timeout_ms %>
-
-# ZooKeeper session timeout. If the consumer fails to heartbeat to Zookeeper
-# for this period of time it is considered dead and a rebalance will occur.
-zookeeper.session.timeout.ms=<%= @zookeeper_session_timeout_ms %>
-
-#consumer group id
-group.id=<%= @consumer_group_id %>
-
-#consumer timeout
-#consumer.timeout.ms=5000
diff --git a/templates/log4j.properties.erb b/templates/log4j.properties.erb
index 40d7cfd..53460e6 100644
--- a/templates/log4j.properties.erb
+++ b/templates/log4j.properties.erb
@@ -10,7 +10,7 @@
 log4j.appender.fileAppender=org.apache.log4j.RollingFileAppender
 log4j.appender.fileAppender.File=<%= @kafka_log_file %>
 log4j.appender.fileAppender.MaxFileSize=256MB
-log4j.appender.fileAppender.MaxBackupIndex=<%= @log_max_backup_index %>
+log4j.appender.fileAppender.MaxBackupIndex=<%= @log_max_backup_index ? @log_max_backup_index : '4' %>
 log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.fileAppender.layout.ConversionPattern=[%d] %-4r [%t] %-5p %c %x - %m%n
 log4j.appender.fileAppender.Threshold=INFO
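
The per-instance log file works because `template()` is evaluated in the scope of the calling define: the ERB reads `@kafka_log_file` from whatever scope renders it. A minimal sketch of that mechanism, mirroring the code in manifests/mirror.pp above:

```puppet
# A variable local to the define...
$kafka_log_file = "/var/log/kafka/kafka-mirror-${mirror_name}.log"

# ...is visible to the ERB as @kafka_log_file when the template is
# rendered here, so each instance logs to its own file.
file { "/etc/kafka/mirror/${mirror_name}/log4j.properties":
    content => template('kafka/log4j.properties.erb'),
}
```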
diff --git a/templates/mirror/consumer.properties.erb b/templates/mirror/consumer.properties.erb
new file mode 100644
index 0000000..a8022b2
--- /dev/null
+++ b/templates/mirror/consumer.properties.erb
@@ -0,0 +1,36 @@
+# Note: This file is managed by Puppet.
+
+#
+# see kafka.consumer.ConsumerConfig for more details
+#
+
+# Kafka Consumer group id
+group.id=kafka-mirror-<%= @mirror_name %>
+
+# Zookeeper connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zookeeper.connect=<%= @zookeeper_url %>
+
+# The max time that the client waits while establishing a connection
+# to Zookeeper.
+zookeeper.connection.timeout.ms=<%= @zookeeper_connection_timeout_ms %>
+
+# ZooKeeper session timeout. If the consumer fails to heartbeat to Zookeeper
+# for this period of time it is considered dead and a rebalance will occur.
+zookeeper.session.timeout.ms=<%= @zookeeper_session_timeout_ms %>
+
+# If true, periodically commit to zookeeper the offset of messages already
+# fetched by the consumer. This committed offset will be used when the process
+# fails as the position from which the new consumer will begin.
+auto.commit.enable=<%= @auto_commit_enable %>
+
+# The frequency in ms that the consumer offsets are committed to zookeeper.
+auto.commit.interval.ms=<%= @auto_commit_interval_ms %>
+
+# What to do when there is no initial offset in Zookeeper or if an offset
+# is out of range:
+# * smallest: automatically reset the offset to the smallest offset
+# * largest: automatically reset the offset to the largest offset
+# * anything else: throw exception to the consumer.
+auto.offset.reset=<%= @auto_offset_reset %>
diff --git a/templates/kafka-mirror.default.erb b/templates/mirror/kafka-mirror.default.erb
similarity index 64%
rename from templates/kafka-mirror.default.erb
rename to templates/mirror/kafka-mirror.default.erb
index cb47a43..0a8409f 100644
--- a/templates/kafka-mirror.default.erb
+++ b/templates/mirror/kafka-mirror.default.erb
@@ -25,24 +25,5 @@
 #KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
 <% end -%>
 #KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC"
-#KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${KAFKA_CONFIG}/log4j.properties"
+KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${KAFKA_CONFIG}/kafka-mirror-<%= @mirror_name %>/log4j.properties"
 #KAFKA_OPTS=""
-
-#
-# MirrorMaker options:
-#
-
-# Set this to a space separated list of consumer.properties files.
-# By default, /etc/init.d/kafka-mirror will use any files that match
-# /etc/kafka/mirror/consumer.*
-#KAFKA_MIRROR_CONSUMER_CONFIGS=
-
-#KAFKA_MIRROR_PRODUCER_CONFIG=/etc/kafka/mirror/producer.properties
-
-# Only set one of the following.
-#KAFKA_MIRROR_WHITELIST='.*'
-#KAFKA_MIRROR_BLACKLIST=''
-
-#KAFKA_MIRROR_NUM_STREAMS=1
-#KAFKA_MIRROR_NUM_PRODUCERS=1
-#KAFKA_MIRROR_QUEUE_SIZE=10000
diff --git a/templates/mirror/producer.properties.erb b/templates/mirror/producer.properties.erb
new file mode 100644
index 0000000..4a30410
--- /dev/null
+++ b/templates/mirror/producer.properties.erb
@@ -0,0 +1,69 @@
+# Note: This file is managed by Puppet.
+<%
+# @hosts is a hash of the form
+# {
+#   'hostA' => { 'id' => 1, 'port' => 12345 },
+#   'hostB' => { 'id' => 2 }
+# }
+# 'port' is not required to be specified in the hash,
+# so we fetch it with a default of 9092
+#
+# Sort hosts and loop through the resulting array
+# to get a list of ['hostname1:port1', 'hostname2:port2', ...]
+# for the broker.list setting.
+broker_list=[]
+@destination_brokers.sort.each do |broker_config|
+    hostname = broker_config[0]
+    port     = broker_config[1].fetch('port', 9092).to_s
+    broker_list += ["#{hostname}:#{port}"]
+end
+-%>
+
+#
+# see kafka.producer.ProducerConfig for more details
+#
+
+############################# Producer Basics #############################
+
+
+# list of brokers used for bootstrapping
+# format: host1:port1,host2:port2 ...
+metadata.broker.list=<%= broker_list.join(',') %>
+
+# Logically identifies the application making the request.
+client.id=kafka-mirror-<%= @mirror_name %>
+
+# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
+producer.type=<%= @producer_type %>
+
+# Required number of acks
+request.required.acks=<%= @request_required_acks %>
+
+# specify the compression codec for all data generated: none, gzip, snappy.
+# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectively
+compression.codec=<%= @compression_codec %>
+
+<% if @producer_type == 'async' -%>
+############################# Async Producer #############################
+
+# Number of messages batched at the producer.  Messages are grouped together
+# and produced to partitions in batches of this number.
+batch.num.messages=<%= @batch_num_messages %>
+
+# Maximum time, in milliseconds, for buffering data on the producer queue
+queue.buffering.max.ms=<%= @queue_buffering_max_ms %>
+
+# Maximum size of the blocking queue for buffering on the producer
+# This is the size of the producer's queue.  If this fills up
+# and queue.enqueue.timeout.ms is positive, messages in this queue
+# will be dropped.  If queue.enqueue.timeout.ms is negative,
+# the producer will block until the queue is reduced.
+queue.buffering.max.messages=<%= @queue_buffering_max_messages %>
+
+# Timeout for event enqueue:
+# 0: events will be enqueued immediately or dropped if the queue is full
+# -ve: enqueue will block indefinitely if the queue is full
+# +ve: enqueue will block up to this many milliseconds if the queue is full
+queue.enqueue.timeout.ms=<%= @queue_enqueue_timeout_ms %>
+
+<% end -%>
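
To illustrate the broker_list construction in the template header above: brokers are sorted by hostname, and 'port' defaults to 9092 when omitted. A hypothetical declaration and the line it would render (the values below are made up):

```puppet
kafka::mirror { 'aggregate':
    destination_brokers => {
        'brokerB' => { 'id' => 2 },                  # no port => 9092
        'brokerA' => { 'id' => 1, 'port' => 9093 },
    },
}
# The ERB header sorts by hostname and joins host:port pairs, rendering:
#   metadata.broker.list=brokerA:9093,brokerB:9092
```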
diff --git a/templates/producer.properties.erb b/templates/producer.properties.erb
deleted file mode 100644
index b4e5618..0000000
--- a/templates/producer.properties.erb
+++ /dev/null
@@ -1,64 +0,0 @@
-# Note: This file is managed by Puppet.
-<%
-# @hosts is a hash of the form
-# {
-#   'hostA' => { 'id' => 1, 'port' => 12345 },
-#   'hostB' => { 'id' => 2 }
-# }
-# 'port' is not required to be specified in the hash,
-# so we fetch it with a default of 9092
-#
-# Sort hosts and loop through the resulting array
-# to get a list of ['hostname1:port1', 'hostname2:port2', ...]
-# for the broker.list setting.
-broker_list=[]
[email protected] do |broker_config|
-    hostname = broker_config[0]
-    port     = broker_config[1].fetch('port', 9092).to_s
-    broker_list += ["#{hostname}:#{port}"]
-end
--%>
-
-#
-# see kafka.producer.ProducerConfig for more details
-#
-
-############################# Producer Basics #############################
-
-
-# list of brokers used for bootstrapping
-# format: host1:port1,host2:port2 ...
-metadata.broker.list=<%= broker_list.join(',') %>
-
-# specifies whether the messages are sent asynchronously (async) or 
synchronously (sync)
-producer.type=<%= @producer_type %>
-
-# name of the partitioner class for partitioning events; default partition spreads data randomly
-#partitioner.class=
-
-# specify the compression codec for all data generated: none , gzip, snappy.
-# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectively
-compression.codec=none
-
-# message encoder
-serializer.class=kafka.serializer.StringEncoder
-
-# allow topic level compression
-#compressed.topics=
-
-############################# Async Producer #############################
-
-# maximum time, in milliseconds, for buffering data on the producer queue
-#queue.buffering.max.ms=
-
-# the maximum size of the blocking queue for buffering on the producer
-#queue.buffering.max.messages=
-
-# Timeout for event enqueue:
-# 0: events will be enqueued immediately or dropped if the queue is full
-# -ve: enqueue will block indefinitely if the queue is full
-# +ve: enqueue will block up to this many milliseconds if the queue is full
-#queue.enqueue.timeout.ms=
-
-# the number of messages batched at the producer
-batch.num.messages=<%= @producer_batch_num_messages %>

-- 
To view, visit https://gerrit.wikimedia.org/r/265789
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I836e20a8f009ed89452630f95e570f02c522c837
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet/kafka
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
