Ottomata has uploaded a new change for review.
https://gerrit.wikimedia.org/r/110650
Change subject: WIP - Kafkatee puppet module
......................................................................
WIP - Kafkatee puppet module
Change-Id: Ie91622168233d88c4eab6a80c3437d8622ba506e
---
A README.md
A manifests/init.pp
A manifests/input.pp
A manifests/output.pp
A templates/input.kafka.conf.erb
A templates/input.pipe.conf.erb
A templates/kafkatee.conf.erb
A templates/output.conf.erb
8 files changed, 298 insertions(+), 0 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/operations/puppet/kafkatee
refs/changes/50/110650/1
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/README.md
diff --git a/manifests/init.pp b/manifests/init.pp
new file mode 100644
index 0000000..b88841e
--- /dev/null
+++ b/manifests/init.pp
@@ -0,0 +1,64 @@
+# == Class: kafkatee
+#
+# Installs the kafkatee package, renders the main configuration file from
+# the kafkatee/kafkatee.conf.erb template and keeps the kafkatee service
+# running, restarting it whenever that file changes.
+#
+# == Parameters
+#
+# $kafka_brokers           - Kafka broker(s) to connect to.  Required.
+# $kafka_offset_store_path - Value for kafka.offset.store.path.
+# $kafka_offset_reset      - Value for kafka.topic.auto.offset.reset.
+# $kafka_message_max_bytes - Value for kafka.message.max.bytes.  The
+#                            setting stays commented out when undef.
+# $pidfile                 - Value for pid.file.path.
+# $log_statistics_file     - Value for log.statistics.file.
+# $log_statistics_interval - Value for log.statistics.interval.
+# $output_encoding         - Value for output.encoding.
+# $output_format           - Value for output.format.
+# $output_queue_size       - Value for output.queue.size.  The setting
+#                            stays commented out when undef.
+# $config_file             - Path of the main kafkatee config file.
+# $config_directory        - Directory that kafkatee::input and
+#                            kafkatee::output write their files into.
+#
+class kafkatee(
+    $kafka_brokers,
+    $kafka_offset_store_path = '/var/cache/kafkatee/offsets',
+    $kafka_offset_reset      = 'largest',
+    $kafka_message_max_bytes = undef,
+    $pidfile                 = '/var/run/kafkatee/kafkatee.pid',
+    $log_statistics_file     = '/var/cache/kafkatee/kafkatee.stats.json',
+    $log_statistics_interval = 60,
+    $output_encoding         = 'string',
+    # Kept on one line on purpose: the template renders this value verbatim
+    # after 'output.format = '.
+    $output_format           = '%{hostname} %{sequence} %{dt} %{time_firstbyte} %{ip} %{handling}/%{http_status} %{bytes_sent} %{request_method} http://%{host}%{uri}%{query} - %{mime_type} %{referer} %{x_forwarded_for} %{user_agent} %{accept_language} %{x_analytics}',
+    $output_queue_size       = undef,
+    $config_file             = '/etc/kafkatee.conf',
+    $config_directory        = '/etc/kafkatee.d'
+)
+{
+    package { 'kafkatee':
+        ensure => 'installed',
+    }
+
+    # The file type has no 'template' attribute; the rendered template has
+    # to be handed over through 'content'.
+    file { $config_file:
+        content => template('kafkatee/kafkatee.conf.erb'),
+        require => Package['kafkatee'],
+    }
+
+    # kafkatee::input and kafkatee::output place their files in here.
+    file { $config_directory:
+        ensure  => 'directory',
+        require => Package['kafkatee'],
+    }
+
+    service { 'kafkatee':
+        ensure    => running,
+        provider  => 'upstart',
+        subscribe => File[$config_file],
+    }
+}
diff --git a/manifests/input.pp b/manifests/input.pp
new file mode 100644
index 0000000..bbcc85e
--- /dev/null
+++ b/manifests/input.pp
@@ -0,0 +1,38 @@
+# == Define kafkatee::input
+#
+# Renders one kafkatee input declaration into its own file inside
+# $kafkatee::config_directory and notifies the kafkatee service.
+#
+# == Parameters
+#
+# $type       - Input type; selects the kafkatee/input.<type>.conf.erb
+#               template and is part of the file name.  Default: 'kafka'.
+# $topic      - Topic name, read by the kafka template.  Default: undef.
+# $partitions - Partition spec, read by the kafka template.  Default: undef.
+# $offset     - Start offset, read by the kafka template.  Default: 'end'.
+# $options    - Hash rendered as key=value pairs inside the input's [...]
+#               option list.  Default: {}.
+# $command    - Command, read by the pipe template.  Default: undef.
+# $ensure     - Passed on to the file resource.  Default: 'present'.
+#
+define kafkatee::input(
+    $type       = 'kafka',
+    $topic      = undef,
+    $partitions = undef,
+    $offset     = 'end',
+    $options    = {},
+    $command    = undef,
+    $ensure     = 'present',
+)
+{
+    $config_path = "${kafkatee::config_directory}/input.${type}.${title}.conf"
+
+    file { $config_path:
+        ensure  => $ensure,
+        notify  => Service['kafkatee'],
+        content => template("kafkatee/input.${type}.conf.erb"),
+    }
+
+    # The kafkatee class has to be applied before any of its inputs.
+    Class['kafkatee'] -> Kafkatee::Input[$title]
+}
diff --git a/manifests/output.pp b/manifests/output.pp
new file mode 100644
index 0000000..3736a72
--- /dev/null
+++ b/manifests/output.pp
@@ -0,0 +1,32 @@
+# == Define kafkatee::output
+#
+# Renders one kafkatee output declaration into its own file inside
+# $kafkatee::config_directory and notifies the kafkatee service.
+#
+# == Parameters
+#
+# $destination - Last field of the rendered 'output' line.  Required.
+# $type        - First field after the 'output' keyword.  Default: 'file'.
+# $sample      - Field rendered between type and destination.  Default: 1.
+# $ensure      - Passed on to the file resource.  Default: 'present'.
+#
+define kafkatee::output(
+    $destination,
+    $type   = 'file',
+    $sample = 1,
+    $ensure = 'present',
+)
+{
+    $config_path = "${kafkatee::config_directory}/output.${title}.conf"
+
+    file { $config_path:
+        ensure  => $ensure,
+        notify  => Service['kafkatee'],
+        content => template('kafkatee/output.conf.erb'),
+    }
+
+    # The kafkatee class has to be applied before any of its outputs.
+    Class['kafkatee'] -> Kafkatee::Output[$title]
+}
diff --git a/templates/input.kafka.conf.erb b/templates/input.kafka.conf.erb
new file mode 100644
index 0000000..bc66a19
--- /dev/null
+++ b/templates/input.kafka.conf.erb
@@ -0,0 +1,4 @@
+# Note: This file is managed by Puppet.
+
+# <%= @title %> kafka input<%# The whole input declaration is one line; options are sorted by key so the rendered file does not change between runs when Hash order is not guaranteed. %>
+input [<%= @options.sort_by { |key, _val| key.to_s }.map { |key, val| "#{key}=#{val}" }.join(',') %>] kafka topic <%= @topic %> partition <%= @partitions %> from <%= @offset %>
diff --git a/templates/input.pipe.conf.erb b/templates/input.pipe.conf.erb
new file mode 100644
index 0000000..9fd41a8
--- /dev/null
+++ b/templates/input.pipe.conf.erb
@@ -0,0 +1,4 @@
+# Note: This file is managed by Puppet.
+
+# <%= @title %> piped input<%# The whole input declaration is one line; options are sorted by key so the rendered file does not change between runs when Hash order is not guaranteed. %>
+input [<%= @options.sort_by { |key, _val| key.to_s }.map { |key, val| "#{key}=#{val}" }.join(',') %>] pipe <%= @command %>
diff --git a/templates/kafkatee.conf.erb b/templates/kafkatee.conf.erb
new file mode 100644
index 0000000..9fc2fbb
--- /dev/null
+++ b/templates/kafkatee.conf.erb
@@ -0,0 +1,210 @@
+#######################################################################
+# #
+# kafkatee configuration file #
+# #
+# #
+#######################################################################
+# #
+# Syntax: #
+# <property-name> = <value> #
+# input <type args..> #
+# output <type arg..> #
+# #
+# Boolean property values: #
+# >0, "true", "yes", "on", "" - interpreted as true #
+# everything else - interpreted as false #
+# #
+# #
+# The configuration file consists of: #
+# - Configuration properties (key = value) to control various #
+# aspects of kafkatee. #
+# - Inputs #
+# - Outputs #
+# #
+#######################################################################
+
+
+#######################################################################
+# #
+# Configuration properties #
+# #
+#######################################################################
+
+#######################################################################
+# #
+# Kafka configuration #
+# #
+# Kafka configuration properties are prefixed with "kafka." #
+# and topic properties are prefixed with "kafka.topic.". #
+# #
+# For the full range of Kafka handle and topic configuration #
+# properties, see: #
+# http://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md #
+# #
+# And the Apache Kafka configuration reference: #
+# http://kafka.apache.org/08/configuration.html #
+# #
+#######################################################################
+
+# Initial list of kafka brokers
+# Default: none<%# Filled from the kafkatee class parameter $kafka_brokers; Array() accepts a single broker as well as a list. %>
+kafka.metadata.broker.list = <%= Array(@kafka_brokers).sort.join(',') %>
+
+# Offset file directory.
+# Each topic + partition combination has its own offset file.
+# Default: current directory
+kafka.offset.store.path = <%= @kafka_offset_store_path %>
+
+# If the request offset was not found on broker, or there is no
+# initial offset known (no stored offset), then reset the offset according
+# to this configuration.
+# Values: smallest (oldest/beginning), largest (newest/end), error (fail)
+# Default: largest
+kafka.topic.auto.offset.reset = <%= @kafka_offset_reset %>
+
+# Maximum message size.
+# Should be synchronized on all producers, brokers and consumers.
+# Default: 4000000<%# The live setting is emitted only when $kafka_message_max_bytes is set; otherwise a commented-out example is kept. %>
+<%= @kafka_message_max_bytes ? "kafka.message.max.bytes = #{@kafka_message_max_bytes}" : '#kafka.message.max.bytes = 10000000' %>
+
+# Kafka debugging
+# Default: none
+#kafka.debug = msg,topic,broker
+
+
+#######################################################################
+# #
+# Misc configuration #
+# #
+#######################################################################
+
+# Pid file location
+# Default: /var/run/kafkatee.pid
+pid.file.path = <%= @pidfile %>
+
+# Daemonize (background)
+# Default: true
+daemonize = true
+
+# Logging output level
+# 1 = only emergencies .. 6 = info, 7 = debug
+# Default: 6 (info)
+#log.level = 7
+
+
+#
+# JSON Statistics
+#
+# Statistics are collected from kafkatee itself(*) as well as librdkafka.
+# Each JSON object has a top-level key of either 'kafkatee' or
+# 'kafka' to indicate which type of statistics the object contains.
+# Each line is a valid JSON object.
+#
+# *: kafkatee does not currently output any stats of its own, just from rdkafka.
+#
+
+# Statistics output interval
+# Defaults to 60 seconds, use 0 to disable.
+log.statistics.interval = <%= @log_statistics_interval %>
+
+# Statistics output file
+# Defaults to /tmp/kafkatee.stats.json
+log.statistics.file = <%= @log_statistics_file %>
+
+
+# Command to run on startup, before starting IO.
+# Default: none
+#command.init = ./my-script.sh
+
+# Command to run on termination after all IOs have been stopped.
+# Default: none
+#command.term = ./my-cleanup-script.sh
+
+# Set an environment variable which will be available for all subsequent
+# command executions (command.*, input pipe, output pipe, ..)
+#setenv.NMSGS=12
+# clear:
+#setenv.LD_LIBRARY_PATH=
+
+#######################################################################
+# #
+# Message transformation #
+# #
+# A message read from one of the inputs may be transformed before #
+# being enqueued on the output queues. #
+# #
+# Transformation requires that the input and output encoding differs, #
+# i.e., 'input [encoding=json] ..' and 'output.encoding=string' #
+# #
+# While the input encoding is configured per input, the output #
+# encoding is configured globally, all outputs will receive the same #
+# message. #
+# #
+# The currently supported transformation(s) are: #
+# JSON input -> string output: #
+# JSON data is formatted according to the output.format #
+# configuration. The %{field} syntax references the field in the #
+# original JSON object by the same name and outputs its value. #
+# #
+# If the input and output encoding matches then the message remains #
+# untouched. #
+# #
+# The output message delimiter (defaults to newline (\n)) is #
+# configurable (output.delimiter) and always appended to all output #
+# messages regardless of transformation. #
+# The input is always stripped of its delimiter (which is newline #
+# for pipe inputs). #
+# #
+#######################################################################
+
+# Output encoding: string or json
+# Default: string
+output.encoding = <%= @output_encoding %>
+
+#######################################################################
+# Output formatting #
+# #
+# The format string is made up of %{..}-tokens and literals. #
+# #
+# Tokens: #
+# #
+# %{FIELD} #
+# Retrieves the value from the JSON object's field with the #
+# same name. #
+# #
+# %{FIELD?DEFAULT} #
+# 'DEFAULT' is the default string to use if no field was matched, #
+# the default default string is "-". #
+# #
+# Literals are copied verbatim to the output string. #
+# #
+# Example JSON: {"task":19, "name":"Mike"} #
+# Example format: Got task %{task} for user %{name?unknown} #
+# Example output: Got task 19 for user Mike #
+# #
+# Note: Multi-level JSON objects are flattened: #
+# JSON: {"a": {"b": 9}, "c": "hi"} #
+# Gives: { "b": 9, "c": "hi" } #
+# #
+#######################################################################
+
+# Output format for JSON -> string transformation.
+# Default: none
+output.format = <%= @output_format %>
+
+
+# Output delimiter
+# The output message ends with this delimiter.
+# Supports \n, \r, \t, \0.
+# Default: newline (\n)
+#output.delimiter = ;END;\n
+
+
+# Maximum queue size for each output, in number of messages
+# Default: 100000<%# The commented example below is static; the live setting is emitted only when $output_queue_size is set. %>
+#output.queue.size = 1000000
+<%= "output.queue.size = #{@output_queue_size}" if @output_queue_size %>
+
+
+# Include other config files in <%= @config_directory %>/*.conf<%# Uses $config_directory, the directory kafkatee::input and kafkatee::output write into. %>
+include <%= @config_directory %>/*.conf
diff --git a/templates/output.conf.erb b/templates/output.conf.erb
new file mode 100644
index 0000000..54fc29e
--- /dev/null
+++ b/templates/output.conf.erb
@@ -0,0 +1,3 @@
+# Note: This file is managed by Puppet.
+<%# Fields after the keyword, in order: type, sample, destination. %>
+<%= "output #{@type} #{@sample} #{@destination}" %>
--
To view, visit https://gerrit.wikimedia.org/r/110650
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie91622168233d88c4eab6a80c3437d8622ba506e
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet/kafkatee
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits