Ottomata has submitted this change and it was merged.

Change subject: Manually specify Kafka api_version in kafka_clusters config
......................................................................


Manually specify Kafka api_version in kafka_clusters config

This allows Kafka clients to manually specify the API version
they should use when talking with the Kafka cluster.  In Kafka 0.10,
an API version request has been added to the API, so clients can
ask Kafka what version they should use.  In any older version,
this doesn't exist, so clients do some tricky API version discovery
by issuing requests and falling back if Kafka fails them.
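
The fallback discovery described above can be sketched roughly as follows.
This is only an illustration, not how any particular Kafka client implements
it: the candidate version list, the `probe` callable and the
`discover_api_version` helper are all hypothetical names made up for this
example. It shows why a manually configured version avoids issuing requests
that the broker is expected to fail.

```ruby
# Hypothetical sketch: none of these names come from a real Kafka client.
# Candidate versions, newest first (illustrative values only).
CANDIDATE_VERSIONS = ['0.10', '0.9', '0.8.2', '0.8.1', '0.8.0'].freeze

# `probe` is any callable that returns true when the broker accepts a
# request that only the given version (or newer) understands.
def discover_api_version(probe, configured: nil)
  # A manually configured version skips probing entirely.
  return configured unless configured.nil?

  # Otherwise try the newest version first and fall back on each failure.
  CANDIDATE_VERSIONS.find { |version| probe.call(version) }
end

# Simulated pre-0.10 broker that counts how many probes it receives
# and rejects only the 0.10-style request.
probe_count = 0
broker_09 = lambda do |version|
  probe_count += 1
  version != '0.10'
end

discovered = discover_api_version(broker_09)
probes_when_discovering = probe_count

probe_count = 0
configured = discover_api_version(broker_09, configured: '0.9')
probes_when_configured = probe_count

puts "discovered=#{discovered} probes=#{probes_when_discovering}"
puts "configured=#{configured} probes=#{probes_when_configured}"
```

In this simulation the discovery path needs one failed request before it
settles on a version, while the configured path issues none.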

This may be a cause of failed produce requests during broker restarts,
and may be related to https://issues.apache.org/jira/browse/KAFKA-3547

https://gerrit.wikimedia.org/r/#/c/324751/ must be deployed
before this patch will work properly with eventlogging.

Bug: T142430
Change-Id: Ibda9195ad60c6fde736bcbe02b3d4878a215beb5
---
M hieradata/common.yaml
M modules/role/lib/puppet/parser/functions/kafka_config.rb
M modules/role/manifests/eventlogging.pp
3 files changed, 30 insertions(+), 2 deletions(-)

Approvals:
  Elukey: Looks good to me, but someone else must approve
  Ottomata: Verified; Looks good to me, approved



diff --git a/hieradata/common.yaml b/hieradata/common.yaml
index 5718704..d755a7e 100644
--- a/hieradata/common.yaml
+++ b/hieradata/common.yaml
@@ -401,6 +401,12 @@
   # This is the analytics Kafka cluster, named just 'eqiad' for
   # historical reasons.
   eqiad:
+    # Optional api_version indicates the Kafka API version the
+    # brokers are running.  Clients can use this to override
+    # version discovery for versions of Kafka where the version
+    # request API doesn't exist (< 0.10).  Once all brokers
+    # are on 0.10, this shouldn't be needed.
+    api_version: 0.9
     zookeeper_cluster_name: main-eqiad
     brokers:
       kafka1012.eqiad.wmnet:
@@ -417,6 +423,7 @@
         id: 22  # Row C
 
   main-eqiad:
+    api_version: 0.9
     zookeeper_cluster_name: main-eqiad
     brokers:
       kafka1001.eqiad.wmnet:
@@ -427,6 +434,7 @@
         id: 1003
 
   main-codfw:
+    api_version: 0.9
     zookeeper_cluster_name: main-codfw
     brokers:
       kafka2001.codfw.wmnet:
diff --git a/modules/role/lib/puppet/parser/functions/kafka_config.rb b/modules/role/lib/puppet/parser/functions/kafka_config.rb
index 4ae6387..d246fc0 100644
--- a/modules/role/lib/puppet/parser/functions/kafka_config.rb
+++ b/modules/role/lib/puppet/parser/functions/kafka_config.rb
@@ -66,7 +66,7 @@
     zk_hosts = zk_clusters[zk_cluster_name]['hosts'].keys.sort
 
     jmx_port = '9999'
-    {
+    config = {
       'name'      => cluster_name,
       'brokers'   => {
         'hash'     => brokers,
@@ -85,5 +85,13 @@
         'url'    => "#{zk_hosts.join(',')}/kafka/#{cluster_name}"
       }
     }
+
+    if cluster.key?('api_version')
+      config['api_version'] = cluster['api_version']
+    else
+      config['api_version'] = nil
+    end
+
+    config
   end
 end
diff --git a/modules/role/manifests/eventlogging.pp b/modules/role/manifests/eventlogging.pp
index f3086ce..843690b 100644
--- a/modules/role/manifests/eventlogging.pp
+++ b/modules/role/manifests/eventlogging.pp
@@ -48,6 +48,10 @@
 
     $kafka_brokers_array = $kafka_config['brokers']['array']
 
+    # Where possible, if this is set, it will be included in client configuration
+    # to avoid having to do API version for Kafka < 0.10 (where there is not a version API).
+    $kafka_api_version = $kafka_config['api_version']
+
     # By default, the EL Kafka writer writes events to
     # schema based topic names like eventlogging_SCHEMA,
     # with each message keyed by SCHEMA_REVISION.
@@ -153,7 +157,15 @@
     # out why and stop it.  This either needs to be higher,
     # or it is a bug in kafka-python.
     # See: https://phabricator.wikimedia.org/T142430
-    $kafka_producer_args = 'retries=6&retry_backoff_ms=200'
+    $kafka_retries_args = 'retries=6&retry_backoff_ms=200'
+
+    # If $kafka_api_version is set (from hiera cluster config),
+    # add it to kafka-python producer args.
+    $kafka_producer_args = $kafka_api_version ? {
+        undef   => $kafka_retries_args,
+        default => "api_version=${kafka_api_version}&${kafka_retries_args}"
+    }
+
     eventlogging::service::processor { $client_side_processors:
         format         => '%q %{recvFrom}s %{seqId}d %t %o %{userAgent}i',
         input          => $kafka_client_side_raw_uri,

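For readers who want to trace the data flow through the kafka_config.rb and
eventlogging.pp hunks above, here is a plain-Ruby sketch of the same logic.
It is not part of the change: `api_version_for` and `producer_args` are
hypothetical helper names, and the sketch assumes that a nil returned by the
Ruby function reaches the manifest as undef.

```ruby
# Illustrative only: mirrors the logic of the hunks above in plain Ruby.

# Hash#[] returns nil for a missing key, which gives the same result as
# the key?/else branch added to kafka_config.rb.
def api_version_for(cluster)
  cluster['api_version']
end

# Mirrors the Puppet selector: prepend api_version only when it is set.
def producer_args(api_version, retries_args = 'retries=6&retry_backoff_ms=200')
  return retries_args if api_version.nil?

  "api_version=#{api_version}&#{retries_args}"
end

# Cluster entries shaped like the hieradata/common.yaml hunk.
with_version    = { 'api_version' => 0.9, 'zookeeper_cluster_name' => 'main-eqiad' }
without_version = { 'zookeeper_cluster_name' => 'main-eqiad' }

puts producer_args(api_version_for(with_version))
# prints "api_version=0.9&retries=6&retry_backoff_ms=200"
puts producer_args(api_version_for(without_version))
# prints "retries=6&retry_backoff_ms=200"
```

A cluster without an api_version key therefore keeps the previous producer
arguments unchanged.
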
-- 
To view, visit https://gerrit.wikimedia.org/r/324745

Gerrit-MessageType: merged
Gerrit-Change-Id: Ibda9195ad60c6fde736bcbe02b3d4878a215beb5
Gerrit-PatchSet: 4
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Ottomata <o...@wikimedia.org>
Gerrit-Reviewer: Elukey <ltosc...@wikimedia.org>
Gerrit-Reviewer: Ottomata <o...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>