Ottomata has submitted this change and it was merged. ( 
https://gerrit.wikimedia.org/r/372179 )

Change subject: Increase max kafka message size for changeprop and kafka main
......................................................................


Increase max kafka message size for changeprop and kafka main

Some of the jobs in the JobQueue exceed the default 1M
size for kafka brokers, so we need to increase that for
the main cluster.

However, the consumer's settings should be in sync with
the broker setting, so we also need to pass that value to
change-prop (CP), pulling it from hiera in both places.

The hiera usage is not optimal, but several FIXME comments have been
added as reminders to refactor it when moving to profiles.

Change-Id: Ice77027db74e67c519ce305e99ea22c3ff2c7b07
---
M hieradata/labs/deployment-prep/common.yaml
M hieradata/role/common/kafka/main/broker.yaml
M hieradata/role/common/scb.yaml
M modules/changeprop/manifests/init.pp
M modules/confluent/manifests/kafka/broker.pp
M modules/confluent/templates/kafka/server.properties.erb
M modules/role/manifests/changeprop.pp
M modules/role/manifests/eventbus/eventbus.pp
M modules/role/manifests/kafka/main/broker.pp
9 files changed, 58 insertions(+), 24 deletions(-)

Approvals:
  Ottomata: Verified; Looks good to me, approved



diff --git a/hieradata/labs/deployment-prep/common.yaml 
b/hieradata/labs/deployment-prep/common.yaml
index 5771a6e..e352b96 100644
--- a/hieradata/labs/deployment-prep/common.yaml
+++ b/hieradata/labs/deployment-prep/common.yaml
@@ -20,6 +20,8 @@
 changeprop::ores_uris: 
['http://deployment-sca03.deployment-prep.eqiad.wmflabs:8081']
 # Need to redefine RESTBase URI as Change-Prop redefines it in order to 
provide a different value for async updates
 changeprop::restbase_uri: 
http://deployment-restbase02.deployment-prep.eqiad.wmflabs:7231
+# Used to sync the setting between main kafka cluster and change-prop
+kafka_message_max_bytes: 4194304
 graphoid::allowed_domains:
   http:
     - wmflabs.org
diff --git a/hieradata/role/common/kafka/main/broker.yaml 
b/hieradata/role/common/kafka/main/broker.yaml
index 6a1b509..dca07b1 100644
--- a/hieradata/role/common/kafka/main/broker.yaml
+++ b/hieradata/role/common/kafka/main/broker.yaml
@@ -1,3 +1,5 @@
 debdeploy::grains:
   debdeploy-kafka-main:
     value: standard
+# Used to sync the setting between main kafka cluster and change-prop
+kafka_message_max_bytes: 4194304
diff --git a/hieradata/role/common/scb.yaml b/hieradata/role/common/scb.yaml
index 4dccf01..6d6ec0e 100644
--- a/hieradata/role/common/scb.yaml
+++ b/hieradata/role/common/scb.yaml
@@ -89,6 +89,8 @@
   broker.version.fallback: '0.9.0.1'
 
 changeprop::restbase_uri: http://restbase-async.discovery.wmnet:7231
+# Used to sync the setting between main kafka cluster and change-prop
+kafka_message_max_bytes: 4194304
 profile::nutcracker::memcached_pools: {}
 profile::nutcracker::monitor_port: 0 # we have nothing exposed via tcp
 
diff --git a/modules/changeprop/manifests/init.pp 
b/modules/changeprop/manifests/init.pp
index ef3b397..53e31e9 100644
--- a/modules/changeprop/manifests/init.pp
+++ b/modules/changeprop/manifests/init.pp
@@ -32,17 +32,22 @@
 #   The password to use when authenticating with Redis/Nutcracker. Default:
 #   'abc1234'
 #
+# [*kafka_msg_max_bytes*]
+#   The maximum number of bytes allowed in a Kafka message. Default:
+#   '1048576'
+#
 class changeprop(
     $broker_list,
-    $purge_host   = '239.128.0.112',
-    $purge_port   = 4827,
-    $restbase_uri = 'http://restbase.svc.eqiad.wmnet:7231',
-    $ores_uris    = [
+    $purge_host          = '239.128.0.112',
+    $purge_port          = 4827,
+    $restbase_uri        = 'http://restbase.svc.eqiad.wmnet:7231',
+    $ores_uris           = [
         'http://ores.svc.eqiad.wmnet:8081',
         'http://ores.svc.codfw.wmnet:8081',
     ],
-    $redis_path   = "/var/run/nutcracker/redis_${::site}.sock",
-    $redis_pass   = 'abc1234',
+    $redis_path          = "/var/run/nutcracker/redis_${::site}.sock",
+    $redis_pass          = 'abc1234',
+    $kafka_msg_max_bytes = 1048576,
 ) {
 
     include ::service::configuration
@@ -57,15 +62,16 @@
         deployment        => 'scap3',
         deployment_config => true,
         deployment_vars   => {
-            broker_list  => $broker_list,
-            mwapi_uri    => $::service::configuration::mwapi_uri,
-            restbase_uri => $restbase_uri,
-            ores_uris    => $ores_uris,
-            purge_host   => $purge_host,
-            purge_port   => $purge_port,
-            site         => $::site,
-            redis_path   => $redis_path,
-            redis_pass   => $redis_pass,
+            broker_list     => $broker_list,
+            mwapi_uri       => $::service::configuration::mwapi_uri,
+            restbase_uri    => $restbase_uri,
+            ores_uris       => $ores_uris,
+            purge_host      => $purge_host,
+            purge_port      => $purge_port,
+            site            => $::site,
+            redis_path      => $redis_path,
+            redis_pass      => $redis_pass,
+            kafka_max_bytes => $kafka_msg_max_bytes,
         },
         auto_refresh      => false,
         init_restart      => false,
diff --git a/modules/confluent/manifests/kafka/broker.pp 
b/modules/confluent/manifests/kafka/broker.pp
index 99a7203..0f025a3 100644
--- a/modules/confluent/manifests/kafka/broker.pp
+++ b/modules/confluent/manifests/kafka/broker.pp
@@ -199,6 +199,10 @@
 # [*log4j_properties_template*]
 #   Default: 'confluent/kafka/log4j.properties.erb'
 #
+# [*message_max_bytes*]
+#   The maximum message size allowed.
+#   Default: 1048576
+#
 class confluent::kafka::broker(
     $enabled                             = true,
     $brokers                             = {
@@ -269,6 +273,8 @@
     $server_properties_template          = 
'confluent/kafka/server.properties.erb',
     $default_template                    = 'confluent/kafka/kafka.default.erb',
     $log4j_properties_template           = 
'confluent/kafka/log4j.properties.erb',
+
+    $message_max_bytes                   = 1048576,
 ) {
     # confluent::kafka::client installs the kafka package
     # and a handy wrapper script.
diff --git a/modules/confluent/templates/kafka/server.properties.erb 
b/modules/confluent/templates/kafka/server.properties.erb
index c2a8fe6..b289178 100644
--- a/modules/confluent/templates/kafka/server.properties.erb
+++ b/modules/confluent/templates/kafka/server.properties.erb
@@ -31,6 +31,15 @@
 log.message.format.version=<%= @log_message_format_version %>
 
 <% end -%>
+<% if @message_max_bytes -%>
+message.max.bytes=<%= @message_max_bytes %>
+replica.fetch.max.bytes=<%= @message_max_bytes %>
+<% else -%>
+<% if @replica_fetch_max_bytes -%>
+replica.fetch.max.bytes=<%= @replica_fetch_max_bytes %>
+<% end -%>
+<% end -%>
+
 ######################### Socket Server Settings ########################
 <% if @security_inter_broker_protocol -%>
 security.inter.broker.protocol=<%= @security_inter_broker_protocol %>
@@ -151,12 +160,6 @@
 # to catch up on messages to get back into the ISR.
 num.replica.fetchers=<%= @num_replica_fetchers %>
 
-<% if @replica_fetch_max_bytes -%>
-# The number of byes of messages to attempt to fetch for each partition in the
-# fetch requests the replicas send to the leader.
-replica.fetch.max.bytes=<%= @replica_fetch_max_bytes %>
-
-<% end -%>
 <% if @log_flush_interval_messages or @log_flush_interval_ms -%>
 ############################# Log Flush Policy #############################
 
diff --git a/modules/role/manifests/changeprop.pp 
b/modules/role/manifests/changeprop.pp
index 789eb5e..d25615f 100644
--- a/modules/role/manifests/changeprop.pp
+++ b/modules/role/manifests/changeprop.pp
@@ -11,8 +11,11 @@
     }
 
     class { '::changeprop':
-        broker_list => $kafka_config['brokers']['string'],
-        redis_pass  => $::passwords::redis::main_password
+        broker_list         => $kafka_config['brokers']['string'],
+        redis_pass          => $::passwords::redis::main_password,
+        # FIXME: this needs to be refactored when the role
+        # is moved to profiles.
+        kafka_msg_max_bytes => hiera('kafka_message_max_bytes', 1048576),
     }
 
 }
diff --git a/modules/role/manifests/eventbus/eventbus.pp 
b/modules/role/manifests/eventbus/eventbus.pp
index dfb70d5..83ef472 100644
--- a/modules/role/manifests/eventbus/eventbus.pp
+++ b/modules/role/manifests/eventbus/eventbus.pp
@@ -50,6 +50,11 @@
         undef   => '',
         default => "&api_version=${kafka_api_version}"
     }
+    $kafka_message_max_bytes = hiera('kafka_message_max_bytes', 1048576)
+    # The requests not only contain the message but also a small metadata 
overhead.
+    # So if we want to produce a kafka_message_max_bytes payload the max 
request size should be a bit higher.
+    # The 48564 value isn't arbitrary - it's the difference between default 
message.max.bytes and default max.request.size
+    $producer_request_max_size = $kafka_message_max_bytes + 48564
 
     $outputs = [
         # When events are produced to kafka, the
@@ -65,7 +70,7 @@
         # In normal cases, this will be much much faster than 10 seconds, but 
during
         # broker restarts, it can take a few seconds for meta data and 
leadership
         # info to propagate to the kafka client.
-        
"${kafka_base_uri}?async=False&sync_timeout=10.0&topic=${::site}.{meta[topic]}${kafka_api_version_param}"
+        
"${kafka_base_uri}?async=False&sync_timeout=10.0&topic=${::site}.{meta[topic]}${kafka_api_version_param}&max_request_size=${producer_request_max_size}"
     ]
 
     $access_log_level = $::realm ? {
diff --git a/modules/role/manifests/kafka/main/broker.pp 
b/modules/role/manifests/kafka/main/broker.pp
index df7fad3..c28edd3 100644
--- a/modules/role/manifests/kafka/main/broker.pp
+++ b/modules/role/manifests/kafka/main/broker.pp
@@ -68,6 +68,11 @@
         log_cleanup_policy              => 'delete',
         zookeeper_connection_timeout_ms => 6000,
         zookeeper_session_timeout_ms    => 6000,
+
+        # JobQueues-Eventbus needs a bigger msg size
+        # FIXME: this needs to be refactored when the role
+        # is moved to profiles.
+        message_max_bytes               => hiera('kafka_message_max_bytes', 
1048576),
     }
 
     # Include Kafka Broker Jmxtrans class to

-- 
To view, visit https://gerrit.wikimedia.org/r/372179
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ice77027db74e67c519ce305e99ea22c3ff2c7b07
Gerrit-PatchSet: 32
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Ppchelko <[email protected]>
Gerrit-Reviewer: Elukey <[email protected]>
Gerrit-Reviewer: Mobrovac <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: Ppchelko <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to