Ottomata has uploaded a new change for review. https://gerrit.wikimedia.org/r/265789
Change subject: Refactor MirrorMaker puppetization
......................................................................

Refactor MirrorMaker puppetization

This change makes MirrorMaker work with the latest 0.8.2.1-4
jessie-wikimedia .deb release at
http://apt.wikimedia.org/wikimedia/pool/main/k/kafka/

Bug: T124077
Change-Id: I836e20a8f009ed89452630f95e570f02c522c837
---
M README.md
M manifests/defaults.pp
M manifests/mirror.pp
M manifests/mirror/consumer.pp
D templates/consumer.properties.erb
M templates/log4j.properties.erb
A templates/mirror/consumer.properties.erb
R templates/mirror/kafka-mirror.default.erb
A templates/mirror/producer.properties.erb
D templates/producer.properties.erb
10 files changed, 293 insertions(+), 233 deletions(-)

git pull ssh://gerrit.wikimedia.org:29418/operations/puppet/kafka refs/changes/89/265789/1

diff --git a/README.md b/README.md
index 31e28e0..fcf32ed 100644
--- a/README.md
+++ b/README.md
@@ -72,52 +72,12 @@
 `zookeeper_chroot` is optional, and allows you to specify a Znode under which
 Kafka will store its metadata in Zookeeper. This is useful if you want to use
 a single Zookeeper cluster to manage multiple Kafka clusters.
-See below for information on how to create this Znode in Zookeeper.
-
-
-## Custom Zookeeper Chroot
-
-If Kafka will share a Zookeeper cluster with other users, you might want to
-create a Znode in zookeeper in which to store your Kafka cluster's data.
-You can set the `zookeeper_chroot` parameter on the `kafka` class to do this.
-
-First, you'll need to create the znode manually yourself. You can use
-`zkCli.sh` that ships with Zookeeper, or you can use the kafka built in
-`zookeeper-shell`:
-
-
-```
-$ kafka zookeeper-shell <zookeeper_host>:2182
-Connecting to kraken-zookeeper
-Welcome to ZooKeeper!
-JLine support is enabled
-
-WATCHER::
-
-WatchedEvent state:SyncConnected type:None path:null
-[zk: kraken-zookeeper(CONNECTED) 0] create /my_kafka kafka
-Created /my_kafka
-```
-
-You can use whatever chroot znode path you like. The second argument
-(`data`) is arbitrary. I used 'kafka' here.
-
-Then:
-```puppet
-class { 'kafka::server':
-    brokers => {
-        'kafka-node01.example.com' => { 'id' => 1, 'port' => 12345 },
-        'kafka-node02.example.com' => { 'id' => 2 },
-    },
-    zookeeper_hosts  => ['zk-node01:2181', 'zk-node02:2181', 'zk-node03:2181'],
-    # set zookeeper_chroot on the kafka class.
-    zookeeper_chroot => '/kafka/clusterA',
-}
-```

 ## Kafka Mirror

+TODO: UPDATE THIS XXX
+
 Kafka MirrorMaker is usually used for inter data center Kafka cluster
 replication and aggregation. You can consume from any number of source
 Kafka clusters, and produce to a single destination Kafka cluster.

diff --git a/manifests/defaults.pp b/manifests/defaults.pp
index 54f168a..feb7ae9 100644
--- a/manifests/defaults.pp
+++ b/manifests/defaults.pp
@@ -23,9 +23,6 @@
     $kafka_log_file = '/var/log/kafka/kafka.log'
     $log_max_backup_index = 4

-    $producer_type = 'async'
-    $producer_batch_num_messages = 200
-    $consumer_group_id = 'test-consumer-group'

     # Broker Server settings
     $java_home = undef
@@ -69,24 +66,12 @@
     # Kafka package version.
     $version = 'installed'

-    # MirrorMaker settings
-    $topic_whitelist = '.*'
-    $topic_blacklist = undef
-    $num_streams = 1
-    $num_producers = 1
-    $queue_size = 10000
-
     # Default puppet paths to template config files.
     # This allows us to use custom template config files
     # if we want to override more settings than this
     # module yet supports.
-    $producer_properties_template = 'kafka/producer.properties.erb'
-    $consumer_properties_template = 'kafka/consumer.properties.erb'
     $log4j_properties_template = 'kafka/log4j.properties.erb'
     $server_properties_template = 'kafka/server.properties.erb'
     $server_systemd_override_template = 'kafka/kafka.systemd.override.conf.erb'
     $server_default_template = 'kafka/kafka.default.erb'
-    $mirror_default_template = 'kafka/kafka-mirror.default.erb'
-
-
 }

diff --git a/manifests/mirror.pp b/manifests/mirror.pp
index 7c3e5af..a23c1c6 100644
--- a/manifests/mirror.pp
+++ b/manifests/mirror.pp
@@ -1,44 +1,131 @@
 # == Class kafka::mirror
-# Sets up a Kafka MirrorMaker and ensures that it is running.
+# Sets up a Kafka MirrorMaker instance and ensures that it is running.
+# You must declare your kafka::mirror::consumers before this class.
 #
-# == Parameters:
+# NOTE: This does not work without systemd. Make sure you are including
+# this define on a node that supports systemd.
+#
+# == Usage
+#
+#   # Mirror the 'main' and 'secondary' Kafka clusters
+#   # to the 'aggregate' Kafka cluster.
+#   kafka::mirror::consumer { 'main':
+#       mirror_name   => 'aggregate',
+#       zookeeper_url => 'zk:2181/kafka/main',
+#   }
+#   kafka::mirror::consumer { 'secondary':
+#       mirror_name   => 'aggregate',
+#       zookeeper_url => 'zk:2181/kafka/secondary',
+#   }
+#
+#   kafka::mirror { 'aggregate':
+#       destination_brokers => {
+#           'aggregateA' => { 'id' => 10 },
+#           'aggregateB' => { 'id' => 11 },
+#       },
+#       ...,
+#   }
+#
+# == Parameters
+#
+# $destination_brokers           - Hash of Kafka Broker configs, keyed by the
+#                                  fqdn of each broker node, to which you want
+#                                  to produce. This Hash should be of the form:
+#                                    {
+#                                        'hostA' => { 'id' => 1, 'port' => 12345 },
+#                                        'hostB' => { 'id' => 2 },
+#                                        ...
+#                                    }
+#                                  'port' is optional, and will default to 9092.
+#
 # $enabled                       - If false, Kafka Mirror Maker service will not be
 #                                  started. Default: true.
 #
-# $destination_brokers           - Hash of Kafka Broker to which you want to produce configs keyed by
-#                                  fqdn of each kafka broker node. This Hash
-#                                  should be of the form:
-#                                  { 'hostA' => { 'id' => 1, 'port' => 12345 }, 'hostB' => { 'id' => 2 }, ... }
-#                                  'port' is optional, and will default to 9092.
-
-# $jmx_port                      - Port on which to expose JMX metrics. Default: 9999
+# $topic_whitelist               - Java regex matching topics to mirror.
+#                                  You must set either this or $topic_blacklist.
+#                                  Default: '.*'
 #
-class kafka::mirror(
-    $enabled = true,
-    $destination_brokers = $kafka::defaults::brokers,
+# $topic_blacklist               - Java regex matching topics to not mirror.
+#                                  You must set either this or $topic_whitelist.
+#                                  Default: undef
+#
+# $num_producers                 - Number of producer threads. Default: 1
+#
+# $num_streams                   - Number of consumer threads. Default: 1
+#
+# $queue_size                    - Size of intermediate consumer -> producer
+#                                  queue. Note that this is different from
+#                                  $queue_buffering_max_messages, which is the
+#                                  queue size of messages in async producers.
+#                                  Default: 10000
+#
+# $heap_opts                     - Heap options to pass to JVM on startup.
+#                                  Default: undef
+#
+# $request_required_acks         - Required number of acks for a produce request.
+#                                  Default: -1 (all replicas)
+#
+# $producer_type                 - sync or async. Default: async
+#
+# $compression_codec             - none, gzip, or snappy. Default: snappy
+#
+# $batch_num_messages            - If async producer, the number of messages
+#                                  to batch together in a single produce request.
+#                                  Default: 200
+#
+# $queue_buffering_max_ms        - Maximum time to buffer data when using async
+#                                  mode. For example, a setting of 100 will try to
+#                                  batch together 100ms of messages to send at
+#                                  once. Default: 5000
+#
+# $queue_buffering_max_messages  - The maximum number of unsent messages that can
+#                                  be queued up by the producer when using async
+#                                  mode before either the producer must be
+#                                  blocked or data must be dropped.
+#                                  Default: 10000
+#
+# $queue_enqueue_timeout_ms      - The amount of time to block before dropping
+#                                  messages when running in async mode and the
+#                                  buffer has reached
+#                                  queue.buffering.max.messages. If set to 0,
+#                                  events will be enqueued immediately or dropped
+#                                  if the queue is full (the producer send call
+#                                  will never block). If set to -1, the producer
+#                                  will block indefinitely and never willingly
+#                                  drop a send. Default: -1
+#
+# $jmx_port                      - Port on which to expose MirrorMaker
+#                                  JMX metrics. Default: 9998
+#
+define kafka::mirror(
+    $destination_brokers,
+    $enabled = true,
-    $jmx_port = $kafka::defaults::jmx_port,
+    $topic_whitelist = '.*',
+    $topic_blacklist = undef,
-    $topic_whitelist = $kafka::defaults::topic_whitelist,
-    $topic_blacklist = $kafka::defaults::topic_blacklist,
+    $num_producers = 1,
+    $num_streams = 1,
+    $queue_size = 10000,
+    $heap_opts = undef,
-    $producer_type = $kafka::defaults::producer_type,
-    $producer_compression_codec = $kafka::defaults::producer_compression_codec,
-    $producer_serializer_class = $kafka::defaults::producer_serializer_class,
-    $producer_queue_buffering_max_ms = $kafka::defaults::producer_queue_buffering_max_ms,
-    $producer_queue_buffering_max_messages = $kafka::defaults::producer_queue_buffering_max_messages,
-    $producer_queue_enqueue_timeout_ms = $kafka::defaults::producer_queue_enqueue_timeout_ms,
-    $producer_batch_num_messages = $kafka::defaults::producer_batch_num_messages,
+    # Producer Settings
+    $request_required_acks = -1,
+    $producer_type = 'async',
+    $compression_codec = 'snappy',
-    # TODO: check on names here, can we make these consistent?
-    $num_streams = $kafka::defaults::num_streams,
-    $num_producers = $kafka::defaults::num_producers,
-    $queue_size = $kafka::defaults::queue_size,
+    # Async Producer Settings
+    $batch_num_messages = 200,
+    $queue_buffering_max_ms = 5000,
+    $queue_buffering_max_messages = 10000,
+    $queue_enqueue_timeout_ms = -1,
-    $producer_properties_template = $kafka::defaults::producer_properties_template,
-    $default_template = $kafka::defaults::mirror_default_template
+    $jmx_port = 9998,
-) inherits kafka::defaults
+    $producer_properties_template = 'kafka/mirror/producer.properties.erb',
+    $systemd_service_template = 'kafka/mirror/kafka-mirror.systemd.erb',
+    $default_template = 'kafka/mirror/kafka-mirror.default.erb',
+    $log4j_properties_template = 'kafka/log4j.properties.erb',
+) {

     # Kafka class must be included before kafka::mirror.
     # Using 'require' here rather than an explicit class dependency
@@ -53,40 +140,68 @@
         ensure => $::kafka::version
     }

-    file { '/etc/default/kafka-mirror':
+    # Remove the kafka-mirror files provided by the .deb;
+    # this define will install instance-specific ones.
+    file { [
+        '/lib/systemd/system/kafka-mirror.service',
+        '/etc/default/kafka-mirror',
+        '/etc/kafka/mirror/log4j.properties',
+    ]:
+        ensure => 'absent'
+    }
+
+    $mirror_name = $title
+    file { "/etc/default/kafka-mirror-${mirror_name}":
         content => template($default_template),
         require => Package['kafka-mirror'],
     }

-    # Make sure /etc/kafka/mirror is a directory.
-    # MirrorMaker will read consumer and producer
-    # properties files out of this directory.
-    file { '/etc/kafka/mirror':
+    file { "/etc/kafka/mirror/${mirror_name}":
         ensure  => 'directory',
+    }
+
+    # Log to a custom log file for this MirrorMaker instance.
+    $kafka_log_file = "/var/log/kafka/kafka-mirror-${mirror_name}.log"
+    file { "/etc/kafka/mirror/${mirror_name}/log4j.properties":
+        content => template($log4j_properties_template),
+    }
+
+    file { "/etc/kafka/mirror/${mirror_name}/producer.properties":
+        content => template($producer_properties_template),
         require => Package['kafka-mirror'],
     }

-    # MirrorMaker will produce to this cluster
-    # of Kafka Brokers.
-    $brokers = $destination_brokers
-    file { '/etc/kafka/mirror/producer.properties':
-        content => template($producer_properties_template),
+    # Realize all consumer properties files for this MirrorMaker instance.
+    # --consumer.configs will be passed a wildcard matching all of the
+    # files in /etc/kafka/mirror/$mirror_name/consumer*.properties
+    File <| tag == "kafka-mirror-${mirror_name}-consumer" |>
+
+    # Render a systemd service unit file.
+    file { "/lib/systemd/system/kafka-mirror-${mirror_name}.service":
+        content => template($systemd_service_template),
         require => Package['kafka-mirror'],
+    }
+
+    # systemd needs a reload to pick up changes to this file.
+    exec { "systemd-reload-for-kafka-mirror-${mirror_name}":
+        command     => '/bin/systemctl daemon-reload',
+        refreshonly => true,
+        subscribe   => File["/lib/systemd/system/kafka-mirror-${mirror_name}.service"],
     }

     # Start the Kafka MirrorMaker daemon.
     # We don't want to subscribe to the config files here.
     # It will be better to manually restart Kafka MirrorMaker
     # when the config files change.
-    $kafka_mirror_ensure = $enabled ? {
+    $service_ensure = $enabled ? {
         false   => 'stopped',
         default => 'running',
     }

-    service { 'kafka-mirror':
-        ensure     => $kafka_mirror_ensure,
+    service { "kafka-mirror-${mirror_name}":
+        ensure     => $service_ensure,
         require    => [
-            File['/etc/kafka/mirror/producer.properties'],
-            File['/etc/default/kafka-mirror'],
+            File["/etc/kafka/mirror/${mirror_name}/producer.properties"],
+            Exec["systemd-reload-for-kafka-mirror-${mirror_name}"],
         ],
         hasrestart => true,
         hasstatus  => true,

diff --git a/manifests/mirror/consumer.pp b/manifests/mirror/consumer.pp
index faba482..51fa206 100644
--- a/manifests/mirror/consumer.pp
+++ b/manifests/mirror/consumer.pp
@@ -1,24 +1,17 @@
 # == Define kafka::mirror::consumer
-# Kafka MirrorMaker takes multiple consumer.property files.
-# Each one of these property files defines a Zookeeper URL
-# (hosts/chroot) from which information about a Kafka Broker
-# cluster may be read. This is a define so that nay number
-# of source Kafka clusters may be configured.
+# kafka::mirror::consumers are grouped together by $mirror_name.
+# $mirror_name should match the title of the kafka::mirror instance
+# you want this consumer to be associated with.
 #
 # == Usage:
 #   kafka::mirror::consumer { 'clusterA':
-#       $zookeeper_hosts  => ['zk1.example.com', 'zk2.example.com', 'zk3.example.com'],
-#       $zookeeper_chroot => '/kafka/clusterA',
+#       mirror_name   => 'aggregate',
+#       zookeeper_url => 'zk:2181/kafka/clusterA',
 #   }
 #
 # == Parameters:
-# $zookeeper_hosts                 - Array of zookeeper hostname/IP(:port)s.
-#
-# $zookeeper_chroot                - Znode Path in zookeeper in which to keep Kafka data.
-#                                    Default: undef (the root znode). Note that if you set
-#                                    this paramater, the Znode will not be created for you.
-#                                    You must do so manually yourself. See the README
-#                                    for instructions on how to do so.
+# $zookeeper_url                   - URL to Kafka in zookeeper, including
+#                                    chroot.
 #
 # $zookeeper_connection_timeout_ms - Timeout in ms for connecting to zookeeper.
 #                                    Default: 6000
@@ -26,17 +19,24 @@
 # $zookeeper_session_timeout_ms    - Timeout in ms for session to zookeeper.
 #                                    Default: 6000
 #
-# $consumer_group_id               - Consumer ID. This will be used to save the
-#                                    consumed high water mark for this consumer
-#                                    in Zookeeper.
+# $auto_commit_enable              - If true, periodically commit to zookeeper
+#                                    the offset of messages already fetched by
+#                                    the consumer. Default: true
+#
+# $auto_commit_interval_ms         - The frequency in ms that the consumer
+#                                    offsets are committed to zookeeper.
+#                                    Default: 6000
+#
+# $auto_offset_reset               - smallest, largest, or anything else
+#                                    (throws an exception to the consumer).
+#                                    Default: largest
 #
 define kafka::mirror::consumer(
-    $zookeeper_hosts,
-    $zookeeper_chroot = undef,
+    $mirror_name,
+    $zookeeper_url,
     $zookeeper_connection_timeout_ms = 6000,
     $zookeeper_session_timeout_ms = 6000,
-    $consumer_group_id = "mirror${zookeeper_chroot}",
-    $consumer_properties_template = 'kafka/consumer.properties.erb'
+    $auto_commit_enable = true,
+    $auto_commit_interval_ms = 6000,
+    $auto_offset_reset = 'largest',
+    $consumer_properties_template = 'kafka/mirror/consumer.properties.erb'
 ) {

     # Kafka class must be included before kafka::mirror::consumer.
@@ -48,8 +48,9 @@
     # you want installed.
     require ::kafka

-    file { "/etc/kafka/mirror/consumer.${name}.properties":
+    @file { "/etc/kafka/mirror/${mirror_name}/consumer.${title}.properties":
         content => template($consumer_properties_template),
-        before  => Service['kafka-mirror'],
+        tag     => ["kafka-mirror-${mirror_name}-consumer"],
+        before  => Service["kafka-mirror-${mirror_name}"]
     }
-}
\ No newline at end of file
+}

diff --git a/templates/consumer.properties.erb b/templates/consumer.properties.erb
deleted file mode 100644
index 346aa4c..0000000
--- a/templates/consumer.properties.erb
+++ /dev/null
@@ -1,23 +0,0 @@
-# Note: This file is managed by Puppet.
-
-#
-# see kafka.consumer.ConsumerConfig for more details
-#
-
-# Zookeeper connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zookeeper.connect=<%= @zookeeper_hosts.sort.join(',') %><%= @zookeeper_chroot if @zookeeper_chroot %>
-
-# The max time that the client waits while establishing a connection to Zookeeper.
-zookeeper.connection.timeout.ms=<%= @zookeeper_connection_timeout_ms %>
-
-# ZooKeeper session timeout. If the consumer fails to heartbeat to Zookeeper
-# for this period of time it is considered dead and a rebalance will occur.
-zookeeper.session.timeout.ms=<%= @zookeeper_session_timeout_ms %>
-
-#consumer group id
-group.id=<%= @consumer_group_id %>
-
-#consumer timeout
-#consumer.timeout.ms=5000

diff --git a/templates/log4j.properties.erb b/templates/log4j.properties.erb
index 40d7cfd..53460e6 100644
--- a/templates/log4j.properties.erb
+++ b/templates/log4j.properties.erb
@@ -10,7 +10,7 @@
 log4j.appender.fileAppender=org.apache.log4j.RollingFileAppender
 log4j.appender.fileAppender.File=<%= @kafka_log_file %>
 log4j.appender.fileAppender.MaxFileSize=256MB
-log4j.appender.fileAppender.MaxBackupIndex=<%= @log_max_backup_index %>
+log4j.appender.fileAppender.MaxBackupIndex=<%= @log_max_backup_index ? @log_max_backup_index : '4' %>
 log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.fileAppender.layout.ConversionPattern=[%d] %-4r [%t] %-5p %c %x - %m%n
 log4j.appender.fileAppender.Threshold=INFO

diff --git a/templates/mirror/consumer.properties.erb b/templates/mirror/consumer.properties.erb
new file mode 100644
index 0000000..a8022b2
--- /dev/null
+++ b/templates/mirror/consumer.properties.erb
@@ -0,0 +1,36 @@
+# Note: This file is managed by Puppet.
+
+#
+# see kafka.consumer.ConsumerConfig for more details
+#
+
+# Kafka Consumer group id
+group.id=kafka-mirror-<%= @mirror_name %>
+
+# Zookeeper connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zookeeper.connect=<%= @zookeeper_url %>
+
+# The max time that the client waits while establishing a connection
+# to Zookeeper.
+zookeeper.connection.timeout.ms=<%= @zookeeper_connection_timeout_ms %>
+
+# ZooKeeper session timeout. If the consumer fails to heartbeat to Zookeeper
+# for this period of time it is considered dead and a rebalance will occur.
+zookeeper.session.timeout.ms=<%= @zookeeper_session_timeout_ms %>
+
+# If true, periodically commit to zookeeper the offset of messages already
+# fetched by the consumer. This committed offset will be used when the process
+# fails as the position from which the new consumer will begin.
+auto.commit.enable=<%= @auto_commit_enable %>
+
+# The frequency in ms that the consumer offsets are committed to zookeeper.
+auto.commit.interval.ms=<%= @auto_commit_interval_ms %>
+
+# What to do when there is no initial offset in Zookeeper or if an offset
+# is out of range:
+#   * smallest: automatically reset the offset to the smallest offset
+#   * largest: automatically reset the offset to the largest offset
+#   * anything else: throw exception to the consumer.
+auto.offset.reset=<%= @auto_offset_reset %>

diff --git a/templates/kafka-mirror.default.erb b/templates/mirror/kafka-mirror.default.erb
similarity index 64%
rename from templates/kafka-mirror.default.erb
rename to templates/mirror/kafka-mirror.default.erb
index cb47a43..0a8409f 100644
--- a/templates/kafka-mirror.default.erb
+++ b/templates/mirror/kafka-mirror.default.erb
@@ -25,24 +25,5 @@
 #KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
 <% end -%>
 #KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC"
-#KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${KAFKA_CONFIG}/log4j.properties"
+KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${KAFKA_CONFIG}/mirror/<%= @mirror_name %>/log4j.properties"
 #KAFKA_OPTS=""
-
-#
-# MirrorMaker options:
-#
-
-# Set this to a space separated list of consumer.properties files.
-# By default, /etc/init.d/kafka-mirror will use any files that match
-# /etc/kafka/mirror/consumer.*
-#KAFKA_MIRROR_CONSUMER_CONFIGS=
-
-#KAFKA_MIRROR_PRODUCER_CONFIG=/etc/kafka/mirror/producer.properties
-
-# Only set one of the following.
-#KAFKA_MIRROR_WHITELIST='.*'
-#KAFKA_MIRROR_BLACKLIST=''
-
-#KAFKA_MIRROR_NUM_STREAMS=1
-#KAFKA_MIRROR_NUM_PRODUCERS=1
-#KAFKA_MIRROR_QUEUE_SIZE=10000

diff --git a/templates/mirror/producer.properties.erb b/templates/mirror/producer.properties.erb
new file mode 100644
index 0000000..4a30410
--- /dev/null
+++ b/templates/mirror/producer.properties.erb
@@ -0,0 +1,69 @@
+# Note: This file is managed by Puppet.
+<%
+# @destination_brokers is a hash of the form
+# {
+#     'hostA' => { 'id' => 1, 'port' => 12345 },
+#     'hostB' => { 'id' => 2 }
+# }
+# 'port' is not required to be specified in the hash,
+# so we fetch it with a default of 9092
+#
+# Sort hosts and loop through the resulting array
+# to get a list of ['hostname1:port1', 'hostname2:port2', ...]
+# for the broker.list setting.
+broker_list=[]
[email protected] do |broker_config|
+    hostname = broker_config[0]
+    port     = broker_config[1].fetch('port', 9092).to_s
+    broker_list += ["#{hostname}:#{port}"]
+end
+-%>
+
+#
+# see kafka.producer.ProducerConfig for more details
+#
+
+############################# Producer Basics #############################
+
+
+# list of brokers used for bootstrapping
+# format: host1:port1,host2:port2 ...
+metadata.broker.list=<%= broker_list.join(',') %>
+
+# Logically identifies the application making the request.
+client.id=kafka-mirror-<%= @mirror_name %>
+
+# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
+producer.type=<%= @producer_type %>
+
+# Required number of acks
+request.required.acks=<%= @request_required_acks %>
+
+# specify the compression codec for all data generated: none, gzip, snappy.
+# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectively
+compression.codec=<%= @compression_codec %>
+
+<% if @producer_type == 'async' -%>
+############################# Async Producer #############################
+
+# Number of messages batched at the producer. Messages are grouped together
+# and produced to partitions in batches of this number.
+batch.num.messages=<%= @batch_num_messages %>
+
+# Maximum time, in milliseconds, for buffering data on the producer queue
+queue.buffering.max.ms=<%= @queue_buffering_max_ms %>
+
+# Maximum size of the blocking queue for buffering on the producer.
+# This is the size of the producer's queue. If this fills up
+# and queue.enqueue.timeout.ms is positive, messages in this queue
+# will be dropped. If queue.enqueue.timeout.ms is negative,
+# the producer will block until the queue is reduced.
+queue.buffering.max.messages=<%= @queue_buffering_max_messages %>
+
+# Timeout for event enqueue:
+# 0: events will be enqueued immediately or dropped if the queue is full
+# -ve: enqueue will block indefinitely if the queue is full
+# +ve: enqueue will block up to this many milliseconds if the queue is full
+queue.enqueue.timeout.ms=<%= @queue_enqueue_timeout_ms %>
+
+<% end -%>

diff --git a/templates/producer.properties.erb b/templates/producer.properties.erb
deleted file mode 100644
index b4e5618..0000000
--- a/templates/producer.properties.erb
+++ /dev/null
@@ -1,64 +0,0 @@
-# Note: This file is managed by Puppet.
-<%
-# @hosts is a hash of the form
-# {
-#     'hostA' => { 'id' => 1, 'port' => 12345 },
-#     'hostB' => { 'id' => 2 }
-# }
-# 'port' is not required to be specified in the hash,
-# so we fetch it with a default of 9092
-#
-# Sort hosts and loop through the resulting array
-# to get a list of ['hostname1:port1', 'hostname2:port2', ...]
-# for the broker.list setting.
-broker_list=[]
[email protected] do |broker_config|
-    hostname = broker_config[0]
-    port     = broker_config[1].fetch('port', 9092).to_s
-    broker_list += ["#{hostname}:#{port}"]
-end
--%>
-
-#
-# see kafka.producer.ProducerConfig for more details
-#
-
-############################# Producer Basics #############################
-
-
-# list of brokers used for bootstrapping
-# format: host1:port1,host2:port2 ...
-metadata.broker.list=<%= broker_list.join(',') %>
-
-# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
-producer.type=<%= @producer_type %>
-
-# name of the partitioner class for partitioning events; default partition spreads data randomly
-#partitioner.class=
-
-# specify the compression codec for all data generated: none , gzip, snappy.
-# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectivally
-compression.codec=none
-
-# message encoder
-serializer.class=kafka.serializer.StringEncoder
-
-# allow topic level compression
-#compressed.topics=
-
-############################# Async Producer #############################
-
-# maximum time, in milliseconds, for buffering data on the producer queue
-#queue.buffering.max.ms=
-
-# the maximum size of the blocking queue for buffering on the producer
-#queue.buffering.max.messages=
-
-# Timeout for event enqueue:
-# 0: events will be enqueued immediately or dropped if the queue is full
-# -ve: enqueue will block indefinitely if the queue is full
-# +ve: enqueue will block up to this many milliseconds if the queue is full
-#queue.enqueue.timeout.ms=
-
-# the number of messages batched at the producer
-batch.num.messages=<%= @producer_batch_num_messages %>

--
To view, visit https://gerrit.wikimedia.org/r/265789

Gerrit-MessageType: newchange
Gerrit-Change-Id: I836e20a8f009ed89452630f95e570f02c522c837
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet/kafka
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
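P.S. for anyone testing this patch: a minimal sketch (not part of the change itself) of how the refactored defines are meant to be wired together, following the Usage docs in manifests/mirror.pp. The hostnames, zookeeper URLs, and topic regex below are hypothetical placeholders:

```puppet
# Consume from two hypothetical source clusters, 'main' and 'secondary'.
# mirror_name groups each consumer with its kafka::mirror instance.
kafka::mirror::consumer { 'main':
    mirror_name   => 'aggregate',
    zookeeper_url => 'zk1001.example.org:2181/kafka/main',
}
kafka::mirror::consumer { 'secondary':
    mirror_name   => 'aggregate',
    zookeeper_url => 'zk1001.example.org:2181/kafka/secondary',
}

# Produce to the 'aggregate' destination cluster.
# 'port' defaults to 9092 when not given.
kafka::mirror { 'aggregate':
    destination_brokers => {
        'kafka1001.example.org' => { 'id' => 10 },
        'kafka1002.example.org' => { 'id' => 11, 'port' => 9093 },
    },
    topic_whitelist     => '^(webrequest|eventlogging).*',
    num_streams         => 2,
}
```

If I am reading mirror.pp correctly, this should render consumer.main.properties, consumer.secondary.properties, and producer.properties under /etc/kafka/mirror/aggregate/, plus a kafka-mirror-aggregate systemd unit, so each MirrorMaker instance can be restarted independently of any others on the same host.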
