Ottomata has submitted this change and it was merged. Change subject: Refactor MirrorMaker puppetization ......................................................................
Refactor MirrorMaker puppetization This change makes MirrorMaker work with the latest 0.8.2.1-4 jessie-wikimedia .deb release at http://apt.wikimedia.org/wikimedia/pool/main/k/kafka/ Adds a kafka::mirror::jmxtrans and kafka::mirror::monitoring defines. Bug: T124077 Change-Id: I836e20a8f009ed89452630f95e570f02c522c837 --- M README.md M manifests/defaults.pp M manifests/mirror.pp M manifests/mirror/consumer.pp A manifests/mirror/init.pp A manifests/mirror/jmxtrans.pp A manifests/mirror/monitoring.pp M manifests/server/jmxtrans.pp M manifests/server/monitoring.pp D templates/consumer.properties.erb D templates/kafka-mirror.default.erb M templates/log4j.properties.erb A templates/mirror/consumer.properties.erb A templates/mirror/kafka-mirror.default.erb A templates/mirror/kafka-mirror.systemd.erb A templates/mirror/producer.properties.erb D templates/producer.properties.erb 17 files changed, 697 insertions(+), 309 deletions(-) Approvals: Ottomata: Looks good to me, approved jenkins-bot: Verified diff --git a/README.md b/README.md index 31e28e0..7543730 100644 --- a/README.md +++ b/README.md @@ -72,82 +72,36 @@ `zookeeper_chroot` is optional, and allows you to specify a Znode under which Kafka will store its metadata in Zookeeper. This is useful if you want to use a single Zookeeper cluster to manage multiple Kafka clusters. -See below for information on how to create this Znode in Zookeeper. - - -## Custom Zookeeper Chroot - -If Kafka will share a Zookeeper cluster with other users, you might want to -create a Znode in zookeeper in which to store your Kafka cluster's data. -You can set the `zookeeper_chroot` parameter on the `kafka` class to do this. - -First, you'll need to create the znode manually yourself. You can use -`zkCli.sh` that ships with Zookeeper, or you can use the kafka built in -`zookeeper-shell`: - - -``` -$ kafka zookeeper-shell <zookeeper_host>:2182 -Connecting to kraken-zookeeper -Welcome to ZooKeeper! 
-JLine support is enabled - -WATCHER:: - -WatchedEvent state:SyncConnected type:None path:null -[zk: kraken-zookeeper(CONNECTED) 0] create /my_kafka kafka -Created /my_kafka -``` - -You can use whatever chroot znode path you like. The second argument -(`data`) is arbitrary. I used 'kafka' here. - -Then: -```puppet -class { 'kafka::server': - brokers => { - 'kafka-node01.example.com' => { 'id' => 1, 'port' => 12345 }, - 'kafka-node02.example.com' => { 'id' => 2 }, - }, - zookeeper_hosts => ['zk-node01:2181', 'zk-node02:2181', 'zk-node03:2181'], - # set zookeeper_chroot on the kafka class. - zookeeper_chroot => '/kafka/clusterA', -} -``` ## Kafka Mirror -Kafka MirrorMaker is usually used for inter data center Kafka cluster replication -and aggregation. You can consume from any number of source Kafka clusters, and -produce to a single destination Kafka cluster. +Kafka MirrorMaker will allow you to mirror data from multiple Kafka clusters +into another. This is useful for cross DC replication and for aggregation. ```puppet -# Configure kafka-mirror to produce to Kafka Brokers which are -# part of our kafka aggregator cluster. -class { 'kafka::mirror': - destination_brokers => { - 'kafka-aggregator01.example.com' => { 'id' => 11 }, - 'kafka-aggregator02.example.com' => { 'id' => 12 }, - }, - topic_whitelist => 'webrequest.*', +# Mirror the 'main' and 'secondary' Kafka clusters +# to the 'aggregate' Kafka cluster. 
+kafka::mirror::consumer { 'main': + mirror_name => 'aggregate', + zookeeper_url => 'zk:2181/kafka/main', } - -# Configure kafka-mirror to consume from both clusterA and clusterB -kafka::mirror::consumer { 'clusterA': - zookeeper_hosts => ['zk-node01:2181', 'zk-node02:2181', 'zk-node03:2181'], - zookeeper_chroot => ['/kafka/clusterA'], +kafka::mirror::consumer { 'secondary': + mirror_name => 'aggregate', + zookeeper_url => 'zk:2181/kafka/secondary', } -kafka::mirror::consumer { 'clusterB': - zookeeper_hosts => ['zk-node01:2181', 'zk-node02:2181', 'zk-node03:2181'], - zookeeper_chroot => ['/kafka/clusterB'], +kafka::mirror { 'aggregate': + destination_brokers => ['ka01:9092','ka02:9092'], + whitelist => 'these_topics_only.*', } ``` +Note that the kafka-mirror service does not subscribe to its config files. If +you make changes, you will have to restart the service manually. ## jmxtrans monitoring -This module contains a class called `kafka::server::jmxtrans`. It contains -a useful jmxtrans JSON config object that can be used to tell jmxtrans to send +`kafka::server::jmxtrans` and `kafka::mirror::jmxtrans` configure +useful jmxtrans JSON config objects that can be used to tell jmxtrans to send to any output writer (Ganglia, Graphite, etc.). To use this, you will need the [puppet-jmxtrans](https://github.com/wikimedia/puppet-jmxtrans) module. @@ -157,8 +111,17 @@ ganglia => 'ganglia.example.com:8649', } ``` - -This will install jmxtrans and start render JSON config files for sending +This will install jmxtrans and render JSON config files for sending JVM and Kafka Broker stats to Ganglia. See [kafka-jmxtrans.json.md](kafka-jmxtrans.json.md) for a fully -rendered jmxtrans Kafka JSON config file. +rendered jmxtrans Kafka Broker JSON config file. + +``` +# Declare this define on hosts where you run Kafka MirrorMaker.
+kafka::mirror::jmxtrans { 'aggregate': + statsd => 'statsd.example.org:8125' +} + +``` +This will install jmxtrans and render JSON config files for sending JVM and +Kafka MirrorMaker (consumers and producer) stats to statsd. diff --git a/manifests/defaults.pp b/manifests/defaults.pp index 3808562..1b657ff 100644 --- a/manifests/defaults.pp +++ b/manifests/defaults.pp @@ -23,9 +23,6 @@ $kafka_log_file = '/var/log/kafka/kafka.log' $log_max_backup_index = 4 - $producer_type = 'async' - $producer_batch_num_messages = 200 - $consumer_group_id = 'test-consumer-group' # Broker Server settings $java_home = undef @@ -70,24 +67,12 @@ # Kafka package version. $version = 'installed' - # MirrorMaker settings - $topic_whitelist = '.*' - $topic_blacklist = undef - $num_streams = 1 - $num_producers = 1 - $queue_size = 10000 - # Default puppet paths to template config files. # This allows us to use custom template config files # if we want to override more settings than this # module yet supports. - $producer_properties_template = 'kafka/producer.properties.erb' - $consumer_properties_template = 'kafka/consumer.properties.erb' $log4j_properties_template = 'kafka/log4j.properties.erb' $server_properties_template = 'kafka/server.properties.erb' $server_systemd_override_template = 'kafka/kafka.systemd.override.conf.erb' $server_default_template = 'kafka/kafka.default.erb' - $mirror_default_template = 'kafka/kafka-mirror.default.erb' - - } diff --git a/manifests/mirror.pp b/manifests/mirror.pp index 7c3e5af..d9885d7 100644 --- a/manifests/mirror.pp +++ b/manifests/mirror.pp @@ -1,92 +1,186 @@ -# == Class kafka::mirror -# Sets up a Kafka MirrorMaker and ensures that it is running. +# == Define kafka::mirror +# Sets up a Kafka MirrorMaker instance and ensures that it is running. +# You must declare your kafka::mirror::consumers before this define. # -# == Parameters: -# $enabled - If false, Kafka Mirror Maker service will not be -# started. Default: true.
+# NOTE: This does not work without systemd. Make sure you are including +# this define on a node that supports systemd. # -# $destination_brokers - Hash of Kafka Broker to which you want to produce configs keyed by -# fqdn of each kafka broker node. This Hash -# should be of the form: -# { 'hostA' => { 'id' => 1, 'port' => 12345 }, 'hostB' => { 'id' => 2 }, ... } -# 'port' is optional, and will default to 9092. - -# $jmx_port - Port on which to expose JMX metrics. Default: 9999 +# TODO: support ensure => 'absent' # -class kafka::mirror( - $enabled = true, - $destination_brokers = $kafka::defaults::brokers, +# == Usage +# +# # Mirror the 'main' and 'secondary' Kafka clusters +# # to the 'aggregate' Kafka cluster. +# kafka::mirror::consumer { 'main': +# mirror_name => 'aggregate', +# zookeeper_url => 'zk:2181/kafka/main', +# } +# kafka::mirror::consumer { 'secondary': +# mirror_name => 'aggregate', +# zookeeper_url => 'zk:2181/kafka/secondary', +# } +# +# kafka::mirror { 'aggregate': +# destination_brokers => ['ka01:9092','ka02:9092'] +# ..., +# } +# +# == Parameters +# +# $destination_brokers - Array of Kafka broker hosts in your +# destination cluster. These brokers +# will be used for bootstrapping the producer +# configs and metadata. +# +# $enabled - If false, kafka mirror-maker service will not +# be started. Default: true. +# +# $whitelist - Java regex matching topics to mirror. +# You must set either this or $blacklist. +# Default: '.*' +# +# $blacklist - Java regex matching topics to not mirror. +# Default: undef +# You must set either this or $whitelist. +# +# $num_producers - Number of producer threads. Default: 1 +# +# $num_streams - Number of consumer threads. Default: 1 +# +# $queue_size - Size of intermediate consumer -> producer +# queue. Note that this is different from +# $queue_buffering_max_messages, which is the +# queue size of messages in async producers. +# Default: 10000 +# +# $heap_opts - Heap options to pass to JVM on startup.
+# Default: undef +# +# $request_required_acks - Required number of acks for a produce request. +# Default: -1 (all replicas) +# $producer_type - sync or async. Default: async +# +# $compression_codec - none, gzip, or snappy. Default: snappy +# +# $batch_num_messages - If async producer, the number of messages +# to batch together in a single produce request. +# Default: 200 +# +# queue_buffering_max_ms - Maximum time to buffer data when using async +# mode. For example a setting of 100 will try to +# batch together 100ms of messages to send at +# once. Default: 5000 +# +# queue_buffering_max_messages - The maximum number of unsent messages that can +# be queued up the producer when using async +# mode before either the producer must be +# blocked or data must be dropped. +# Default: 10000 +# +# queue_enqueue_timeout_ms - The amount of time to block before dropping +# messages when running in async mode and the +# buffer has reached +# queue.buffering.max.messages. If set to 0 +# events will be enqueued immediately or dropped +# if the queue is full (the producer send call +# will never block). If set to -1 the producer +# will block indefinitely and never willingly +# drop a send. Default: -1 +# +# $jmx_port - Port on which to expose MirrorMaker +# JMX metrics. 
Default: 9998 +# +define kafka::mirror( + $destination_brokers, + $enabled = true, - $jmx_port = $kafka::defaults::jmx_port, + $whitelist = '.*', + $blacklist = undef, - $topic_whitelist = $kafka::defaults::topic_whitelist, - $topic_blacklist = $kafka::defaults::topic_blacklist, + $num_producers = 1, + $num_streams = 1, + $queue_size = 10000, + $heap_opts = undef, - $producer_type = $kafka::defaults::producer_type, - $producer_compression_codec = $kafka::defaults::producer_compression_codec, - $producer_serializer_class = $kafka::defaults::producer_serializer_class, - $producer_queue_buffering_max_ms = $kafka::defaults::producer_queue_buffering_max_ms, - $producer_queue_buffering_max_messages = $kafka::defaults::producer_queue_buffering_max_messages, - $producer_queue_enqueue_timeout_ms = $kafka::defaults::producer_queue_enqueue_timeout_ms, - $producer_batch_num_messages = $kafka::defaults::producer_batch_num_messages, + # Producer Settings + $request_required_acks = -1, + $producer_type = 'async', + $compression_codec = 'snappy', - # TODO: check on names here, can we make these consistent? - $num_streams = $kafka::defaults::num_streams, - $num_producers = $kafka::defaults::num_producers, - $queue_size = $kafka::defaults::queue_size, + # Async Producer Settings + $batch_num_messages = 200, + $queue_buffering_max_ms = 5000, + $queue_buffering_max_messages = 10000, + $queue_enqueue_timeout_ms = -1, - $producer_properties_template = $kafka::defaults::producer_properties_template, - $default_template = $kafka::defaults::mirror_default_template + $jmx_port = 9998, -) inherits kafka::defaults + $producer_properties_template = 'kafka/mirror/producer.properties.erb', + $systemd_service_template = 'kafka/mirror/kafka-mirror.systemd.erb', + $default_template = 'kafka/mirror/kafka-mirror.default.erb', + $log4j_properties_template = 'kafka/log4j.properties.erb', +) { - # Kafka class must be included before kafka::mirror. 
- # Using 'require' here rather than an explicit class dependency - # so that this class can be used without having to manually - # include the base kafka class. This is for elegance only. - # You'd only need to manually include the base kafka class if - # you need to explicitly set the version of the Kafka package - # you want installed. - require ::kafka - - package { 'kafka-mirror': - ensure => $::kafka::version + if (!$whitelist and !$blacklist) or ($whitelist and $blacklist) { + fail('Must set only one of $whitelist or $blacklist.') } - file { '/etc/default/kafka-mirror': + include kafka::mirror::init + + $mirror_name = $title + file { "/etc/default/kafka-mirror-${mirror_name}": content => template($default_template), require => Package['kafka-mirror'], } - # Make sure /etc/kafka/mirror is a directory. - # MirrorMaker will read consumer and producer - # properties files out of this directory. - file { '/etc/kafka/mirror': - ensure => 'directory', + file { "/etc/kafka/mirror/${mirror_name}": + ensure => 'directory', + recurse => true, + purge => true, + } + + # Log to custom log file for this MirrorMaker instance. + $kafka_log_file = "/var/log/kafka/kafka-mirror-${mirror_name}.log" + file { "/etc/kafka/mirror/${mirror_name}/log4j.properties": + content => template($log4j_properties_template), + } + + file { "/etc/kafka/mirror/${mirror_name}/producer.properties": + content => template($producer_properties_template), require => Package['kafka-mirror'], } - # MirrorMaker will produce to this cluster - # of Kafka Brokers. - $brokers = $destination_brokers - file { '/etc/kafka/mirror/producer.properties': - content => template($producer_properties_template), + # Realize all consumer properties files for this MirrorMaker instance. 
+ # --consumer.configs will be passed a wildcard matching all of the + # files in /etc/kafka/mirror/$mirror_name/consumer*.properties + File <| tag == "kafka-mirror-${mirror_name}-consumer" |> + + # Render a systemd service unit file + file { "/lib/systemd/system/kafka-mirror-${mirror_name}.service": + content => template($systemd_service_template), require => Package['kafka-mirror'], + } + + # systemd needs a reload to pick up changes to this file. + exec { "systemd-reload-for-kafka-mirror-${mirror_name}": + command => '/bin/systemctl daemon-reload', + refreshonly => true, + subscribe => File["/lib/systemd/system/kafka-mirror-${mirror_name}.service"], } # Start the Kafka MirrorMaker daemon. # We don't want to subscribe to the config files here. # It will be better to manually restart Kafka MirrorMaker # when the config files changes. - $kafka_mirror_ensure = $enabled ? { + $service_ensure = $enabled ? { false => 'stopped', default => 'running', } - service { 'kafka-mirror': - ensure => $kafka_mirror_ensure, + service { "kafka-mirror-${mirror_name}": + ensure => $service_ensure, require => [ - File['/etc/kafka/mirror/producer.properties'], - File['/etc/default/kafka-mirror'], + File["/etc/kafka/mirror/${mirror_name}/producer.properties"], + Exec["systemd-reload-for-kafka-mirror-${mirror_name}"], ], hasrestart => true, hasstatus => true, diff --git a/manifests/mirror/consumer.pp b/manifests/mirror/consumer.pp index faba482..51fa206 100644 --- a/manifests/mirror/consumer.pp +++ b/manifests/mirror/consumer.pp @@ -1,24 +1,17 @@ # == Define kafka::mirror::consumer -# Kafka MirrorMaker takes multiple consumer.property files. -# Each one of these property files defines a Zookeeper URL -# (hosts/chroot) from which information about a Kafka Broker -# cluster may be read. This is a define so that nay number -# of source Kafka clusters may be configured. +# kafka::mirror::consumers are grouped together by $mirror_name. 
+# $mirror_name should match the title of the kafka::mirror instance +# you want this consumer to be associated with. # # == Usage: # kafka::mirror::consumer { 'clusterA': -# $zookeeper_hosts => ['zk1.example.com', 'zk2.example.com', 'zk3.example.com'], -# $zookeeper_chroot => '/kafka/clusterA', +# mirror_name => 'aggregate', +# zookeeper_url => 'zk:2181/kafka/clusterA', # } # # == Parameters: -# $zookeeper_hosts - Array of zookeeper hostname/IP(:port)s. -# -# $zookeeper_chroot - Znode Path in zookeeper in which to keep Kafka data. -# Default: undef (the root znode). Note that if you set -# this paramater, the Znode will not be created for you. -# You must do so manually yourself. See the README -# for instructions on how to do so. +# $zookeeper_url - URL to Kafka in zookeeper, including +# chroot. # # $zookeeper_connection_timeout_ms - Timeout in ms for connecting to zookeeper. # Default: 6000 @@ -26,17 +19,24 @@ # $zookeeper_session_timeout_ms - Timeout in ms for session to zookeeper. # Default: 6000 # -# $consumer_group_id - Consumer ID. This will be used to save the -# consumed high water mark for this consumer -# in Zookeeper. +# $auto_commit_enable - If true, periodically commit to zookeeper +# the offset of messages already fetched by +# the consumer. Default: true +# $auto_commit_interval_ms - The frequency in ms that the consumer +# offsets are committed to zookeeper. +# Default: 6000 +# $auto_offset_reset - smallest, largest, or throw exception. 
+# Default: largest # define kafka::mirror::consumer( - $zookeeper_hosts, - $zookeeper_chroot = undef, + $mirror_name, + $zookeeper_url, $zookeeper_connection_timeout_ms = 6000, $zookeeper_session_timeout_ms = 6000, - $consumer_group_id = "mirror${zookeeper_chroot}", - $consumer_properties_template = 'kafka/consumer.properties.erb' + $auto_commit_enable = true, + $auto_commit_interval_ms = 6000, + $auto_offset_reset = 'largest', + $consumer_properties_template = 'kafka/mirror/consumer.properties.erb' ) { # Kafka class must be included before kafka::mirror::consumer. @@ -48,8 +48,9 @@ # you want installed. require ::kafka - file { "/etc/kafka/mirror/consumer.${name}.properties": + @file { "/etc/kafka/mirror/${$mirror_name}/consumer.${title}.properties": content => template($consumer_properties_template), - before => Service['kafka-mirror'], + tag => ["kafka-mirror-${mirror_name}-consumer"], + before => Service["kafka-mirror-${mirror_name}"] } -} \ No newline at end of file +} diff --git a/manifests/mirror/init.pp b/manifests/mirror/init.pp new file mode 100644 index 0000000..d0ecf2c --- /dev/null +++ b/manifests/mirror/init.pp @@ -0,0 +1,38 @@ +# == Class kafka::mirror::init +# This class only exists because kafka::mirror is a define. These +# resources are common to every kafka mirror define and we don't +# want to declare them more than once. +# +class kafka::mirror::init { + # Kafka class must be included before kafka::mirror. + # Using 'require' here rather than an explicit class dependency + # so that this class can be used without having to manually + # include the base kafka class. This is for elegance only. + # You'd only need to manually include the base kafka class if + # you need to explicitly set the version of the Kafka package + # you want installed. + require ::kafka + + package { 'kafka-mirror': + ensure => $::kafka::version + } + + # Remove the kafka-mirror .deb provided kafka-mirror files. + # This define will install instance specific ones. 
+ file { [ + '/etc/init.d/kafka-mirror', + '/lib/systemd/system/kafka-mirror.service', + '/etc/default/kafka-mirror', + ]: + ensure => 'absent' + } + + # Have puppet totally manage this directory. + # Anything it doesn't know about will be removed. + file { '/etc/kafka/mirror': + ensure => 'directory', + recurse => true, + purge => true, + force => true, + } +} diff --git a/manifests/mirror/jmxtrans.pp b/manifests/mirror/jmxtrans.pp new file mode 100644 index 0000000..062cff4 --- /dev/null +++ b/manifests/mirror/jmxtrans.pp @@ -0,0 +1,259 @@ +# == Define kafka::mirror::jmxtrans +# Sets up a jmxtrans instance for a Kafka MirrorMaker instance +# running on the current host. +# Note: This requires the jmxtrans puppet module found at +# https://github.com/wikimedia/puppet-jmxtrans. +# +# == Parameters +# $title - Should be the same as the kafka::mirror instance you want +# to monitor. +# $jmx_port - Kafka MirrorMaker JMX port +# $ganglia - Ganglia host:port +# $graphite - Graphite host:port +# $statsd - statsd host:port +# $outfile - outfile to which Kafka stats will be written. +# $objects - objects parameter to pass to jmxtrans::metrics. Only use +# this if you need to override the default ones that this +# class provides. +# group_prefix - If set, this will be prefixed to all metrics. Default: undef +# $run_interval - How often jmxtrans should run. Default: 15 +# $log_level - level at which jmxtrans should log. Default: info +# +# == Usage +# kafka::mirror::jmxtrans { 'aggregate': +# statsd => 'statsd.example.org:8125' +# } +# +define kafka::mirror::jmxtrans( + $jmx_port = 9998, + $ganglia = undef, + $graphite = undef, + $statsd = undef, + $outfile = undef, + $group_prefix = undef, + $objects = undef, + $run_interval = 15, + $log_level = 'info', +) +{ + # NOTE: $title should match title of kafka::mirror + # instance (AKA $mirror_name).
+ $client_id = "kafka-mirror-${title}" + $jmx = "${::fqdn}:${jmx_port}" + + # query for metrics from Kafka's JVM + jmxtrans::metrics::jvm { $jmx: + ganglia => $ganglia, + graphite => $graphite, + statsd => $statsd, + outfile => $outfile, + group_prefix => $group_prefix, + } + + # DRY up some often used JMX attributes. + $kafka_rate_jmx_attrs = { + 'Count' => { 'slope' => 'positive', 'bucketType' => 'g' }, + 'FifteenMinuteRate' => { 'slope' => 'both', 'bucketType' => 'g' }, + 'FiveMinuteRate' => { 'slope' => 'both', 'bucketType' => 'g' }, + 'OneMinuteRate' => { 'slope' => 'both', 'bucketType' => 'g' }, + 'MeanRate' => { 'slope' => 'both', 'bucketType' => 'g' }, + } + $kafka_timing_jmx_attrs = { + '50thPercentile' => { 'slope' => 'both', 'bucketType' => 'g' }, + '75thPercentile' => { 'slope' => 'both', 'bucketType' => 'g' }, + '95thPercentile' => { 'slope' => 'both', 'bucketType' => 'g' }, + '98thPercentile' => { 'slope' => 'both', 'bucketType' => 'g' }, + '99thPercentile' => { 'slope' => 'both', 'bucketType' => 'g' }, + '999thPercentile' => { 'slope' => 'both', 'bucketType' => 'g' }, + 'Count' => { 'slope' => 'positive', 'bucketType' => 'g' }, + 'Max' => { 'slope' => 'both', 'bucketType' => 'g' }, + 'Mean' => { 'slope' => 'both', 'bucketType' => 'g' }, + 'Min' => { 'slope' => 'both', 'bucketType' => 'g' }, + 'StdDev' => { 'slope' => 'both', 'bucketType' => 'g' }, + } + $kafka_value_jmx_attrs = { + 'Value' => { 'slope' => 'both', 'bucketType' => 'g' }, + } + + $kafka_objects = $objects ? { + # if $objects was not set, then use this as the + # default set of Kafka JMX MBean objects to query.
+ undef => [ + # + # DataChannel Metrics + # + { + 'name' => 'kafka.tools:type=DataChannel,name=MirrorMaker-DataChannel-Size', + 'resultAlias' => 'kafka.tools.MirrorMaker.DataChannel.Size', + 'attrs' => $kafka_timing_jmx_attrs, + }, + { + 'name' => 'kafka.tools:type=DataChannel,name=MirrorMaker-DataChannel-WaitOnPut', + 'resultAlias' => 'kafka.tools.MirrorMaker.DataChannel.WaitOnPut', + 'attrs' => $kafka_rate_jmx_attrs, + }, + { + 'name' => 'kafka.tools:type=DataChannel,name=MirrorMaker-DataChannel-WaitOnTake', + 'resultAlias' => 'kafka.tools.MirrorMaker.DataChannel.WaitOnTake', + 'attrs' => $kafka_rate_jmx_attrs, + }, + + # + # Consumer Metrics + # + + # ConsumerFetcheManager (MaxLag, MinFetchRate) + { + 'name' => "kafka.consumer:type=ConsumerFetcherManager,name=*,clientId=${client_id}", + 'resultAlias' => "kafka.consumer.ConsumerFetcherManager", + 'typeNames' => ['name'], + 'attrs' => $kafka_value_jmx_attrs, + }, + + # All Topic Consumer Metrics + { + 'name' => "kafka.consumer:type=ConsumerTopicMetrics,name=*,clientId=${client_id}", + 'resultAlias' => 'kafka.consumer.ConsumerTopicMetrics-AllTopics', + 'typeNames' => ['name'], + 'attrs' => $kafka_rate_jmx_attrs, + }, + # Per Topic Consumer Metrics + { + 'name' => "kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=${client_id},topic=*", + 'resultAlias' => 'kafka.consumer.ConsumerTopicMetrics.BytesPerSec', + 'typeNames' => ['topic'], + 'attrs' => $kafka_rate_jmx_attrs, + }, + { + 'name' => "kafka.consumer:type=ConsumerTopicMetrics,name=MessagesPerSec,clientId=${client_id},topic=*", + 'resultAlias' => 'kafka.consumer.ConsumerTopicMetrics.MessagesPerSec', + 'typeNames' => ['topic'], + 'attrs' => $kafka_rate_jmx_attrs, + }, + + # Overall Consumer Fetch Request and Response Metrics + { + 'name' => "kafka.consumer:type=FetchRequestAndResponseMetrics,name=*,clientId=${client_id}", + 'resultAlias' => 'kafka.consumer.FetchRequestAndResponseMetrics', + 'typeNames' => ['name'], + 'attrs' => 
$kafka_timing_jmx_attrs, + }, + # Per Broker Fetch Request and Response Metrics + { + 'name' => "kafka.consumer:type=FetchRequestAndResponseMetrics,name=FetchRequestRateAndTimeMs,clientId=${client_id},brokerHost=*,brokerPort=*", + 'resultAlias' => 'kafka.consumer.FetchRequestAndResponseMetrics.FetchRequestRateAndTimeMs', + 'typeNames' => ['brokerHost', 'brokerPort'], + 'attrs' => $kafka_timing_jmx_attrs, + }, + { + 'name' => "kafka.consumer:type=FetchRequestAndResponseMetrics,name=FetchResponseSize,clientId=${client_id},brokerHost=*,brokerPort=*", + 'resultAlias' => 'kafka.consumer.FetchRequestAndResponseMetrics.FetchResponseSize', + 'typeNames' => ['brokerHost', 'brokerPort'], + 'attrs' => $kafka_timing_jmx_attrs, + }, + + # Consumer Lag + { + 'name' => "kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=${client_id},topic=*,partition=*", + 'resultAlias' => 'kafka.server.FetcherLagMetrics.ConsumerLag', + 'typeNames' => ['topic', 'partition'], + 'attrs' => $kafka_value_jmx_attrs, + }, + # Fetcher Stats + { + 'name' => "kafka.server:type=FetcherStats,name=BytesPerSec,clientId=${client_id},brokerHost=*,brokerPort=*", + 'resultAlias' => 'kafka.server.FetcherStats.BytesPerSec', + 'typeNames' => ['brokerHost', 'brokerPort'], + 'attrs' => $kafka_rate_jmx_attrs, + }, + { + 'name' => "kafka.server:type=FetcherStats,name=RequestsPerSec,clientId=${client_id},brokerHost=*,brokerPort=*", + 'resultAlias' => 'kafka.server.FetcherStats.RequestsPerSec', + 'typeNames' => ['brokerHost', 'brokerPort'], + 'attrs' => $kafka_rate_jmx_attrs, + }, + + # Commit Metrics + { + 'name' => "kafka.consumer:type=ZookeeperConsumerConnector,name=KafkaCommitsPerSec,clientId=${client_id}", + 'resultAlias' => 'kafka.consumer.ZookeeperConsumerConnector.KafkaCommitsPerSec', + 'attrs' => $kafka_rate_jmx_attrs, + }, + { + 'name' => "kafka.consumer:type=ZookeeperConsumerConnector,name=ZooKeeperCommitsPerSec,clientId=${client_id}", + 'resultAlias' =>
'kafka.consumer.ZookeeperConsumerConnector.ZooKeeperCommitsPerSec', + 'attrs' => $kafka_rate_jmx_attrs, + }, + + + # + # Producer Topic Metrics + # + + # Producer client.ids are suffixed with the producer number. + # This means that we can't clean up the resultAlias well for + # producer metrics. + + # All Topic Producer Metrics + { + 'name' => 'kafka.producer:type=ProducerTopicMetrics,name=BytesPerSec,clientId=*', + 'resultAlias' => 'kafka.producer.ProducerTopicMetrics-AllTopics.BytesPerSec', + 'typeNames' => ['clientId'], + 'attrs' => $kafka_rate_jmx_attrs, + }, + { + 'name' => 'kafka.producer:type=ProducerTopicMetrics,name=DroppedMessagesPerSec,clientId=*', + 'resultAlias' => 'kafka.producer.ProducerTopicMetrics-AllTopics.DroppedMessagesPerSec', + 'typeNames' => ['clientId'], + 'attrs' => $kafka_rate_jmx_attrs, + }, + { + 'name' => 'kafka.producer:type=ProducerTopicMetrics,name=MessagesPerSec,clientId=*', + 'resultAlias' => 'kafka.producer.ProducerTopicMetrics-AllTopics.MessagesPerSec', + 'typeNames' => ['clientId'], + 'attrs' => $kafka_rate_jmx_attrs, + }, + # Per Topic Producer Metrics + { + 'name' => 'kafka.producer:type=ProducerTopicMetrics,name=BytesPerSec,clientId=*,topic=*', + 'resultAlias' => 'kafka.producer.ProducerTopicMetrics.BytesPerSec', + 'typeNames' => ['clientId', 'topic'], + 'attrs' => $kafka_rate_jmx_attrs, + }, + { + 'name' => 'kafka.producer:type=ProducerTopicMetrics,name=DroppedMessagesPerSec,clientId=*,topic=*', + 'resultAlias' => 'kafka.producer.ProducerTopicMetrics.DroppedMessagesPerSec', + 'typeNames' => ['clientId', 'topic'], + 'attrs' => $kafka_rate_jmx_attrs, + }, + { + 'name' => 'kafka.producer:type=ProducerTopicMetrics,name=MessagesPerSec,clientId=*,topic=*', + 'resultAlias' => 'kafka.producer.ProducerTopicMetrics.MessagesPerSec', + 'typeNames' => ['clientId', 'topic'], + 'attrs' => $kafka_rate_jmx_attrs, + }, + + # Async Producer Metrics + { + 'name' => 'kafka.producer.async:type=ProducerSendThread,name=ProducerQueueSize,clientId=*', +
'resultAlias' => 'kafka.producer.ProducerSendThread.ProducerQueueSize', + 'typeNames' => ['clientId'], + 'attrs' => $kafka_value_jmx_attrs, + }, + ], + default => $objects, + } + + # query kafka for jmx metrics + jmxtrans::metrics { "kafka-mirror-${client_id}-${jmx_port}": + jmx => $jmx, + outfile => $outfile, + ganglia => $ganglia, + ganglia_group_name => "${group_prefix}kafka-mirror", + graphite => $graphite, + graphite_root_prefix => "${group_prefix}kafka-mirror", + statsd => $statsd, + statsd_root_prefix => "${group_prefix}kafka-mirror", + objects => $kafka_objects, + } +} diff --git a/manifests/mirror/monitoring.pp b/manifests/mirror/monitoring.pp new file mode 100644 index 0000000..3acdfba --- /dev/null +++ b/manifests/mirror/monitoring.pp @@ -0,0 +1,38 @@ +# == Define kafka::mirror::monitoring +# Wikimedia Foundation specific monitoring class. +# Only include this if you are using this kafka puppet module +# with the Wikimedia puppet repository from +# github.com/wikimedia/operations-puppet. +# +# == Parameters +# $title - Should be the same as the kafka::mirror instance +# you want +# to monitor. +# $jmx_port - Default: 9998 +# $nagios_servicegroup - Nagios Service group to use for alerts. Default: undef +# $group_prefix - $group_prefix passed to kafka::mirror::jmxtrans. +# This will be used for graphite based alerts. +# Default: undef +# +define kafka::mirror::monitoring( + $jmx_port = 9998, + $nagios_servicegroup = undef, + $group_prefix = undef, +) { + # Generate icinga alert if Kafka MirrorMaker is not running. + nrpe::monitor_service { "kafka-mirror-${title}": + description => "Kafka MirrorMaker ${title}", + nrpe_command => "/usr/lib/nagios/plugins/check_procs -c 1:1 -C java --ereg-argument-array 'kafka.tools.MirrorMaker.+/etc/kafka/mirror/${title}/producer\.properties'", + require => Class['::kafka::server'], + critical => true, + } + + if !defined(Nrpe::Monitor_service['jmxtrans']) { + # Generate icinga alert if this jmxtrans instance is not running.
+ nrpe::monitor_service { 'jmxtrans': + description => 'jmxtrans', + nrpe_command => '/usr/lib/nagios/plugins/check_procs -c 1:1 -C java --ereg-argument-array "-jar.+jmxtrans-all.jar"', + require => Service['jmxtrans'], + } + } +} diff --git a/manifests/server/jmxtrans.pp b/manifests/server/jmxtrans.pp index 5d247c0..7da01c4 100644 --- a/manifests/server/jmxtrans.pp +++ b/manifests/server/jmxtrans.pp @@ -13,6 +13,7 @@ # $objects - objects parameter to pass to jmxtrans::metrics. Only use # this if you need to override the default ones that this # class provides. +# group_prefix - If set, this will be prefixed to all metrics. Default: undef # $run_interval - How often jmxtrans should run. Default: 15 # $log_level - level at which jmxtrans should log. Default: info # $legacy_jmx - JMX Bean names changed in 0.8.2. If this is true diff --git a/manifests/server/monitoring.pp b/manifests/server/monitoring.pp index 959a428..6210ef6 100644 --- a/manifests/server/monitoring.pp +++ b/manifests/server/monitoring.pp @@ -12,9 +12,9 @@ # Default: undef # class kafka::server::monitoring( - $jmx_port = 9999, + $jmx_port = 9999, $nagios_servicegroup = undef, - $group_prefix = undef, + $group_prefix = undef, ) { # Generate icinga alert if Kafka Server is not running. nrpe::monitor_service { 'kafka': @@ -24,15 +24,17 @@ critical => true, } - # Generate icinga alert if this jmxtrans instance is not running. - nrpe::monitor_service { 'jmxtrans': - description => 'jmxtrans', - nrpe_command => '/usr/lib/nagios/plugins/check_procs -c 1:1 -C java --ereg-argument-array "-jar.+jmxtrans-all.jar"', - require => Class['::kafka::server::jmxtrans'], + if !defined(Nrpe::Monitor_service['jmxtrans']) { + # Generate icinga alert if this jmxtrans instance is not running. 
+        nrpe::monitor_service { 'jmxtrans':
+            description  => 'jmxtrans',
+            nrpe_command => '/usr/lib/nagios/plugins/check_procs -c 1:1 -C java --ereg-argument-array "-jar.+jmxtrans-all.jar"',
+            require      => Service['jmxtrans'],
+        }
+    }
 
-    # jmxtrans statsd writer emits Kafka Broker fqdns in keys
-    # by substiting '.' with '_' and suffixing the Broker port.
+    # jmxtrans statsd writer emits fqdns in keys
+    # by substituting '.' with '_' and suffixing the jmx port.
     $graphite_broker_key = regsubst("${::fqdn}_${jmx_port}", '\.', '_', 'G')
 
     # Alert if any Kafka has under replicated partitions.
diff --git a/templates/consumer.properties.erb b/templates/consumer.properties.erb
deleted file mode 100644
index 346aa4c..0000000
--- a/templates/consumer.properties.erb
+++ /dev/null
@@ -1,23 +0,0 @@
-# Note: This file is managed by Puppet.
-
-#
-# see kafka.consumer.ConsumerConfig for more details
-#
-
-# Zookeeper connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zookeeper.connect=<%= @zookeeper_hosts.sort.join(',') %><%= @zookeeper_chroot if @zookeeper_chroot %>
-
-# The max time that the client waits while establishing a connection to Zookeeper.
-zookeeper.connection.timeout.ms=<%= @zookeeper_connection_timeout_ms %>
-
-# ZooKeeper session timeout. If the consumer fails to heartbeat to Zookeeper
-# for this period of time it is considered dead and a rebalance will occur.
-zookeeper.session.timeout.ms=<%= @zookeeper_session_timeout_ms %>
-
-#consumer group id
-group.id=<%= @consumer_group_id %>
-
-#consumer timeout
-#consumer.timeout.ms=5000
diff --git a/templates/kafka-mirror.default.erb b/templates/kafka-mirror.default.erb
deleted file mode 100644
index cb47a43..0000000
--- a/templates/kafka-mirror.default.erb
+++ /dev/null
@@ -1,48 +0,0 @@
-# Note: This file is managed by Puppet.
- -# whether to allow init.d script to start a Kafka MirrorMaker daemon ("yes", "no") -KAFKA_MIRROR_START=yes - -# User and group to run as -KAFKA_USER=kafka -KAFKA_GROUP=kafka -KAFKA_CONFIG=/etc/kafka - -# The default JMX_PORT for Kafka MirrorMaker is 9993. -# Set JMX_PORT to something else to override this. -JMX_PORT=<%= @jmx_port %> -<% if @jmx_opts -%> -KAFKA_JMX_OPTS=${KAFKA_JMX_OPTS:="<%= @jmx_opts %>"} -<% else -%> -#KAFKA_JMX_OPTS=${KAFKA_JMX_OPTS:="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"} -<% end -%> - -# Memory sizes, and logging configuration -# Memory sizes, and logging configuration -<% if @heap_opts -%> -KAFKA_HEAP_OPTS=<%= @heap_opts %> -<% else -%> -#KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" -<% end -%> -#KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC" -#KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${KAFKA_CONFIG}/log4j.properties" -#KAFKA_OPTS="" - -# -# MirrorMaker options: -# - -# Set this to a space separated list of consumer.properties files. -# By default, /etc/init.d/kafka-mirror will use any files that match -# /etc/kafka/mirror/consumer.* -#KAFKA_MIRROR_CONSUMER_CONFIGS= - -#KAFKA_MIRROR_PRODUCER_CONFIG=/etc/kafka/mirror/producer.properties - -# Only set one of the following. 
-#KAFKA_MIRROR_WHITELIST='.*' -#KAFKA_MIRROR_BLACKLIST='' - -#KAFKA_MIRROR_NUM_STREAMS=1 -#KAFKA_MIRROR_NUM_PRODUCERS=1 -#KAFKA_MIRROR_QUEUE_SIZE=10000 diff --git a/templates/log4j.properties.erb b/templates/log4j.properties.erb index 40d7cfd..53460e6 100644 --- a/templates/log4j.properties.erb +++ b/templates/log4j.properties.erb @@ -10,7 +10,7 @@ log4j.appender.fileAppender=org.apache.log4j.RollingFileAppender log4j.appender.fileAppender.File=<%= @kafka_log_file %> log4j.appender.fileAppender.MaxFileSize=256MB -log4j.appender.fileAppender.MaxBackupIndex=<%= @log_max_backup_index %> +log4j.appender.fileAppender.MaxBackupIndex=<%= @log_max_backup_index ? @log_max_backup_index : '4' %> log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout log4j.appender.fileAppender.layout.ConversionPattern=[%d] %-4r [%t] %-5p %c %x - %m%n log4j.appender.fileAppender.Threshold=INFO diff --git a/templates/mirror/consumer.properties.erb b/templates/mirror/consumer.properties.erb new file mode 100644 index 0000000..b81ff63 --- /dev/null +++ b/templates/mirror/consumer.properties.erb @@ -0,0 +1,36 @@ +# Note: This file is managed by Puppet. + +# +# see kafka.consumer.ConsumerConfig for more details +# + +# Kafka Consumer group id +group.id=kafka-mirror-<%= @mirror_name %> + +# Zookeeper connection string +# comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" +zookeeper.connect=<%= @zookeeper_url %> + +# The max time that the client waits while establishing a connection +# to Zookeeper. +zookeeper.connection.timeout.ms=<%= @zookeeper_connection_timeout_ms %> + +# ZooKeeper session timeout. If the consumer fails to heartbeat to Zookeeper +# for this period of time it is considered dead and a rebalance will occur. +zookeeper.session.timeout.ms=<%= @zookeeper_session_timeout_ms %> + +# If true, periodically commit to zookeeper the offset of messages already +# fetched by the consumer. 
This committed offset will be used when the process
+# fails as the position from which the new consumer will begin.
+auto.commit.enable=<%= @auto_commit_enable %>
+
+# The frequency in ms that the consumer offsets are committed to zookeeper.
+auto.commit.interval.ms=<%= @auto_commit_interval_ms %>
+
+# What to do when there is no initial offset in Zookeeper or if an offset
+# is out of range:
+#   * smallest: automatically reset the offset to the smallest offset
+#   * largest: automatically reset the offset to the largest offset
+#   * anything else: throw exception to the consumer.
+auto.offset.reset=<%= @auto_offset_reset %>
diff --git a/templates/mirror/kafka-mirror.default.erb b/templates/mirror/kafka-mirror.default.erb
new file mode 100644
index 0000000..12dace6
--- /dev/null
+++ b/templates/mirror/kafka-mirror.default.erb
@@ -0,0 +1,28 @@
+# Note: This file is managed by Puppet.
+
+# whether to allow init.d script to start a Kafka MirrorMaker daemon ("yes", "no")
+KAFKA_MIRROR_START=yes
+
+# User and group to run as
+KAFKA_USER=kafka
+KAFKA_GROUP=kafka
+
+# The default JMX_PORT for Kafka MirrorMaker is 9998.
+# Set JMX_PORT to something else to override this.
+JMX_PORT=<%= @jmx_port %>
+<% if @jmx_opts -%>
+KAFKA_JMX_OPTS=${KAFKA_JMX_OPTS:="<%= @jmx_opts %>"}
+<% else -%>
+#KAFKA_JMX_OPTS=${KAFKA_JMX_OPTS:="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"}
+<% end -%>
+
+# Memory sizes, and logging configuration
+<% if @heap_opts -%>
+KAFKA_HEAP_OPTS=<%= @heap_opts %>
+<% else -%>
+#KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
+<% end -%>
+#KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC"
+KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/etc/kafka/mirror/<%= @mirror_name %>/log4j.properties"
+#KAFKA_OPTS=""
diff --git a/templates/mirror/kafka-mirror.systemd.erb b/templates/mirror/kafka-mirror.systemd.erb
new file mode 100644
index 0000000..e37a605
--- /dev/null
+++ b/templates/mirror/kafka-mirror.systemd.erb
@@ -0,0 +1,28 @@
+[Unit]
+Description=Kafka MirrorMaker
+
+[Service]
+User=kafka
+Group=kafka
+
+# Set java.awt.headless=true if JAVA_OPTS is not set so the
+# Xalan XSL transformer can work without X11 display on JDK 1.4+
+Environment="JAVA_OPTS=-Djava.awt.headless=true"
+
+# Load any environment overrides from this file.
+EnvironmentFile=-/etc/default/kafka-mirror-<%= @mirror_name %>
+
+ExecStart=/usr/bin/kafka mirror-maker \
+<% if @whitelist -%>
+--whitelist '<%= @whitelist %>' \
+<% else -%>
+--blacklist '<%= @blacklist %>' \
+<% end -%>
+--num.producers <%= @num_producers %> \
+--num.streams <%= @num_streams %> \
+--queue.size <%= @queue_size %> \
+--producer.config /etc/kafka/mirror/<%= @mirror_name %>/producer.properties \
+--consumer.configs /etc/kafka/mirror/<%= @mirror_name %>/consumer*.properties
+
+[Install]
+WantedBy=multi-user.target
diff --git a/templates/mirror/producer.properties.erb b/templates/mirror/producer.properties.erb
new file mode 100644
index 0000000..ace250a
--- /dev/null
+++ b/templates/mirror/producer.properties.erb
@@ -0,0 +1,50 @@
+# Note: This file is managed by Puppet.
+
+#
+# see kafka.producer.ProducerConfig for more details
+#
+
+############################# Producer Basics #############################
+
+
+# list of brokers used for bootstrapping
+# format: host1:port1,host2:port2 ...
+metadata.broker.list=<%= Array(@destination_brokers).join(',') %>
+
+# Logically identifies the application making the request.
+client.id=kafka-mirror-<%= @mirror_name %>
+
+# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
+producer.type=<%= @producer_type %>
+
+# Required number of acks
+request.required.acks=<%= @request_required_acks %>
+
+# specify the compression codec for all data generated: none, gzip, snappy.
+# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectively
+compression.codec=<%= @compression_codec %>
+
+<% if @producer_type == 'async' -%>
+############################# Async Producer #############################
+
+# Number of messages batched at the producer. Messages are grouped together
+# and produced to partitions in batches of this number.
+batch.num.messages=<%= @batch_num_messages %>
+
+# Maximum time, in milliseconds, for buffering data on the producer queue
+queue.buffering.max.ms=<%= @queue_buffering_max_ms %>
+
+# Maximum size of the blocking queue for buffering on the producer
+# This is the size of the producer's queue. If this fills up
+# and queue.enqueue.timeout.ms is positive, messages in this queue
+# will be dropped. If queue.enqueue.timeout.ms is negative,
+# the producer will block until the queue is reduced.
+queue.buffering.max.messages=<%= @queue_buffering_max_messages %>
+
+# Timeout for event enqueue:
+# 0: events will be enqueued immediately or dropped if the queue is full
+# -ve: enqueue will block indefinitely if the queue is full
+# +ve: enqueue will block up to this many milliseconds if the queue is full
+queue.enqueue.timeout.ms=<%= @queue_enqueue_timeout_ms %>
+
+<% end -%>
diff --git a/templates/producer.properties.erb b/templates/producer.properties.erb
deleted file mode 100644
index b4e5618..0000000
--- a/templates/producer.properties.erb
+++ /dev/null
@@ -1,64 +0,0 @@
-# Note: This file is managed by Puppet.
-<%
-# @hosts is a hash of the form
-# {
-#   'hostA' => { 'id' => 1, 'port' => 12345 },
-#   'hostB' => { 'id' => 2 }
-# }
-# 'port' is not required to be specified in the hash,
-# so we fetch it with a default of 9092
-#
-# Sort hosts and loop through the resulting array
-# to get a list of ['hostname1:port1', 'hostname2:port2', ...]
-# for the broker.list setting.
-broker_list = []
[email protected] do |broker_config|
-    hostname = broker_config[0]
-    port     = broker_config[1].fetch('port', 9092).to_s
-    broker_list += ["#{hostname}:#{port}"]
-end
--%>
-
-#
-# see kafka.producer.ProducerConfig for more details
-#
-
-############################# Producer Basics #############################
-
-
-# list of brokers used for bootstrapping
-# format: host1:port1,host2:port2 ...
-metadata.broker.list=<%= broker_list.join(',') %>
-
-# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
-producer.type=<%= @producer_type %>
-
-# name of the partitioner class for partitioning events; default partition spreads data randomly
-#partitioner.class=
-
-# specify the compression codec for all data generated: none , gzip, snappy.
-# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectivally
-compression.codec=none
-
-# message encoder
-serializer.class=kafka.serializer.StringEncoder
-
-# allow topic level compression
-#compressed.topics=
-
-############################# Async Producer #############################
-
-# maximum time, in milliseconds, for buffering data on the producer queue
-#queue.buffering.max.ms=
-
-# the maximum size of the blocking queue for buffering on the producer
-#queue.buffering.max.messages=
-
-# Timeout for event enqueue:
-# 0: events will be enqueued immediately or dropped if the queue is full
-# -ve: enqueue will block indefinitely if the queue is full
-# +ve: enqueue will block up to this many milliseconds if the queue is full
-#queue.enqueue.timeout.ms=
-
-# the number of messages batched at the producer
-batch.num.messages=<%= @producer_batch_num_messages %>

-- 
To view, visit https://gerrit.wikimedia.org/r/265789
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I836e20a8f009ed89452630f95e570f02c522c837
Gerrit-PatchSet: 5
Gerrit-Project: operations/puppet/kafka
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
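The NRPE checks added in this change match the MirrorMaker java process by running check_procs with an extended regex over the process argument list. The pattern can be sanity-checked with a short Python sketch; the instance name `main` and the sample command lines below are hypothetical, not taken from the change itself:

```python
import re

# Pattern from the kafka-mirror nrpe_command, with the puppet ${title}
# interpolated as a hypothetical mirror instance name "main".
pattern = re.compile(
    r"kafka.tools.MirrorMaker.+/etc/kafka/mirror/main/producer\.properties"
)

# Hypothetical MirrorMaker argument list, roughly as check_procs would see it.
mirror_args = (
    "java -Xmx1G kafka.tools.MirrorMaker "
    "--producer.config /etc/kafka/mirror/main/producer.properties "
    "--whitelist '.*'"
)

# A plain broker process must not match, or the check would false-positive.
broker_args = "java kafka.Kafka /etc/kafka/server.properties"

assert pattern.search(mirror_args) is not None
assert pattern.search(broker_args) is None
```

Note the unescaped dots in `kafka.tools.MirrorMaker` match any character, which is harmless here; the escaped `\.properties` is what pins the check to one mirror instance's config path.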

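The `$graphite_broker_key` computed in manifests/server/monitoring.pp turns the host fqdn plus JMX port into a graphite/statsd key by replacing every '.' with '_', as the jmxtrans statsd writer does. A quick Python equivalent of that regsubst call, using a hypothetical broker hostname:

```python
# Equivalent of: regsubst("${::fqdn}_${jmx_port}", '\.', '_', 'G')
# The fqdn below is a hypothetical example; 9999 is the default server jmx_port.
fqdn = "kafka1001.eqiad.wmnet"
jmx_port = 9999

graphite_broker_key = f"{fqdn}_{jmx_port}".replace(".", "_")
assert graphite_broker_key == "kafka1001_eqiad_wmnet_9999"
```

The 'G' flag on regsubst is what makes the substitution global, matching Python's `str.replace`, which replaces all occurrences.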