Ottomata has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/324790

Change subject: Set api_version for eventlogging kafka consumers as well as 
producers
......................................................................

Set api_version for eventlogging kafka consumers as well as producers

Bug: T142430
Change-Id: Iec32a4e1d6e9bfd8478e3ea4e59bf96d16971f70
---
M modules/role/manifests/eventlogging.pp
1 file changed, 25 insertions(+), 13 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/puppet 
refs/changes/90/324790/1

diff --git a/modules/role/manifests/eventlogging.pp 
b/modules/role/manifests/eventlogging.pp
index 843690b..84a95e7 100644
--- a/modules/role/manifests/eventlogging.pp
+++ b/modules/role/manifests/eventlogging.pp
@@ -150,6 +150,13 @@
         ['client-side-00', 'client-side-01']
     )
 
+
+    # Append this to query params if set.
+    $kafka_api_version_param = $kafka_api_version ? {
+        undef   => '',
+        default => "&api_version=${kafka_api_version}"
+    }
+
     # Increase number and backoff time of retries for async
     # analytics uses.  If metadata changes, we should give
     # more time to retry. NOTE: testing this in production
@@ -157,18 +164,11 @@
     # out why and stop it.  This either needs to be higher,
     # or it is a bug in kafka-python.
     # See: https://phabricator.wikimedia.org/T142430
-    $kafka_retries_args = 'retries=6&retry_backoff_ms=200'
-
-    # If $kafka_api_version is set (from hiera cluster config),
-    # add it to kafka-python producer args.
-    $kafka_producer_args = $kafka_api_version ? {
-        undef   => $kafka_retries_args,
-        default => "api_version=${kafka_api_version}&${kafka_retries_args}"
-    }
+    $kafka_producer_args = 
"retries=6&retry_backoff_ms=200${kafka_api_version_param}"
 
     eventlogging::service::processor { $client_side_processors:
         format         => '%q %{recvFrom}s %{seqId}d %t %o %{userAgent}i',
-        input          => $kafka_client_side_raw_uri,
+        input          => 
"${kafka_client_side_raw_uri}${kafka_api_version_param}",
         sid            => $kafka_consumer_group,
         outputs        => [
             "${kafka_schema_uri}&${kafka_producer_args}",
@@ -203,10 +203,16 @@
     )
     $kafka_consumer_group = 'eventlogging_consumer_mysql_00'
 
+    # Append this to query params if set.
+    $kafka_api_version_param = $kafka_api_version ? {
+        undef   => '',
+        default => "&api_version=${kafka_api_version}"
+    }
+
     # Kafka consumer group for this consumer is mysql-m4-master
     eventlogging::service::consumer { $mysql_consumers:
-        # auto commit offsets to kafka more often for mysql consumer:
-        input  => "${kafka_mixed_uri}&auto_commit_interval_ms=1000",
+        # auto commit offsets to kafka more often for mysql consumer
+        input  => 
"${kafka_mixed_uri}&auto_commit_interval_ms=1000${$kafka_api_version_param}"
         output => 
"mysql://${mysql_user}:${mysql_pass}@${mysql_db}?charset=utf8&statsd_host=${statsd_host}&replace=True",
         sid    => $kafka_consumer_group,
         # Restrict permissions on this config file since it contains a 
password.
@@ -275,9 +281,15 @@
         'eventlogging_consumer_files_00'
     )
 
+    # Append this to query params if set.
+    $kafka_api_version_param = $kafka_api_version ? {
+        undef   => '',
+        default => "&api_version=${kafka_api_version}"
+    }
+
     # Raw client side events:
     eventlogging::service::consumer { 'client-side-events.log':
-        input  => "${kafka_client_side_raw_uri}&raw=True",
+        input  => 
"${kafka_client_side_raw_uri}&raw=True${kafka_api_version_param}",
         output => "file://${out_dir}/client-side-events.log",
         sid    => $kafka_consumer_group,
     }
@@ -286,7 +298,7 @@
     # 'blacklisted' during processing.  Events are blacklisted
     # from these logs for volume reasons.
     eventlogging::service::consumer { 'all-events.log':
-        input  =>  $kafka_mixed_uri,
+        input  =>  "${kafka_mixed_uri}${kafka_api_version_param}",
         output => "file://${out_dir}/all-events.log",
         sid    => $kafka_consumer_group,
     }

-- 
To view, visit https://gerrit.wikimedia.org/r/324790
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iec32a4e1d6e9bfd8478e3ea4e59bf96d16971f70
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Ottomata <o...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to