Ottomata has uploaded a new change for review.
https://gerrit.wikimedia.org/r/284349
Change subject: [WIP] Add new confluent module and puppetization for using
confluent Kafka
......................................................................
[WIP] Add new confluent module and puppetization for using confluent Kafka
Bug: T132631
Change-Id: I36d9e6f50da554f1674294e805fe03f1071c5016
---
A modules/confluent/files/kafka/kafka.sh
A modules/confluent/manifests/init.pp
A modules/confluent/manifests/kafka/broker.pp
A modules/confluent/manifests/kafka/client.pp
A modules/confluent/templates/initscripts/kafka.systemd.erb
A modules/confluent/templates/kafka/kafka-profile.sh.erb
A modules/confluent/templates/kafka/kafka.default.erb
A modules/confluent/templates/kafka/log4j.properties.erb
A modules/confluent/templates/kafka/server.properties.erb
9 files changed, 592 insertions(+), 0 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/operations/puppet
refs/changes/49/284349/1
diff --git a/modules/confluent/files/kafka/kafka.sh
b/modules/confluent/files/kafka/kafka.sh
new file mode 100755
index 0000000..9cc4b49
--- /dev/null
+++ b/modules/confluent/files/kafka/kafka.sh
@@ -0,0 +1,112 @@
+#!/bin/bash
+#
+# kafka - handy wrapper around the /usr/bin/kafka-* scripts.
+#
+# Usage: kafka <command> [opts]
+#
+# Runs /usr/bin/kafka-<command> with the given opts.  If ZOOKEEPER_URL or
+# BROKER_LIST is set in the environment, --zookeeper or --broker-list is
+# added automatically for the commands known to take it, unless the caller
+# already passed that flag.
+#
+
+SCRIPT_NAME=$(basename "$0")
+
+# Print the available commands, one per line, without the kafka- prefix.
+list_commands() {
+    local path
+    for path in /usr/bin/kafka-*; do
+        [ -e "${path}" ] || continue
+        path=$(basename "${path}")
+        echo " ${path#kafka-}"
+    done
+}
+
+# Return 0 if the first argument is exactly equal to one of the others.
+contains() {
+    local needle="${1}"
+    local item
+    shift
+    for item in "$@"; do
+        [ "${item}" = "${needle}" ] && return 0
+    done
+    return 1
+}
+
+# Return 0 if option $1 appears in the remaining arguments, either as a
+# separate word or in --opt=value form.
+has_opt() {
+    local opt="${1}"
+    local arg
+    shift
+    for arg in "$@"; do
+        case "${arg}" in
+            "${opt}"|"${opt}"=*) return 0 ;;
+        esac
+    done
+    return 1
+}
+
+USAGE="
+$SCRIPT_NAME <command> [opts]
+
+Handy wrapper around various kafka-* scripts. Set the environment variables
+ZOOKEEPER_URL and BROKER_LIST so you don't have to keep typing
+--zookeeper or --broker-list each time you want to use a kafka-*
+script.
+
+Usage:
+
+Run $SCRIPT_NAME <command> with zero arguments/options to see command usage.
+
+Commands:
+$(list_commands)
+
+Environment Variables:
+ ZOOKEEPER_URL - If this is set, any commands that take a --zookeeper flag will be given this value.
+ BROKER_LIST - If this is set, any commands that take a --broker-list flag will be given this value.
+"
+
+if [ -z "${1}" ]; then
+    echo "${USAGE}"
+    exit 1
+fi
+
+# All kafka scripts start with kafka-.
+script="kafka-${1}"
+shift
+
+if ! command -v "${script}" > /dev/null 2>&1; then
+    echo "${SCRIPT_NAME}: unknown command '${script#kafka-}'" >&2
+    echo "${USAGE}" >&2
+    exit 1
+fi
+
+# Commands known to take --broker-list or --zookeeper.  If ${script} is in
+# one of these lists and the matching environment variable is set, the
+# option is added unless the caller already passed it.
+broker_list_commands=(
+    kafka-console-producer kafka-consumer-perf-test
+    kafka-replica-verification kafka-simple-consumer-shell
+    kafka-verifiable-consumer kafka-verifiable-producer
+)
+zookeeper_commands=(
+    kafka-configs kafka-console-consumer kafka-consumer-groups
+    kafka-consumer-perf-test kafka-preferred-replica-election
+    kafka-reassign-partitions kafka-replay-log-producer kafka-topics
+)
+
+extra_opts=()
+if [ -n "${BROKER_LIST}" ] && ! has_opt --broker-list "$@" &&
+        contains "${script}" "${broker_list_commands[@]}"; then
+    extra_opts+=(--broker-list "${BROKER_LIST}")
+fi
+if [ -n "${ZOOKEEPER_URL}" ] && ! has_opt --zookeeper "$@" &&
+        contains "${script}" "${zookeeper_commands[@]}"; then
+    extra_opts+=(--zookeeper "${ZOOKEEPER_URL}")
+fi
+
+# Print the command we are about to run (to stderr, so it does not mix
+# with the command's own output), then replace this shell with it.
+echo "${script} ${extra_opts[*]} $*" >&2
+exec "${script}" "${extra_opts[@]}" "$@"
diff --git a/modules/confluent/manifests/init.pp
b/modules/confluent/manifests/init.pp
new file mode 100644
index 0000000..c86d8c5
--- /dev/null
+++ b/modules/confluent/manifests/init.pp
@@ -0,0 +1,14 @@
+# == Class confluent
+# Base class for the confluent module.
+#
+# == Parameters:
+#
+# [*scala_version*]
+#   Scala version string.  Default: '2.11.7'
+#   NOTE: nothing in this module currently reads this value;
+#   confluent::kafka::client declares its own $scala_version parameter.
+#
+class confluent(
+    $scala_version = '2.11.7',
+) {
+}
diff --git a/modules/confluent/manifests/kafka/broker.pp
b/modules/confluent/manifests/kafka/broker.pp
new file mode 100644
index 0000000..516a233
--- /dev/null
+++ b/modules/confluent/manifests/kafka/broker.pp
@@ -0,0 +1,206 @@
+# == Class confluent::kafka::broker
+# Sets up a Kafka Broker and ensures that it is running.
+#
+# == Parameters:
+#
+# [*enabled*]
+#   If false, the kafka base::service_unit is declared with
+#   ensure => 'absent'.  Default: true
+#
+# [*brokers*]
+#   Hash of broker fqdn => { 'id' => <broker id>, 'port' => <port> }.
+#   This node's $::fqdn must be a key.  Its 'id' is rendered as broker.id
+#   and its 'port' (9092 when unset) as port in server.properties.
+#
+# [*listeners*]
+#   Array of listener URIs, rendered comma-joined as listeners.
+#
+# [*log_dirs*]
+#   Array of Kafka message data directories.  Each is created owned by
+#   kafka:kafka, and they are rendered comma-joined as log.dirs.  The
+#   default for num_partitions and num_io_threads is size($log_dirs).
+#
+# [*zookeeper_hosts*], [*zookeeper_chroot*]
+#   zookeeper.connect is the sorted, comma-joined $zookeeper_hosts with
+#   $zookeeper_chroot (if set) appended.
+#
+# [*java_home*], [*java_opts*], [*classpath*], [*jmx_port*], [*heap_opts*],
+# [*jvm_performance_opts*]
+#   Rendered into /etc/default/kafka.
+#
+# [*nofiles_ulimit*]
+#   Open files limit, rendered as LimitNOFILE in the kafka systemd unit.
+#
+# [*log_max_backup_index*]
+#   Number of rotated daemon log files log4j keeps per appender.
+#
+# [*server_properties_template*], [*default_template*],
+# [*log4j_properties_template*]
+#   Templates rendered to /etc/kafka/server.properties, /etc/default/kafka
+#   and /etc/kafka/log4j.properties.
+#
+# Most other parameters map onto the server.properties setting of the same
+# name with underscores replaced by dots, e.g. $log_retention_hours is
+# rendered as log.retention.hours.
+#
+class confluent::kafka::broker(
+    $enabled = true,
+    $brokers = {
+        "${::fqdn}" => {
+            'id'   => 1,
+            'port' => 9092,
+        },
+    },
+    $listeners = ['PLAINTEXT://:9092'],
+    $log_dirs = ['/var/spool/kafka'],
+
+    $num_recovery_threads_per_data_dir = 1,
+
+    $zookeeper_hosts = ['localhost:2181'],
+    $zookeeper_chroot = undef,
+    $zookeeper_connection_timeout_ms = 6000,
+    $zookeeper_session_timeout_ms = 6000,
+
+    $java_home = undef,
+    $java_opts = undef,
+    $classpath = undef,
+    $jmx_port = 9999,
+    $heap_opts = undef,
+    $nofiles_ulimit = 8192,
+
+    $auto_create_topics_enable = true, # changed
+    $auto_leader_rebalance_enable = true,
+
+    $num_partitions = size($log_dirs),
+    $default_replication_factor = 1,
+    $replica_lag_time_max_ms = undef,
+    $replica_socket_timeout_ms = 3000,
+    $replica_socket_receive_buffer_bytes = 65536,
+    $num_replica_fetchers = 1,
+    $replica_fetch_max_bytes = 1048576,
+
+    $num_network_threads = 3,
+    $num_io_threads = size($log_dirs),
+    $socket_send_buffer_bytes = 1048576,
+    $socket_receive_buffer_bytes = 1048576,
+    $socket_request_max_bytes = 104857600,
+
+    # TODO: Tune these?
+    $log_flush_interval_messages = 10000,
+    $log_flush_interval_ms = 1000,
+
+    $log_retention_hours = 168, # 1 week
+    $log_retention_bytes = undef,
+    $log_segment_bytes = 1073741824, # changed
+
+    $log_retention_check_interval_ms = 300000,
+    $log_cleanup_policy = 'delete',
+
+    $offsets_retention_minutes = 10080, # 1 week
+
+    # $metrics_properties = $kafka::defaults::metrics_properties,
+    $log_max_backup_index = 4,
+    $jvm_performance_opts = undef,
+
+    $server_properties_template = 'confluent/kafka/server.properties.erb',
+    $default_template = 'confluent/kafka/kafka.default.erb',
+    $log4j_properties_template = 'confluent/kafka/log4j.properties.erb',
+) {
+    # confluent::kafka::client installs the kafka package
+    # and a handy wrapper script.
+    require ::confluent::kafka::client
+
+    # Look up this broker's settings in the $brokers configuration hash,
+    # failing early with a clear message if this host is not listed.
+    unless $brokers[$::fqdn] {
+        fail("confluent::kafka::broker: ${::fqdn} is not a key in \$brokers")
+    }
+    $id = $brokers[$::fqdn]['id']
+
+    $default_port = 9092
+    # Using a conditional assignment selector with a
+    # Hash value results in a puppet syntax error.
+    # Using an if/else instead.
+    if ($brokers[$::fqdn]['port']) {
+        $port = $brokers[$::fqdn]['port']
+    }
+    else {
+        $port = $default_port
+    }
+
+    group { 'kafka':
+        ensure  => 'present',
+        system  => true,
+        require => Class['confluent::kafka::client'],
+    }
+    # Kafka system user
+    user { 'kafka':
+        gid        => 'kafka',
+        shell      => '/bin/false',
+        home       => '/nonexistent',
+        comment    => 'Apache Kafka',
+        system     => true,
+        managehome => false,
+        require    => Group['kafka'],
+    }
+
+    # All following config files first require
+    # that the Kafka package has been installed.
+    File {
+        require => Class['confluent::kafka::client'],
+    }
+
+    # Directory intended for the Kafka daemon's own process logs.
+    file { '/var/log/kafka':
+        ensure => 'directory',
+        owner  => 'kafka',
+        group  => 'kafka',
+        mode   => '0755',
+    }
+
+    # These are the message data directories (log.dirs), not to be
+    # confused with the daemon process logs that log4j writes under
+    # ${kafka.logs.dir}.
+    file { $log_dirs:
+        ensure => 'directory',
+        owner  => 'kafka',
+        group  => 'kafka',
+        mode   => '0755',
+    }
+
+    # Render out Kafka Broker config files.
+    file { '/etc/kafka/server.properties':
+        content => template($server_properties_template),
+    }
+
+    # log4j configuration for the Kafka daemon process logs.
+    file { '/etc/kafka/log4j.properties':
+        content => template($log4j_properties_template),
+    }
+
+    # Environment variables that are passed to kafka-run-class.
+    file { '/etc/default/kafka':
+        content => template($default_template),
+    }
+
+    # Start the Kafka server.
+    # We don't want to subscribe to the config files here.
+    # It will be better to manually restart Kafka when
+    # the config files change.
+    $kafka_ensure = $enabled ? {
+        false   => 'absent',
+        default => 'present',
+    }
+
+    base::service_unit { 'kafka':
+        ensure  => $kafka_ensure,
+        systemd => true,
+        refresh => false,
+        require => [
+            File[$log_dirs],
+            File['/etc/kafka/server.properties'],
+            File['/etc/kafka/log4j.properties'],
+            File['/etc/default/kafka'],
+        ],
+    }
+}
diff --git a/modules/confluent/manifests/kafka/client.pp
b/modules/confluent/manifests/kafka/client.pp
new file mode 100644
index 0000000..543ba19
--- /dev/null
+++ b/modules/confluent/manifests/kafka/client.pp
@@ -0,0 +1,33 @@
+# == Class confluent::kafka::client
+# Installs a Java package, the confluent-kafka package built for the
+# given Scala version, and the /usr/local/bin/kafka wrapper script.
+#
+# == Parameters:
+#
+# [*java_package*]
+#   Name of the Java package to install.  Default: 'openjdk-7-jdk'
+#
+# [*scala_version*]
+#   Scala version suffix of the Kafka package; the package installed is
+#   confluent-kafka-${scala_version}.  Default: '2.11.7'
+#
+class confluent::kafka::client(
+    $java_package  = 'openjdk-7-jdk',
+    $scala_version = '2.11.7',
+) {
+    # Name of the Kafka package for the requested Scala build.
+    $package = "confluent-kafka-${scala_version}"
+
+    require_package($java_package)
+    require_package($package)
+
+    # Wrapper script around the kafka-* commands; installed only after
+    # both the Java and the Kafka packages.
+    file { '/usr/local/bin/kafka':
+        source  => 'puppet:///modules/confluent/kafka/kafka.sh',
+        owner   => 'root',
+        group   => 'root',
+        mode    => '0755',
+        require => [Package[$java_package], Package[$package]],
+    }
+}
diff --git a/modules/confluent/templates/initscripts/kafka.systemd.erb
b/modules/confluent/templates/initscripts/kafka.systemd.erb
new file mode 100644
index 0000000..d2521da
--- /dev/null
+++ b/modules/confluent/templates/initscripts/kafka.systemd.erb
@@ -0,0 +1,28 @@
+# NOTE: This file is managed by Puppet.
+
+[Unit]
+Description=Kafka Broker
+After=network.target
+
+[Service]
+User=kafka
+Group=kafka
+
+# Fallback values; anything set in the EnvironmentFile below overrides them.
+# KAFKA_CONFIG must point at the directory containing server.properties.
+Environment="KAFKA_CONFIG=/etc/kafka"
+# Run the JVM headless (no X11 display needed).
+# NOTE(review): kafka-run-class may not read JAVA_OPTS; confirm, or pass
+# this via KAFKA_OPTS instead.
+Environment="JAVA_OPTS=-Djava.awt.headless=true"
+
+# Load any environment overrides from this file.
+EnvironmentFile=-/etc/default/kafka
+
+# Increase limit on number of open files.
+LimitNOFILE=<%= @nofiles_ulimit %>
+
+ExecStart=/usr/bin/kafka-server-start ${KAFKA_CONFIG}/server.properties
+
+[Install]
+WantedBy=multi-user.target
diff --git a/modules/confluent/templates/kafka/kafka-profile.sh.erb
b/modules/confluent/templates/kafka/kafka-profile.sh.erb
new file mode 100644
index 0000000..0eaa625
--- /dev/null
+++ b/modules/confluent/templates/kafka/kafka-profile.sh.erb
@@ -0,0 +1,7 @@
+# NOTE: This file is managed by Puppet
+
+# Default connection settings for the kafka wrapper script
+# (/usr/local/bin/kafka), which reads ZOOKEEPER_URL and BROKER_LIST.
+# Values are quoted so spaces or separators in them cannot break the line.
+export ZOOKEEPER_URL="<%= @zookeeper_url %>"
+export BROKER_LIST="<%= @brokers_string %>"
diff --git a/modules/confluent/templates/kafka/kafka.default.erb
b/modules/confluent/templates/kafka/kafka.default.erb
new file mode 100644
index 0000000..bf6fb85
--- /dev/null
+++ b/modules/confluent/templates/kafka/kafka.default.erb
@@ -0,0 +1,53 @@
+# NOTE: This file is managed by Puppet.
+#
+# Read by the kafka systemd unit as an EnvironmentFile, so values are used
+# literally: no shell variable expansion is performed here.
+
+KAFKA_CONFIG=/etc/kafka
+
+# Directory for Kafka daemon process logs.  kafka-run-class passes
+# LOG_DIR to the JVM as -Dkafka.logs.dir, which log4j.properties uses.
+LOG_DIR=/var/log/kafka
+
+<% if @java_home -%>
+JAVA_HOME="<%= @java_home %>"
+<% end -%>
+
+# Extra Java options
+<% if @java_opts -%>
+JAVA_OPTS="<%= @java_opts %>"
+<% else -%>
+#JAVA_OPTS=""
+<% end -%>
+
+# Extra classpath entries
+<% if @classpath -%>
+CLASSPATH="<%= @classpath %>"
+<% else -%>
+#CLASSPATH=""
+<% end -%>
+
+# The default JMX_PORT for Kafka Brokers is 9999.
+# Set JMX_PORT to something else to override this.
+JMX_PORT=<%= @jmx_port %>
+#KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
+
+<% if @jvm_performance_opts -%>
+KAFKA_JVM_PERFORMANCE_OPTS="<%= @jvm_performance_opts %>"
+<% else -%>
+# Default GC settings.
+# KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true"
+# LinkedIn recommended GC settings. See: http://kafka.apache.org/documentation.html#java
+# You need Java 7 u51 or greater to use the G1 GC; the Metaspace options
+# below require Java 8.
+#KAFKA_JVM_PERFORMANCE_OPTS="-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
+<% end -%>
+
+# Memory sizes, and logging configuration
+<% if @heap_opts -%>
+KAFKA_HEAP_OPTS="<%= @heap_opts %>"
+<% else -%>
+#KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
+<% end -%>
+#KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/etc/kafka/log4j.properties"
+# KAFKA_OPTS="-XX:GCLogFileSize=50M -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5"
diff --git a/modules/confluent/templates/kafka/log4j.properties.erb
b/modules/confluent/templates/kafka/log4j.properties.erb
new file mode 100644
index 0000000..ae657fd
--- /dev/null
+++ b/modules/confluent/templates/kafka/log4j.properties.erb
@@ -0,0 +1,84 @@
+# NOTE: This file is managed by Puppet.
+<%-
+  # MaxBackupIndex used by every RollingFileAppender below; falls back
+  # to '4' when log_max_backup_index is not set.
+  backup_index = @log_max_backup_index ? @log_max_backup_index : '4'
+-%>
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
+log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.kafkaAppender.MaxFileSize=256MB
+log4j.appender.kafkaAppender.MaxBackupIndex=<%= backup_index %>
+
+log4j.appender.stateChangeAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
+log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.stateChangeAppender.MaxFileSize=256MB
+log4j.appender.stateChangeAppender.MaxBackupIndex=<%= backup_index %>
+
+log4j.appender.requestAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
+log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.requestAppender.MaxFileSize=256MB
+log4j.appender.requestAppender.MaxBackupIndex=<%= backup_index %>
+
+log4j.appender.cleanerAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
+log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.cleanerAppender.MaxFileSize=256MB
+log4j.appender.cleanerAppender.MaxBackupIndex=<%= backup_index %>
+
+log4j.appender.controllerAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
+log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.controllerAppender.MaxFileSize=256MB
+log4j.appender.controllerAppender.MaxBackupIndex=<%= backup_index %>
+
+log4j.appender.authorizerAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
+log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.authorizerAppender.MaxFileSize=256MB
+log4j.appender.authorizerAppender.MaxBackupIndex=<%= backup_index %>
+
+# Turn on all our debugging info
+#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
+#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+log4j.logger.kafka=INFO, kafkaAppender
+
+log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
+log4j.additivity.kafka.network.RequestChannel$=false
+
+#log4j.logger.kafka.network.Processor=TRACE, requestAppender
+#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
+#log4j.additivity.kafka.server.KafkaApis=false
+log4j.logger.kafka.request.logger=WARN, requestAppender
+log4j.additivity.kafka.request.logger=false
+
+log4j.logger.kafka.controller=TRACE, controllerAppender
+log4j.additivity.kafka.controller=false
+
+log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.LogCleaner=false
+
+log4j.logger.state.change.logger=TRACE, stateChangeAppender
+log4j.additivity.state.change.logger=false
+
+#Change this to debug to get the actual audit log for authorizer.
+log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
+log4j.additivity.kafka.authorizer.logger=false
diff --git a/modules/confluent/templates/kafka/server.properties.erb
b/modules/confluent/templates/kafka/server.properties.erb
new file mode 100644
index 0000000..6ea1a4c
--- /dev/null
+++ b/modules/confluent/templates/kafka/server.properties.erb
@@ -0,0 +1,191 @@
+# NOTE: This file is managed by Puppet.
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=<%= @id %>
+
+############################# Socket Server Settings #############################
+
+listeners=<%= Array(@listeners).join(',') %>
+
+# The port the socket server listens on. Deprecated: only used if listeners is not set.
+port=<%= @port %>
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+#host.name=localhost
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured. Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=<%= @num_network_threads %>
+
+# The number of threads doing disk I/O
+num.io.threads=<%= @num_io_threads %>
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=<%= @socket_send_buffer_bytes %>
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=<%= @socket_receive_buffer_bytes %>
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=<%= @socket_request_max_bytes %>
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=<%= Array(@log_dirs).join(',') %>
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=<%= @num_partitions %>
+
+# The default replication factor for automatically created topics.
+# This must not be greater than the number of brokers in the cluster.
+default.replication.factor=<%= @default_replication_factor %>
+
+# Enable auto creation of topic on the server. If this is set to true
+# then attempts to produce, consume, or fetch metadata for a non-existent
+# topic will automatically create it with the default replication factor
+# and number of partitions.
+auto.create.topics.enable=<%= @auto_create_topics_enable ? 'true' : 'false' %>
+
+# If this is enabled the controller will automatically try to balance
+# leadership for partitions among the brokers by periodically returning
+# leadership to the "preferred" replica for each partition if it is available.
+auto.leader.rebalance.enable=<%= @auto_leader_rebalance_enable ? 'true' : 'false' %>
+
+<% if @replica_lag_time_max_ms -%>
+# If a follower hasn't sent any fetch requests for this window of time,
+# the leader will remove the follower from ISR and treat it as dead.
+replica.lag.time.max.ms=<%= @replica_lag_time_max_ms %>
+<% end -%>
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=<%= @num_recovery_threads_per_data_dir %>
+
+<% if @replica_socket_timeout_ms -%>
+# The socket timeout for network requests to the leader for replicating data.
+replica.socket.timeout.ms=<%= @replica_socket_timeout_ms %>
+<% end -%>
+
+<% if @replica_socket_receive_buffer_bytes -%>
+# The socket receive buffer for network requests to the leader for replicating data.
+replica.socket.receive.buffer.bytes=<%= @replica_socket_receive_buffer_bytes %>
+<% end -%>
+
+# Number of threads used to replicate messages from leaders. Increasing this
+# value can increase the degree of I/O parallelism in the follower broker.
+# This is useful to temporarily increase if you have a broker that needs
+# to catch up on messages to get back into the ISR.
+num.replica.fetchers=<%= @num_replica_fetchers %>
+
+# The number of bytes of messages to attempt to fetch for each partition in the
+# fetch requests the replicas send to the leader.
+replica.fetch.max.bytes=<%= @replica_fetch_max_bytes %>
+
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages accumulated on a log partition before messages
+# are flushed to disk. Default 9223372036854775807.
+<%= @log_flush_interval_messages ? "log.flush.interval.messages=#{@log_flush_interval_messages}" : "#log.flush.interval.messages=" %>
+
+# The maximum time in ms that a message in any topic is kept in memory before
+# being flushed to disk. If not set, the value in log.flush.scheduler.interval.ms
+# is used.
+<%= @log_flush_interval_ms ? "log.flush.interval.ms=#{@log_flush_interval_ms}" : "#log.flush.interval.ms=" %>
+
+
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+<%= @log_retention_hours ? "log.retention.hours=#{@log_retention_hours}" : "#log.retention.hours=" %>
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+<%= @log_retention_bytes ? "log.retention.bytes=#{@log_retention_bytes}" : "#log.retention.bytes=" %>
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=<%= @log_segment_bytes %>
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=<%= @log_retention_check_interval_ms %>
+
+# The default cleanup policy for segments beyond the retention window,
+# must be either "delete" or "compact"
+log.cleanup.policy=<%= @log_cleanup_policy %>
+
+# Log retention window in minutes for offsets topic. If an offset
+# commit for a consumer group has not been received in this amount of
+# time, Kafka will drop the offset commit and consumers in the group
+# will have to start over. This can be overridden in an offset commit
+# request.
+offsets.retention.minutes=<%= @offsets_retention_minutes %>
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=<%= @zookeeper_hosts.sort.join(',') %><%= @zookeeper_chroot if @zookeeper_chroot %>
+
+# The maximum amount of time in ms that the client waits to establish a
+# connection to zookeeper
+zookeeper.connection.timeout.ms=<%= @zookeeper_connection_timeout_ms %>
+
+# Zookeeper session timeout. If the server fails to heartbeat to Zookeeper
+# within this period of time it is considered dead. If you set this too low
+# the server may be falsely considered dead; if you set it too high it may
+# take too long to recognize a truly dead server.
+zookeeper.session.timeout.ms=<%= @zookeeper_session_timeout_ms %>
+
+##################### Confluent Proactive Support ######################
+
+# If set to true, then the feature to collect and report support metrics
+# ("Metrics") is enabled. If set to false, the feature is disabled.
+#
+confluent.support.metrics.enable=false
+
+# The customer ID under which support metrics will be collected and
+# reported.
+#
+# When the customer ID is set to "anonymous" (the default), then only a
+# reduced set of metrics is being collected and reported.
+#
+# Confluent customers
+# -------------------
+# If you are a Confluent customer, then you should replace the default
+# value with your actual Confluent customer ID. Doing so will ensure
+# that additional support metrics will be collected and reported.
+#
+confluent.support.customer.id=anonymous
--
To view, visit https://gerrit.wikimedia.org/r/284349
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I36d9e6f50da554f1674294e805fe03f1071c5016
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Ottomata <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits