Ottomata has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/298000

Change subject: Update eventlogging kafka consumer args to match with 
python-confluent-kafka consumer
......................................................................

Update eventlogging kafka consumer args to match with python-confluent-kafka 
consumer

Bug: T133779
Change-Id: Ia49006ea2430313ca89a3099c5348a1e4372189a
---
M manifests/role/eventlogging.pp
1 file changed, 11 insertions(+), 9 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/puppet 
refs/changes/00/298000/1

diff --git a/manifests/role/eventlogging.pp b/manifests/role/eventlogging.pp
index 4cd12bf..ca41b84 100644
--- a/manifests/role/eventlogging.pp
+++ b/manifests/role/eventlogging.pp
@@ -55,7 +55,6 @@
         ],
         default => $kafka_config['brokers']['array']
     }
-    $kafka_zookeeper_url = $kafka_config['zookeeper']['url']
 
     # By default, the EL Kafka writer writes events to
     # schema based topic names like eventlogging_SCHEMA,
@@ -81,6 +80,9 @@
 
     $kafka_server_side_raw_uri = 
"${kafka_base_uri}?topic=eventlogging-server-side"
     $kafka_client_side_raw_uri = 
"${kafka_base_uri}?topic=eventlogging-client-side"
+
+    # This gets added to most consumers, DRY it up.
+    $kafka_default_topic_config = 
'default.topic.config={"auto.offset.reset":-1}'
 
     # This check was written for eventlog1001, so only include it there.,
     if $::hostname == 'eventlog1001' {
@@ -127,7 +129,7 @@
     # This forwards the kafka eventlogging-valid-mixed topic to
     # ZMQ port 8600 for backwards compatibility.
     eventlogging::service::forwarder { 'legacy-zmq':
-        input   => 
"${kafka_mixed_uri}&zookeeper_connect=${kafka_zookeeper_url}&auto_commit_enable=False&auto_offset_reset=-1&identity=legacy_zmq",
+        input   => 
"${kafka_mixed_uri}&auto.commit.enable=False&${kafka_default_topic_config}&identity=legacy_zmq",
         outputs => ["tcp://${eventlogging_host}:8600"],
     }
 
@@ -147,7 +149,7 @@
 class role::eventlogging::processor inherits role::eventlogging {
     $kafka_consumer_args  = hiera(
         'eventlogging_processor_kafka_consumer_args',
-        
'auto_commit_enable=True&auto_commit_interval_ms=10000&auto_offset_reset=-1'
+        
"auto.commit.enable=True&auto.commit.interval.ms=10000&${kafka_default_topic_config}",
     )
     $kafka_consumer_group = hiera(
         'eventlogging_processor_kafka_consumer_group',
@@ -162,7 +164,7 @@
     )
     eventlogging::service::processor { $client_side_processors:
         format         => '%q %{recvFrom}s %{seqId}d %t %o %{userAgent}i',
-        input          => 
"${kafka_client_side_raw_uri}&zookeeper_connect=${kafka_zookeeper_url}&${kafka_consumer_args}",
+        input          => 
"${kafka_client_side_raw_uri}&${kafka_consumer_args}",
         sid            => $kafka_consumer_group,
         outputs        => [
             $kafka_schema_uri,
@@ -183,7 +185,7 @@
 
     $kafka_consumer_args  = hiera(
         'eventlogging_mysql_kafka_consumer_args',
-        
'auto_commit_enable=True&auto_commit_interval_ms=1000&auto_offset_reset=-1'
+        
"auto.commit.enable=True&auto.commit.interval.ms=1000&${kafka_default_topic_config}",
     )
 
     class { 'passwords::mysql::eventlogging': }    # T82265
@@ -204,7 +206,7 @@
 
     # Kafka consumer group for this consumer is mysql-m4-master
     eventlogging::service::consumer { $mysql_consumers:
-        input  => 
"${kafka_mixed_uri}&zookeeper_connect=${kafka_zookeeper_url}&${kafka_consumer_args}",
+        input  => "${kafka_mixed_uri}&${kafka_consumer_args}",
         output => 
"mysql://${mysql_user}:${mysql_pass}@${mysql_db}?charset=utf8&statsd_host=${statsd_host}&replace=True",
         sid    => $kafka_consumer_group,
         # Restrict permissions on this config file since it contains a 
password.
@@ -267,7 +269,7 @@
         ],
     }
 
-    $kafka_consumer_args  = 
'auto_commit_enable=True&auto_commit_interval_ms=10000&auto_offset_reset=-1'
+    $kafka_consumer_args  = 
"auto.commit.enable=True&auto.commit.interval.ms=10000&${kafka_default_topic_config}"
     $kafka_consumer_group = hiera(
         'eventlogging_files_kafka_consumer_group',
         'eventlogging-files-00'
@@ -275,11 +277,11 @@
 
     eventlogging::service::consumer {
         'client-side-events.log':
-            input  => 
"${kafka_client_side_raw_uri}&zookeeper_connect=${kafka_zookeeper_url}&${kafka_consumer_args}&raw=True",
+            input  => 
"${kafka_client_side_raw_uri}&${kafka_consumer_args}&raw=True",
             output => "file://${out_dir}/client-side-events.log",
             sid    => $kafka_consumer_group;
         'all-events.log':
-            input  =>  
"${kafka_mixed_uri}&zookeeper_connect=${kafka_zookeeper_url}&${kafka_consumer_args}",
+            input  =>  "${kafka_mixed_uri}&${kafka_consumer_args}",
             output => "file://${out_dir}/all-events.log",
             sid    => $kafka_consumer_group;
     }

-- 
To view, visit https://gerrit.wikimedia.org/r/298000
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia49006ea2430313ca89a3099c5348a1e4372189a
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Ottomata <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to