Ottomata has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/324745

Change subject: Manually specify Kafka api_version in kafka_clusters config
......................................................................

Manually specify Kafka api_version in kafka_clusters config

This allows Kafka clients to manually specify the API version
they should use when talking with the Kafka cluster.  In Kafka 0.10,
a API version request has been added to the API, so clients can
ask Kafka what version they should use.  In any older version,
this doesn't exist, so clients do some tricky API version discovery
by issuing requests and falling back if Kafka fails them.

This may be a cause of failed produce requests during broker restarts,
and related to https://issues.apache.org/jira/browse/KAFKA-3547

Bug: T142430
Change-Id: Ibda9195ad60c6fde736bcbe02b3d4878a215beb5
---
M hieradata/common.yaml
M modules/role/lib/puppet/parser/functions/kafka_config.rb
M modules/role/manifests/eventlogging.pp
M modules/varnishkafka
4 files changed, 31 insertions(+), 3 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/puppet 
refs/changes/45/324745/1

diff --git a/hieradata/common.yaml b/hieradata/common.yaml
index 5718704..d755a7e 100644
--- a/hieradata/common.yaml
+++ b/hieradata/common.yaml
@@ -401,6 +401,12 @@
   # This is the analytics Kafka cluster, named just 'eqiad' for
   # historical reasons.
   eqiad:
+    # Optional api_version indicates the Kafka API version the
+    # brokers are running.  Clients can use this to override
+    # version discovery for versions of Kafka where the version
+    # request API doesn't exist (< 0.10).  Once all brokers
+    # are on 0.10, this shouldn't be needed.
+    api_version: 0.9
     zookeeper_cluster_name: main-eqiad
     brokers:
       kafka1012.eqiad.wmnet:
@@ -417,6 +423,7 @@
         id: 22  # Row C
 
   main-eqiad:
+    api_version: 0.9
     zookeeper_cluster_name: main-eqiad
     brokers:
       kafka1001.eqiad.wmnet:
@@ -427,6 +434,7 @@
         id: 1003
 
   main-codfw:
+    api_version: 0.9
     zookeeper_cluster_name: main-codfw
     brokers:
       kafka2001.codfw.wmnet:
diff --git a/modules/role/lib/puppet/parser/functions/kafka_config.rb 
b/modules/role/lib/puppet/parser/functions/kafka_config.rb
index 4ae6387..d246fc0 100644
--- a/modules/role/lib/puppet/parser/functions/kafka_config.rb
+++ b/modules/role/lib/puppet/parser/functions/kafka_config.rb
@@ -66,7 +66,7 @@
     zk_hosts = zk_clusters[zk_cluster_name]['hosts'].keys.sort
 
     jmx_port = '9999'
-    {
+    config = {
       'name'      => cluster_name,
       'brokers'   => {
         'hash'     => brokers,
@@ -85,5 +85,13 @@
         'url'    => "#{zk_hosts.join(',')}/kafka/#{cluster_name}"
       }
     }
+
+    if cluster.key?('api_version')
+      config['api_version'] = cluster['api_version']
+    else
+      config['api_version'] = nil
+    end
+
+    config
   end
 end
diff --git a/modules/role/manifests/eventlogging.pp 
b/modules/role/manifests/eventlogging.pp
index f3086ce..843690b 100644
--- a/modules/role/manifests/eventlogging.pp
+++ b/modules/role/manifests/eventlogging.pp
@@ -48,6 +48,10 @@
 
     $kafka_brokers_array = $kafka_config['brokers']['array']
 
+    # Where possible, if this is set, it will be included in client 
configuration
+    # to avoid having to do API version for Kafka < 0.10 (where there is not a 
version API).
+    $kafka_api_version = $kafka_config['api_version']
+
     # By default, the EL Kafka writer writes events to
     # schema based topic names like eventlogging_SCHEMA,
     # with each message keyed by SCHEMA_REVISION.
@@ -153,7 +157,15 @@
     # out why and stop it.  This either needs to be higher,
     # or it is a bug in kafka-python.
     # See: https://phabricator.wikimedia.org/T142430
-    $kafka_producer_args = 'retries=6&retry_backoff_ms=200'
+    $kafka_retries_args = 'retries=6&retry_backoff_ms=200'
+
+    # If $kafka_api_version is set (from hiera cluster config),
+    # add it to kafka-python producer args.
+    $kafka_producer_args = $kafka_api_version ? {
+        undef   => $kafka_retries_args,
+        default => "api_version=${kafka_api_version}&${kafka_retries_args}"
+    }
+
     eventlogging::service::processor { $client_side_processors:
         format         => '%q %{recvFrom}s %{seqId}d %t %o %{userAgent}i',
         input          => $kafka_client_side_raw_uri,
diff --git a/modules/varnishkafka b/modules/varnishkafka
index 63949e1..238a72d 160000
--- a/modules/varnishkafka
+++ b/modules/varnishkafka
@@ -1 +1 @@
-Subproject commit 63949e1cb21e0a66fb974a135102b7795d1e2051
+Subproject commit 238a72d304e8df8f1cfb787a6ab077269b86390c

-- 
To view, visit https://gerrit.wikimedia.org/r/324745
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ibda9195ad60c6fde736bcbe02b3d4878a215beb5
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Ottomata <o...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to