Ottomata has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/284349

Change subject: [WIP] Add new confluent module and puppetization for using 
confluent Kafka
......................................................................

[WIP] Add new confluent module and puppetization for using confluent Kafka

Bug: T132631
Change-Id: I36d9e6f50da554f1674294e805fe03f1071c5016
---
A modules/confluent/files/kafka/kafka.sh
A modules/confluent/manifests/init.pp
A modules/confluent/manifests/kafka/broker.pp
A modules/confluent/manifests/kafka/client.pp
A modules/confluent/templates/initscripts/kafka.systemd.erb
A modules/confluent/templates/kafka/kafka-profile.sh.erb
A modules/confluent/templates/kafka/kafka.default.erb
A modules/confluent/templates/kafka/log4j.properties.erb
A modules/confluent/templates/kafka/server.properties.erb
9 files changed, 592 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/puppet 
refs/changes/49/284349/1

diff --git a/modules/confluent/files/kafka/kafka.sh 
b/modules/confluent/files/kafka/kafka.sh
new file mode 100755
index 0000000..9cc4b49
--- /dev/null
+++ b/modules/confluent/files/kafka/kafka.sh
@@ -0,0 +1,61 @@
+
+SCRIPT_NAME=$(basename "$0")
+
+commands=$(ls /usr/bin/kafka-* | xargs -n 1 basename | sed 's@kafka-@  @g')
+
+USAGE="
+$SCRIPT_NAME <command> [opts]
+
+Handy wrapper around various kafka-* scripts.  Set the environment variables
+ZOOKEEPER_URL and BROKER_LIST so you don't have to keep typing
+--zookeeper-connect or --broker-list each time you want to use a kafka-*
+script.
+
+Usage:
+
+Run $SCRIPT_NAME <command> with zero arguments/options to see command usage.
+
+Commands:
+$commands
+
+Environment Variables:
+  ZOOKEEPER_URL - If this is set, any commands that take a --zookeeper flag 
will be given this value.
+  BROKER_LIST   - If this is set, any commands that take a --broker-list flag 
will be given this value.
+"
+
+if [ -z "${1}" ]; then
+    echo "${USAGE}"
+    exit 1
+fi
+
+# All kafka scripts start with kafka-.
+command="kafka-${1}"
+shift
+
+# Set ZOOKEEPER_OPT if ZOOKEEPER_URL is set and --zookeeper has not
+# also been passed in as a CLI arg.  This will be included
+# in command functions that take a --zookeeper argument.
+if [ -n "${ZOOKEEPER_URL}" -a -z "$(echo $@ | grep -- --zookeeper)" ]; then
+    ZOOKEEPER_OPT="--zookeeper ${ZOOKEEPER_URL}"
+fi
+
+# Set BROKER_LIST_OPT if BROKER_LIST is set and --broker-list has not
+# also been passed in as a CLI arg.  This will be included
+# in command functions that take a --broker-list argument.
+if [ -n "${BROKER_LIST}" -a -z "$(echo $@ | grep -- --broker-list)" ]; then
+    BROKER_LIST_OPT="--broker-list ${BROKER_LIST}"
+fi
+
+# Each of these lists signifies that either --broker-list or --zookeeper needs
+# to be given to the $script.  If $script matches one of these, then we
+# will add the opt if it is not provided already in $@.
+broker_list_commands="kafka-console-producer kafka-consumer-perf-test 
kafka-replica-verification kafka-simple-consumer-shell 
kafka-verifiable-consumer kafka-verifiable-producer"
+zookeeper_commands="kafka-configs kafka-console-consumer kafka-consumer-groups 
kafka-consumer-perf-test kafka-preferred-replica-election 
kafka-reassign-partitions kafka-replay-log-producer kafka-topics"
+
+EXTRA_OPTS=""
+echo "${broker_list_commands}" | /bin/grep -q "${script}" && 
EXTRA_OPTS="${EXTRA_OPTS} ${BROKER_LIST_OPT}"
+echo "${zookeeper_commands}" | /bin/grep -q "${script}" && 
EXTRA_OPTS="${EXTRA_OPTS} ${ZOOKEEPER_OPT}"
+
+# Print out the command we are about to exec, and then run it
+echo ${script} ${EXTRA_OPTS} $@"
+${script} ${EXTRA_OPTS} $@
diff --git a/modules/confluent/manifests/init.pp 
b/modules/confluent/manifests/init.pp
new file mode 100644
index 0000000..c86d8c5
--- /dev/null
+++ b/modules/confluent/manifests/init.pp
@@ -0,0 +1,5 @@
+# == Class confluent($scala_version = '2.11.7')
+#
+class confluent($scala_version = '2.11.7') {
+
+}
diff --git a/modules/confluent/manifests/kafka/broker.pp 
b/modules/confluent/manifests/kafka/broker.pp
new file mode 100644
index 0000000..516a233
--- /dev/null
+++ b/modules/confluent/manifests/kafka/broker.pp
@@ -0,0 +1,164 @@
+# == Class confluent::kafka::broker
+# Sets up a Kafka Broker and ensures that it is running.
+#
+# == Parameters:
+#
+class confluent::kafka::broker(
+    $enabled                             = true,
+    $brokers                             = {
+        "${::fqdn}" => {
+            'id'   => 1,
+            'port' => 9092,
+        },
+    },
+    $listeners                           = ['PLAINTEXT://:9092'],
+    $log_dirs                            = ['/var/spool/kafka'],
+
+    $num_recovery_threads_per_data_dir   = 1,
+
+    $zookeeper_hosts                     = ['localhost:2181'],
+    $zookeeper_chroot                    = undef,
+    $zookeeper_connection_timeout_ms     = 6000,
+    $zookeeper_session_timeout_ms        = 6000,
+
+    $java_home                           = undef,
+    $java_opts                           = undef,
+    $classpath                           = undef,
+    $jmx_port                            = 9999,
+    $heap_opts                           = undef,
+    $nofiles_ulimit                      = 8192,
+
+    $auto_create_topics_enable           = true, # changed
+    $auto_leader_rebalance_enable        = true,
+
+    $num_partitions                      = size($log_dirs),
+    $default_replication_factor          = 1,
+    $replica_lag_time_max_ms             = undef,
+    $num_recovery_threads_per_data_dir   = 1,
+    $replica_socket_timeout_ms           = 3000,
+    $replica_socket_receive_buffer_bytes = 65536,
+    $num_replica_fetchers                = 1,
+    $replica_fetch_max_bytes             = 1048576,
+
+    $num_network_threads                 = 3,
+    $num_io_threads                      = size($log_dirs),
+    $socket_send_buffer_bytes            = 1048576,
+    $socket_receive_buffer_bytes         = 1048576,
+    $socket_request_max_bytes            = 104857600,
+
+    # TODO: Tune these?
+    $log_flush_interval_messages         = 10000,
+    $log_flush_interval_ms               = 1000,
+
+    $log_retention_hours                 = 168,     # 1 week
+    $log_retention_bytes                 = undef,
+    $log_segment_bytes                   = 1073741824, # changed
+
+    $log_retention_check_interval_ms     = 300000,
+    $log_cleanup_policy                  = 'delete',
+
+    $offsets_retention_minutes           = 10080,   # 1 week
+
+    # $metrics_properties                  = 
$kafka::defaults::metrics_properties,
+    $log_max_backup_index                = 4,
+    $jvm_performance_opts                = undef,
+
+    $server_properties_template          = 
'confluent/kafka/server.properties.erb',
+    $default_template                    = 'confluent/kafka/kafka.default.erb',
+    $log4j_properties_template           = 
'confluent/kafka/log4j.properties.erb',
+) {
+    # confluent::kafka::client installs the kafka package
+    # and a handy wrapper script.
+    require ::confluent::kafka::client
+
+    # Get this broker's id out of the $kafka::brokers
+    # configuration hash.
+    $id = $brokers[$::fqdn]['id']
+
+    $default_port = 9092
+    # Using a conditional assignment selector with a
+    # Hash value results in a puppet syntax error.
+    # Using an if/else instead.
+    if ($brokers[$::fqdn]['port']) {
+        $port = $brokers[$::fqdn]['port']
+    }
+    else {
+        $port = $default_port
+    }
+
+    group { 'kafka':
+        ensure  => 'present',
+        system  => true,
+        require => Class['confluent::kafka::client']
+    }
+    # Kafka system user
+    user { 'kafka':
+        gid        => 'kafka',
+        shell      => '/bin/false',
+        home       => '/nonexistent',
+        comment    => 'Apache Kafka',
+        system     => true,
+        managehome => false,
+        require    => Group['kafka'],
+    }
+
+    # All following config files first require
+    # that the Kafka package has been installed.
+    File {
+        require => Class['confluent::kafka::client'],
+    }
+
+    file { '/var/log/kafka':
+        ensure  => 'directory',
+        owner   => 'kafka',
+        group   => 'kafka',
+        mode    => '0755',
+    }
+
+    # This is the message data directory,
+    # not to be confused with the $kafka_log_file,
+    # which contains daemon process logs.
+    file { $log_dirs:
+        ensure  => 'directory',
+        owner   => 'kafka',
+        group   => 'kafka',
+        mode    => '0755',
+    }
+
+    # Render out Kafka Broker config files.
+    file { '/etc/kafka/server.properties':
+        content => template($server_properties_template),
+    }
+
+    # log4j configuration for Kafka daemon
+    # process logs (this uses $kafka_log_dir).
+    file { '/etc/kafka/log4j.properties':
+        content => template($log4j_properties_template),
+    }
+
+    # Environment variables that are passed to kafka-run-class.
+    file { '/etc/default/kafka':
+        content => template($default_template),
+    }
+
+    # Start the Kafka server.
+    # We don't want to subscribe to the config files here.
+    # It will be better to manually restart Kafka when
+    # the config files changes.
+    $kafka_ensure = $enabled ? {
+        false   => 'absent',
+        default => 'present',
+    }
+
+    base::service_unit{ 'kafka':
+        ensure  => $kafka_ensure,
+        systemd => true,
+        refresh => false,
+        require => [
+            File[$log_dirs],
+            File['/etc/kafka/server.properties'],
+            File['/etc/kafka/log4j.properties'],
+            File['/etc/default/kafka'],
+        ],
+    }
+}
diff --git a/modules/confluent/manifests/kafka/client.pp 
b/modules/confluent/manifests/kafka/client.pp
new file mode 100644
index 0000000..543ba19
--- /dev/null
+++ b/modules/confluent/manifests/kafka/client.pp
@@ -0,0 +1,20 @@
+# == Class confluent::kafka::client
+# Installs the confluent-kafka package and a handy kafka wrapper script
+#
+class confluent::kafka::client(
+    $java_package  = 'openjdk-7-jdk',
+    $scala_version = '2.11.7'
+) {
+    require_package($java_package)
+
+    $package = "confluent-kafka-${scala_version}"
+    require_package($package)
+
+    file { '/usr/local/bin/kafka':
+        source => 'puppet:///modules/confluent/kafka/kafka.sh',
+        owner  => 'root',
+        group  => 'root',
+        mode   => '0755',
+        require => [Package[$package], Package[$java_package]],
+    }
+}
diff --git a/modules/confluent/templates/initscripts/kafka.systemd.erb 
b/modules/confluent/templates/initscripts/kafka.systemd.erb
new file mode 100644
index 0000000..d2521da
--- /dev/null
+++ b/modules/confluent/templates/initscripts/kafka.systemd.erb
@@ -0,0 +1,23 @@
+# NOTE: This file is managed by Puppet.
+
+[Unit]
+Description=Kafka Broker
+
+[Service]
+User=kafka
+Group=kafka
+
+# Set java.awt.headless=true if JAVA_OPTS is not set so the
+# Xalan XSL transformer can work without X11 display on JDK 1.4+
+Environment="JAVA_OPTS=-Djava.awt.headless=true"
+
+# Load any environment overrides from this file.
+EnvironmentFile=-/etc/default/kafka
+
+# Increase limit on number of open files.
+LimitNOFILE=<%= @nofiles_ulimit %>
+
+ExecStart=/usr/bin/kafka-server-start ${KAFKA_CONFIG}/server.properties
+
+[Install]
+WantedBy=multi-user.target
diff --git a/modules/confluent/templates/kafka/kafka-profile.sh.erb 
b/modules/confluent/templates/kafka/kafka-profile.sh.erb
new file mode 100644
index 0000000..0eaa625
--- /dev/null
+++ b/modules/confluent/templates/kafka/kafka-profile.sh.erb
@@ -0,0 +1,4 @@
+# NOTE:  This file is managed by Puppet
+
+export ZOOKEEPER_URL=<%= @zookeeper_url %>
+export BROKER_LIST=<%= @brokers_string %>
diff --git a/modules/confluent/templates/kafka/kafka.default.erb 
b/modules/confluent/templates/kafka/kafka.default.erb
new file mode 100644
index 0000000..bf6fb85
--- /dev/null
+++ b/modules/confluent/templates/kafka/kafka.default.erb
@@ -0,0 +1,45 @@
+# NOTE: This file is managed by Puppet.
+
+KAFKA_CONFIG=/etc/kafka
+
+<% if @java_home -%>
+JAVA_HOME="<%= @java_home %>"
+<% end -%>
+
+# Extra Java options
+<% if @java_opts -%>
+JAVA_OPTS="<%= @java_opts %>"
+<% else -%>
+#JAVA_OPTS=""
+<% end -%>
+
+# Extra classpath entries
+<% if @classpath -%>
+CLASSPATH="<%= @classpath %>"
+<% else -%>
+#CLASSPATH=""
+<% end -%>
+
+# The default JMX_PORT for Kafka Brokers is 9999.
+# Set JMX_PORT to something else to override this.
+JMX_PORT=<%= @jmx_port %>
+#KAFKA_JMX_OPTS=${KAFKA_JMX_OPTS:="-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.ssl=false"}
+
+<% if @jvm_performance_opts -%>
+KAFKA_JVM_PERFORMANCE_OPTS="<%= @jvm_performance_opts %>"
+<% else -%>
+# Default GC settings.
+# KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 
-XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC 
-Djava.awt.headless=true"
+# LinkedIn recommended GC settings.  See: 
http://kafka.apache.org/documentation.html#java
+# You need Java 7 u51 or greater to use the G1 GC.
+#KAFKA_JVM_PERFORMANCE_OPTS="-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC 
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 
-XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 
-XX:MaxMetaspaceFreeRatio=80"
+<% end -%>
+
+# Memory sizes, and logging configuration
+<% if @heap_opts -%>
+KAFKA_HEAP_OPTS="<%= @heap_opts %>"
+<% else -%>
+#KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
+<% end -%>
+#KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${KAFKA_CONFIG}/log4j.properties"
+# KAFKA_OPTS="-XX:GCLogFileSize=50M -XX:+UseGCLogFileRotation 
-XX:NumberOfGCLogFiles=5"
diff --git a/modules/confluent/templates/kafka/log4j.properties.erb 
b/modules/confluent/templates/kafka/log4j.properties.erb
new file mode 100644
index 0000000..ae657fd
--- /dev/null
+++ b/modules/confluent/templates/kafka/log4j.properties.erb
@@ -0,0 +1,79 @@
+# NOTE: This file is managed by Puppet.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
+log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.kafkaAppender.MaxFileSize=256MB
+log4j.appender.kafkaAppender.MaxBackupIndex=<%= @log_max_backup_index ? 
@log_max_backup_index : '4' %>
+
+log4j.appender.stateChangeAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
+log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.stateChangeAppender.MaxFileSize=256MB
+log4j.appender.stateChangeAppender.MaxBackupIndex=<%= @log_max_backup_index ? 
@log_max_backup_index : '4' %>
+
+log4j.appender.requestAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
+log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.requestAppender.MaxFileSize=256MB
+log4j.appender.requestAppender.MaxBackupIndex=<%= @log_max_backup_index ? 
@log_max_backup_index : '4' %>
+
+log4j.appender.cleanerAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
+log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.cleanerAppender.MaxFileSize=256MB
+log4j.appender.cleanerAppender.MaxBackupIndex=<%= @log_max_backup_index ? 
@log_max_backup_index : '4' %>
+
+log4j.appender.controllerAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
+log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.controllerAppender.MaxFileSize=256MB
+log4j.appender.controllerAppender.MaxBackupIndex=<%= @log_max_backup_index ? 
@log_max_backup_index : '4' %>
+
+log4j.appender.authorizerAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
+log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.authorizerAppender.MaxFileSize=256MB
+log4j.appender.authorizerAppender.MaxBackupIndex=<%= @log_max_backup_index ? 
@log_max_backup_index : '4' %>
+
+# Turn on all our debugging info
+#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
+#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, 
kafkaAppender
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+log4j.logger.kafka=INFO, kafkaAppender
+
+log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
+log4j.additivity.kafka.network.RequestChannel$=false
+
+#log4j.logger.kafka.network.Processor=TRACE, requestAppender
+#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
+#log4j.additivity.kafka.server.KafkaApis=false
+log4j.logger.kafka.request.logger=WARN, requestAppender
+log4j.additivity.kafka.request.logger=false
+
+log4j.logger.kafka.controller=TRACE, controllerAppender
+log4j.additivity.kafka.controller=false
+
+log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.LogCleaner=false
+
+log4j.logger.state.change.logger=TRACE, stateChangeAppender
+log4j.additivity.state.change.logger=false
+
+#Change this to debug to get the actual audit log for authorizer.
+log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
+log4j.additivity.kafka.authorizer.logger=false
diff --git a/modules/confluent/templates/kafka/server.properties.erb 
b/modules/confluent/templates/kafka/server.properties.erb
new file mode 100644
index 0000000..6ea1a4c
--- /dev/null
+++ b/modules/confluent/templates/kafka/server.properties.erb
@@ -0,0 +1,191 @@
+# NOTE: This file is managed by Puppet.
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=<%= @id %>
+
+############################# Socket Server Settings 
#############################
+
+listeners=<%= Array(@listeners).join(',') %>
+
+# The port the socket server listens on
+port=<%= @port %>
+
+# Hostname the broker will bind to. If not set, the server will bind to all 
interfaces
+#host.name=localhost
+
+# Hostname the broker will advertise to producers and consumers. If not set, 
it uses the
+# value for "host.name" if configured.  Otherwise, it will use the value 
returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=<%= @num_network_threads %>
+
+# The number of threads doing disk I/O
+num.io.threads=<%= @num_io_threads %>
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=<%= @socket_send_buffer_bytes %>
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=<%= @socket_receive_buffer_bytes %>
+
+# The maximum size of a request that the socket server will accept (protection 
against OOM)
+socket.request.max.bytes=<%= @socket_request_max_bytes %>
+
+
+############################# Log Basics #############################
+
+# A comma seperated list of directories under which to store log files
+log.dirs=<%= Array(@log_dirs).join(',') %>
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=<%= @num_partitions %>
+
+# The default replication factor for automatically created topics.
+# Default to the number of brokers in this cluster.
+default.replication.factor=<%= @default_replication_factor %>
+
+# Enable auto creation of topic on the server. If this is set to true
+# then attempts to produce, consume, or fetch metadata for a non-existent
+# topic will automatically create it with the default replication factor
+# and number of partitions.
+auto.create.topics.enable=<%= @auto_create_topics_enable ? 'true' : 'false' %>
+
+# If this is enabled the controller will automatically try to balance
+# leadership for partitions among the brokers by periodically returning
+# leadership to the "preferred" replica for each partition if it is available.
+auto.leader.rebalance.enable=<%= @auto_leader_rebalance_enable %>
+
+<% if @replica_lag_time_max_ms -%>
+# If a follower hasn't sent any fetch requests for this window of time,
+# the leader will remove the follower from ISR and treat it as dead.
+replica.lag.time.max.ms=<%= @replica_lag_time_max_ms %>
+<% end -%>
+
+# The number of threads per data directory to be used for log recovery at 
startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs 
located in RAID array.
+num.recovery.threads.per.data.dir=<%= @num_recovery_threads_per_data_dir %>
+
+<% if @replica_socket_timeout_ms -%>
+# The socket timeout for network requests to the leader for replicating data.
+replica.socket.timeout.ms=<%= @replica_socket_timeout_ms %>
+<% end -%>
+
+<% if @replica_socket_receive_buffer_bytes -%>
+# The socket receive buffer for network requests to the leader for replicating 
data.
+replica.socket.receive.buffer.bytes=<%= @replica_socket_receive_buffer_bytes %>
+<% end -%>
+
+# Number of threads used to replicate messages from leaders. Increasing this
+# value can increase the degree of I/O parallelism in the follower broker.
+# This is useful to temporarily increase if you have a broker that needs
+# to catch up on messages to get back into the ISR.
+num.replica.fetchers=<%= @num_replica_fetchers %>
+
+# The number of byes of messages to attempt to fetch for each partition in the
+# fetch requests the replicas send to the leader.
+replica.fetch.max.bytes=<%= @replica_fetch_max_bytes %>
+
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only 
fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data 
to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using 
replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when 
the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a 
small flush interval may lead to exceessive seeks.
+# The settings below allow one to configure the flush policy to flush data 
after a period of time or
+# every N messages (or both). This can be done globally and overridden on a 
per-topic basis.
+
+# The number of messages accumulated on a log partition before messages
+# are flushed to disk.  Default 9223372036854775807.
+<%= @log_flush_interval_messages ? 
"log.flush.interval.messages=#{@log_flush_interval_messages}" : 
"#log.flush.interval.messages=" %>
+
+# The maximum time in ms that a message in any topic is kept in memory before
+# flushed to disk. If not set, the value in log.flush.scheduler.interval.ms
+# is used.
+<%= @log_flush_interval_ms ? "log.flush.interval.ms=#{@log_flush_interval_ms}" 
: "#log.flush.interval.ms=" %>
+
+
+
+############################# Log Retention Policy 
#############################
+
+# The following configurations control the disposal of log segments. The 
policy can
+# be set to delete segments after a period of time, or after a given size has 
accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. 
Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+<%= @log_retention_hours ? "log.retention.hours=#{@log_retention_hours}" : 
"#log.retention.hours=" %>
+
+# A size-based retention policy for logs. Segments are pruned from the log as 
long as the remaining
+# segments don't drop below log.retention.bytes.
+<%= @log_retention_bytes ? "log.retention.bytes=#{@log_retention_bytes}" : 
"#log.retention.bytes=" %>
+
+# The maximum size of a log segment file. When this size is reached a new log 
segment will be created.
+log.segment.bytes=<%= @log_segment_bytes %>
+
+# The interval at which log segments are checked to see if they can be deleted 
according
+# to the retention policies
+log.retention.check.interval.ms=<%= @log_retention_check_interval_ms %>
+
+# The default cleanup policy for segments beyond the retention window,
+# must be either "delete" or "compact"
+log.cleanup.policy=<%= @log_cleanup_policy %>
+
+# Log retention window in minutes for offsets topic.  If an offset
+# commit for a consumer group has not been recieved in this amount of
+# time, Kafka will drop the offset commit and consumers in the group
+# will have to start a new.  This can be overridden in an offset commit
+# request.
+offsets.retention.minutes=<%= @offsets_retention_minutes %>
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=<%= @zookeeper_hosts.sort.join(',') %><%= @zookeeper_chroot 
if @zookeeper_chroot %>
+
+# The maximum amount of time in ms that the client waits to establish a
+# connection to zookeeper
+zookeeper.connection.timeout.ms=<%= @zookeeper_connection_timeout_ms %>
+
+# Zookeeper session timeout. If the server fails to heartbeat to Zookeeper
+# within this period of time it is considered dead. If you set this too low
+# the server may be falsely considered dead; if you set it too high it may
+# take too long to recognize a truly dead server.
+zookeeper.session.timeout.ms=<%= @zookeeper_session_timeout_ms %>
+
+##################### Confluent Proactive Support ######################
+
+# If set to true, then the feature to collect and report support metrics
+# ("Metrics") is enabled.  If set to false, the feature is disabled.
+#
+confluent.support.metrics.enable=false
+
+# The customer ID under which support metrics will be collected and
+# reported.
+#
+# When the customer ID is set to "anonymous" (the default), then only a
+# reduced set of metrics is being collected and reported.
+#
+# Confluent customers
+# -------------------
+# If you are a Confluent customer, then you should replace the default
+# value with your actual Confluent customer ID.  Doing so will ensure
+# that additional support metrics will be collected and reported.
+#
+confluent.support.customer.id=anonymous
\ No newline at end of file

-- 
To view, visit https://gerrit.wikimedia.org/r/284349
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I36d9e6f50da554f1674294e805fe03f1071c5016
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Ottomata <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to