Ottomata has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/110650

Change subject: WIP - Kafkatee puppet module
......................................................................

WIP - Kafkatee puppet module

Change-Id: Ie91622168233d88c4eab6a80c3437d8622ba506e
---
A README.md
A manifests/init.pp
A manifests/input.pp
A manifests/output.pp
A templates/input.kafka.conf.erb
A templates/input.pipe.conf.erb
A templates/kafkatee.conf.erb
A templates/output.conf.erb
8 files changed, 298 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/puppet/kafkatee refs/changes/50/110650/1

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/README.md
diff --git a/manifests/init.pp b/manifests/init.pp
new file mode 100644
index 0000000..b88841e
--- /dev/null
+++ b/manifests/init.pp
@@ -0,0 +1,71 @@
+# == Class: kafkatee
+#
+# Installs and configures a kafkatee instance.
+#
+# Renders $config_file from the kafkatee/kafkatee.conf.erb template and
+# keeps the upstart-managed kafkatee service running, restarting it when
+# $config_file changes.  Inputs and outputs are declared with the
+# kafkatee::input and kafkatee::output defines, which write config
+# snippets into $config_directory.
+#
+# == Parameters
+# $kafka_brokers           - Kafka broker(s) for kafka.metadata.broker.list.
+#                            String or array.
+# $kafka_offset_store_path - kafka.offset.store.path: directory for the
+#                            per topic/partition offset files.
+# $kafka_offset_reset      - kafka.topic.auto.offset.reset: smallest,
+#                            largest or error.
+# $kafka_message_max_bytes - kafka.message.max.bytes.  When undef the
+#                            setting is left commented out.
+# $pidfile                 - pid.file.path.
+# $log_statistics_file     - log.statistics.file (JSON statistics output).
+# $log_statistics_interval - log.statistics.interval in seconds, 0 disables.
+# $output_encoding         - output.encoding: string or json.
+# $output_format           - output.format, used for JSON -> string
+#                            transformation.  The default renders a
+#                            tab-separated line of request fields.
+# $output_queue_size       - output.queue.size.  When undef the setting
+#                            is left commented out.
+# $config_file             - Path of the main kafkatee config file.
+# $config_directory        - Directory holding input/output config snippets.
+#
+class kafkatee(
+    $kafka_brokers,
+    $kafka_offset_store_path = '/var/cache/kafkatee/offsets',
+    $kafka_offset_reset      = 'largest',
+    $kafka_message_max_bytes = undef,
+    $pidfile                 = '/var/run/kafkatee/kafkatee.pid',
+    $log_statistics_file     = '/var/cache/kafkatee/kafkatee.stats.json',
+    $log_statistics_interval = 60,
+    $output_encoding         = 'string',
+    # Tab-separated fields; the \t escapes keep the separators explicit.
+    $output_format           = "%{hostname}\t%{sequence}\t%{dt}\t%{time_firstbyte}\t%{ip}\t%{handling}/%{http_status}\t%{bytes_sent}\t%{request_method}\thttp://%{host}%{uri}%{query}\t-\t%{mime_type}\t%{referer}\t%{x_forwarded_for}\t%{user_agent}\t%{accept_language}\t%{x_analytics}",
+    $output_queue_size       = undef,
+    $config_file             = '/etc/kafkatee.conf',
+    $config_directory        = '/etc/kafkatee.d'
+)
+{
+    package { 'kafkatee':
+        ensure => 'installed',
+    }
+
+    file { $config_file:
+        content => template('kafkatee/kafkatee.conf.erb'),
+        require => Package['kafkatee'],
+    }
+
+    # kafkatee::input and kafkatee::output write their snippets into this
+    # directory, so manage it here instead of relying on the package to
+    # create it.  Puppet autorequires a file's parent directory, so those
+    # snippet files are applied after this resource.
+    file { $config_directory:
+        ensure  => 'directory',
+        require => Package['kafkatee'],
+    }
+
+    service { 'kafkatee':
+        ensure    => running,
+        provider  => 'upstart',
+        subscribe => File[$config_file],
+    }
+}
diff --git a/manifests/input.pp b/manifests/input.pp
new file mode 100644
index 0000000..bbcc85e
--- /dev/null
+++ b/manifests/input.pp
@@ -0,0 +1,58 @@
+# == Define kafkatee::input
+#
+# Configures a kafkatee input by rendering the
+# kafkatee/input.${type}.conf.erb template into
+# ${kafkatee::config_directory}/input.${type}.${title}.conf.
+# Changes to that file notify the kafkatee service.
+#
+# == Parameters
+# $type       - Input type: 'kafka' or 'pipe'.  Default: kafka
+# $topic      - Kafka topic to consume.  Required for kafka inputs.
+# $partitions - Rendered after 'partition' in the kafka input line.
+#               Required for kafka inputs.
+# $offset     - Rendered after 'from' in the kafka input line.
+#               Default: end
+# $options    - Hash of input options, rendered as [key=val,...],
+#               e.g. { 'encoding' => 'json' }.  Default: {}
+# $command    - Command to read pipe input from.  Required for pipe
+#               inputs.
+# $ensure     - present or absent.  Default: present
+#
+define kafkatee::input(
+    $type           = 'kafka',
+    $topic          = undef,
+    $partitions     = undef,
+    $offset         = 'end',
+    $options        = {},
+    $command        = undef,
+    $ensure         = 'present',
+)
+{
+    Class['kafkatee'] -> Kafkatee::Input[$title]
+
+    # Only input.kafka.conf.erb and input.pipe.conf.erb exist, and each
+    # needs its own parameters to render a complete input line.  Fail early
+    # with a clear message instead.  Inputs being removed (ensure => absent)
+    # are not checked.
+    case $type {
+        'kafka': {
+            if $ensure != 'absent' and (!$topic or !$partitions) {
+                fail("kafkatee::input ${title}: \$topic and \$partitions are required for kafka inputs")
+            }
+        }
+        'pipe': {
+            if $ensure != 'absent' and !$command {
+                fail("kafkatee::input ${title}: \$command is required for pipe inputs")
+            }
+        }
+        default: {
+            fail("kafkatee::input ${title}: unsupported \$type '${type}', must be 'kafka' or 'pipe'")
+        }
+    }
+
+    file { "${kafkatee::config_directory}/input.${type}.${title}.conf":
+        ensure  => $ensure,
+        content => template("kafkatee/input.${type}.conf.erb"),
+        notify  => Service['kafkatee'],
+    }
+}
diff --git a/manifests/output.pp b/manifests/output.pp
new file mode 100644
index 0000000..3736a72
--- /dev/null
+++ b/manifests/output.pp
@@ -0,0 +1,39 @@
+# == Define kafkatee::output
+#
+# Configures a kafkatee output by rendering the kafkatee/output.conf.erb
+# template into ${kafkatee::config_directory}/output.${title}.conf, a
+# single "output <type> <sample> <destination>" line.  Changes to that
+# file notify the kafkatee service.
+#
+# == Parameters
+# $destination - Rendered last in the output line, e.g. a file path for
+#                'file' outputs.
+# $type        - Output type: 'file' or 'pipe'.  Default: file
+# $sample      - Rendered after the type in the output line (presumably
+#                a 1-in-N sampling factor).  Default: 1
+# $ensure      - present or absent.  Default: present
+#
+define kafkatee::output(
+    $destination,
+    $type           = 'file',
+    $sample         = 1,
+    $ensure         = 'present',
+)
+{
+    Class['kafkatee'] -> Kafkatee::Output[$title]
+
+    # output.conf.erb renders $type verbatim; reject unknown types here
+    # rather than writing a config line kafkatee cannot use.
+    case $type {
+        'file', 'pipe': { }
+        default: {
+            fail("kafkatee::output ${title}: unsupported \$type '${type}', must be 'file' or 'pipe'")
+        }
+    }
+
+    file { "${kafkatee::config_directory}/output.${title}.conf":
+        ensure  => $ensure,
+        content => template('kafkatee/output.conf.erb'),
+        notify  => Service['kafkatee'],
+    }
+}
diff --git a/templates/input.kafka.conf.erb b/templates/input.kafka.conf.erb
new file mode 100644
index 0000000..bc66a19
--- /dev/null
+++ b/templates/input.kafka.conf.erb
@@ -0,0 +1,4 @@
+# Note: This file is managed by Puppet.
+<%# Options are sorted so the rendered line is stable across Puppet runs. %>
+# <%= @title %> kafka input
+input [<%= @options.sort.map { |key, val| "#{key}=#{val}" }.join(',') %>] kafka topic <%= @topic %> partition <%= @partitions %> from <%= @offset %>
diff --git a/templates/input.pipe.conf.erb b/templates/input.pipe.conf.erb
new file mode 100644
index 0000000..9fd41a8
--- /dev/null
+++ b/templates/input.pipe.conf.erb
@@ -0,0 +1,4 @@
+# Note: This file is managed by Puppet.
+<%# Options are sorted so the rendered line is stable across Puppet runs. %>
+# <%= @title %> piped input
+input [<%= @options.sort.map { |key, val| "#{key}=#{val}" }.join(',') %>] pipe <%= @command %>
diff --git a/templates/kafkatee.conf.erb b/templates/kafkatee.conf.erb
new file mode 100644
index 0000000..9fc2fbb
--- /dev/null
+++ b/templates/kafkatee.conf.erb
@@ -0,0 +1,210 @@
+#######################################################################
+#                                                                     #
+#                    kafkatee configuration file                      #
+#                                                                     #
+#                                                                     #
+#######################################################################
+#                                                                     #
+# Syntax:                                                             #
+#  <property-name> = <value>                                          #
+#  input <type args..>                                                #
+#  output <type arg..>                                                #
+#                                                                     #
+# Boolean property values:                                            #
+#   >0, "true", "yes", "on", "" - interpreted as true                 #
+#  everything else              - interpreted as false                #
+#                                                                     #
+#                                                                     #
+# The configuration file consists of:                                 #
+#   - Configuration properties (key = value) to control various       #
+#     aspects of kafkatee.                                            #
+#   - Inputs                                                          #
+#   - Outputs                                                         #
+#                                                                     #
+#######################################################################
+
+
+#######################################################################
+#                                                                     #
+# Configuration properties                                            #
+#                                                                     #
+#######################################################################
+
+#######################################################################
+#                                                                     #
+# Kafka configuration                                                 #
+#                                                                     #
+# Kafka configuration properties are prefixed with "kafka."           #
+# and topic properties are prefixed with "kafka.topic.".              #
+#                                                                     #
+# For the full range of Kafka handle and topic configuration          #
+# properties, see:                                                    #
+#  http://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md #
+#                                                                     #
+# And the Apache Kafka configuration reference:                       #
+#  http://kafka.apache.org/08/configuration.html                      #
+#                                                                     #
+#######################################################################
+
+# Initial list of kafka brokers
+# Default: none
+kafka.metadata.broker.list = <%= Array(@kafka_brokers).sort.join(',') %>
+
+# Offset file directory.
+# Each topic + partition combination has its own offset file.
+# Default: current directory
+kafka.offset.store.path = <%= @kafka_offset_store_path %>
+
+# If the requested offset was not found on the broker, or there is no
+# initial offset known (no stored offset), then reset the offset according
+# to this configuration.
+# Values: smallest (oldest/beginning), largest (newest/end), error (fail)
+# Default: largest
+kafka.topic.auto.offset.reset = <%= @kafka_offset_reset %>
+
+# Maximum message size.
+# Should be synchronized on all producers, brokers and consumers.
+# Default: 4000000
+<%= @kafka_message_max_bytes ? "kafka.message.max.bytes = #{@kafka_message_max_bytes}" : '#kafka.message.max.bytes = 10000000' %>
+
+# Kafka debugging
+# Default: none
+#kafka.debug = msg,topic,broker
+
+
+#######################################################################
+#                                                                     #
+# Misc configuration                                                  #
+#                                                                     #
+#######################################################################
+
+# Pid file location
+# Default: /var/run/kafkatee.pid
+pid.file.path = <%= @pidfile.to_s %>
+
+# Daemonize (background)
+# Default: true
+daemonize = true
+
+# Logging output level
+# 1 = only emergencies .. 6 = info, 7 = debug
+# Default: 6 (info)
+#log.level = 7
+
+
+#
+# JSON Statistics
+#
+# Statistics are collected from kafkatee itself(*) as well as from librdkafka.
+# Each JSON object has a top level key of either 'kafkatee' or
+# 'kafka' to indicate which type of statistics the object contains.
+# Each line is a valid JSON object.
+#
+# *: kafkatee does not currently output any stats of its own, only librdkafka's.
+#
+
+# Statistics output interval
+# Defaults to 60 seconds, use 0 to disable.
+log.statistics.interval = <%= @log_statistics_interval.to_s %>
+
+# Statistics output file
+# Defaults to /tmp/kafkatee.stats.json
+log.statistics.file = <%= @log_statistics_file.to_s %>
+
+
+# Command to run on startup, before starting IO.
+# Default: none
+#command.init = ./my-script.sh
+
+# Command to run on termination after all IOs have been stopped.
+# Default: none
+#command.term = ./my-cleanup-script.sh
+
+# Set environment variable which will be available for all sub-sequent
+# command executions (command.*, input pipe, output pipe, ..)
+#setenv.NMSGS=12
+# clear:
+#setenv.LD_LIBRARY_PATH=
+
+#######################################################################
+#                                                                     #
+# Message transformation                                              #
+#                                                                     #
+# A message read from one of the inputs may be transformed before     #
+# being enqueued on the output queues.                                #
+#                                                                     #
+# Transformation requires that the input and output encoding differs, #
+# i.e., 'input [encoding=json] ..' and 'output.encoding=string'       #
+#                                                                     #
+# While the input encoding is configured per input, the output        #
+# encoding is configured globally, all outputs will receive the same  #
+# message.                                                            #
+#                                                                     #
+# The currently supported transformation(s) are:                      #
+#  JSON input -> string output:                                       #
+#    JSON data is formatted according to the output.format            #
+#    configuration. The %{field} syntax references the field in the   #
+#    original JSON object by the same name and outputs its value.     #
+#                                                                     #
+# If the input and output encoding matches then the message remains   #
+# untouched.                                                          #
+#                                                                     #
+# The output message delimiter (defaults to newline (\n)) is          #
+# configurable (output.delimiter) and always appended to all output   #
+# messages regardless of transformation.                              #
+# The input is always stripped of its delimiter (which is newline     #
+# for pipe inputs).                                                   #
+#                                                                     #
+#######################################################################
+
+# Output encoding: string or json
+# Default: string
+output.encoding = <%= @output_encoding.to_s %>
+
+#######################################################################
+# Output formatting                                                   #
+#                                                                     #
+# The format string is made up of %{..}-tokens and literals.          #
+#                                                                     #
+# Tokens:                                                             #
+#                                                                     #
+#  %{FIELD}                                                           #
+#    Retrieves the value from the JSON object's field with the        #
+#    same name.                                                       #
+#                                                                     #
+#  %{FIELD?DEFAULT}                                                   #
+#    'DEFAULT' is the default string to use if no field was matched,  #
+#     the default default string is "-".                              #
+#                                                                     #
+#  Literals are copied verbatim to the output string.                 #
+#                                                                     #
+#  Example JSON: {"task":19, "name":"Mike"}                           #
+#  Example format: Got task %{task} for user %{name?unknown}          #
+#  Example output: Got task 19 for user Mike                          #
+#                                                                     #
+# Note: Multi-level JSON objects are flattened:                       #
+#       JSON:  {"a": {"b": 9}, "c": "hi"}                             #
+#       Gives: { "b": 9, "c": "hi" }                                  #
+#                                                                     #
+#######################################################################
+
+# Output format for JSON -> string transformation.
+# Default: none
+output.format = <%= @output_format.to_s %>
+
+
+# Output delimiter
+# The output message ends with this delimiter.
+# Supports \n, \r, \t, \0.
+# Default: newline (\n)
+#output.delimiter = ;END;\n
+
+
+# Maximum queue size for each output, in number of messages
+# Default: 100000
+#output.queue.size = 1000000
+<%= @output_queue_size ? "output.queue.size = #{@output_queue_size}" : '#output.queue.size = 1000000' %>
+
+
+# Include other config files in <%= @config_directory %>/*.conf
+include <%= @config_directory %>/*.conf
diff --git a/templates/output.conf.erb b/templates/output.conf.erb
new file mode 100644
index 0000000..54fc29e
--- /dev/null
+++ b/templates/output.conf.erb
@@ -0,0 +1,3 @@
+# Note: This file is managed by Puppet.
+
+<%= "output #{@type} #{@sample} #{@destination}" %>

-- 
To view, visit https://gerrit.wikimedia.org/r/110650
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie91622168233d88c4eab6a80c3437d8622ba506e
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet/kafkatee
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to