Ottomata has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/372179 )
Change subject: Increase max kafka message size for changeprop and kafka main ...................................................................... Increase max kafka message size for changeprop and kafka main Some of the jobs in the JobQueue exceed the default 1 MB message size limit for Kafka brokers, so we need to increase that limit for the main cluster. However, the consumer's settings must stay in sync with the broker setting, so the same value also has to be passed to Change-Prop (CP); we therefore pull it from hiera in both places. The hiera usage is not optimal, but several FIXME comments have been added as reminders to refactor it when moving to profiles. Change-Id: Ice77027db74e67c519ce305e99ea22c3ff2c7b07 --- M hieradata/labs/deployment-prep/common.yaml M hieradata/role/common/kafka/main/broker.yaml M hieradata/role/common/scb.yaml M modules/changeprop/manifests/init.pp M modules/confluent/manifests/kafka/broker.pp M modules/confluent/templates/kafka/server.properties.erb M modules/role/manifests/changeprop.pp M modules/role/manifests/eventbus/eventbus.pp M modules/role/manifests/kafka/main/broker.pp 9 files changed, 58 insertions(+), 24 deletions(-) Approvals: Ottomata: Verified; Looks good to me, approved diff --git a/hieradata/labs/deployment-prep/common.yaml b/hieradata/labs/deployment-prep/common.yaml index 5771a6e..e352b96 100644 --- a/hieradata/labs/deployment-prep/common.yaml +++ b/hieradata/labs/deployment-prep/common.yaml @@ -20,6 +20,8 @@ changeprop::ores_uris: ['http://deployment-sca03.deployment-prep.eqiad.wmflabs:8081'] # Need to redefine RESTBase URI as Change-Prop redefines it in order to provide a different value for async updates changeprop::restbase_uri: http://deployment-restbase02.deployment-prep.eqiad.wmflabs:7231 +# Used to sync the setting between main kafka cluster and change-prop +kafka_message_max_bytes: 4194304 graphoid::allowed_domains: http: - wmflabs.org diff --git a/hieradata/role/common/kafka/main/broker.yaml b/hieradata/role/common/kafka/main/broker.yaml index 6a1b509..dca07b1 100644 
--- a/hieradata/role/common/kafka/main/broker.yaml +++ b/hieradata/role/common/kafka/main/broker.yaml @@ -1,3 +1,5 @@ debdeploy::grains: debdeploy-kafka-main: value: standard +# Used to sync the setting between main kafka cluster and change-prop +kafka_message_max_bytes: 4194304 diff --git a/hieradata/role/common/scb.yaml b/hieradata/role/common/scb.yaml index 4dccf01..6d6ec0e 100644 --- a/hieradata/role/common/scb.yaml +++ b/hieradata/role/common/scb.yaml @@ -89,6 +89,8 @@ broker.version.fallback: '0.9.0.1' changeprop::restbase_uri: http://restbase-async.discovery.wmnet:7231 +# Used to sync the setting between main kafka cluster and change-prop +kafka_message_max_bytes: 4194304 profile::nutcracker::memcached_pools: {} profile::nutcracker::monitor_port: 0 # we have nothing exposed via tcp diff --git a/modules/changeprop/manifests/init.pp b/modules/changeprop/manifests/init.pp index ef3b397..53e31e9 100644 --- a/modules/changeprop/manifests/init.pp +++ b/modules/changeprop/manifests/init.pp @@ -32,17 +32,22 @@ # The password to use when authenticating with Redis/Nutcracker. Default: # 'abc1234' # +# [*kafka_msg_max_bytes*] +# The maximum number of bytes allowed in a Kafka message. 
Default: +# '1048576' +# class changeprop( $broker_list, - $purge_host = '239.128.0.112', - $purge_port = 4827, - $restbase_uri = 'http://restbase.svc.eqiad.wmnet:7231', - $ores_uris = [ + $purge_host = '239.128.0.112', + $purge_port = 4827, + $restbase_uri = 'http://restbase.svc.eqiad.wmnet:7231', + $ores_uris = [ 'http://ores.svc.eqiad.wmnet:8081', 'http://ores.svc.codfw.wmnet:8081', ], - $redis_path = "/var/run/nutcracker/redis_${::site}.sock", - $redis_pass = 'abc1234', + $redis_path = "/var/run/nutcracker/redis_${::site}.sock", + $redis_pass = 'abc1234', + $kafka_msg_max_bytes = 1048576, ) { include ::service::configuration @@ -57,15 +62,16 @@ deployment => 'scap3', deployment_config => true, deployment_vars => { - broker_list => $broker_list, - mwapi_uri => $::service::configuration::mwapi_uri, - restbase_uri => $restbase_uri, - ores_uris => $ores_uris, - purge_host => $purge_host, - purge_port => $purge_port, - site => $::site, - redis_path => $redis_path, - redis_pass => $redis_pass, + broker_list => $broker_list, + mwapi_uri => $::service::configuration::mwapi_uri, + restbase_uri => $restbase_uri, + ores_uris => $ores_uris, + purge_host => $purge_host, + purge_port => $purge_port, + site => $::site, + redis_path => $redis_path, + redis_pass => $redis_pass, + kafka_max_bytes => $kafka_msg_max_bytes, }, auto_refresh => false, init_restart => false, diff --git a/modules/confluent/manifests/kafka/broker.pp b/modules/confluent/manifests/kafka/broker.pp index 99a7203..0f025a3 100644 --- a/modules/confluent/manifests/kafka/broker.pp +++ b/modules/confluent/manifests/kafka/broker.pp @@ -199,6 +199,10 @@ # [*log4j_properties_template*] # Default: 'confluent/kafka/log4j.properties.erb' # +# [*message_max_bytes*] +# The maximum message size allowed. 
+# Default: 1048576 +# class confluent::kafka::broker( $enabled = true, $brokers = { @@ -269,6 +273,8 @@ $server_properties_template = 'confluent/kafka/server.properties.erb', $default_template = 'confluent/kafka/kafka.default.erb', $log4j_properties_template = 'confluent/kafka/log4j.properties.erb', + + $message_max_bytes = 1048576, ) { # confluent::kafka::client installs the kafka package # and a handy wrapper script. diff --git a/modules/confluent/templates/kafka/server.properties.erb b/modules/confluent/templates/kafka/server.properties.erb index c2a8fe6..b289178 100644 --- a/modules/confluent/templates/kafka/server.properties.erb +++ b/modules/confluent/templates/kafka/server.properties.erb @@ -31,6 +31,15 @@ log.message.format.version=<%= @log_message_format_version %> <% end -%> +<% if @message_max_bytes -%> +message.max.bytes=<%= @message_max_bytes %> +replica.fetch.max.bytes=<%= @message_max_bytes %> +<% else -%> +<% if @replica_fetch_max_bytes -%> +replica.fetch.max.bytes=<%= @replica_fetch_max_bytes %> +<% end -%> +<% end -%> + ######################### Socket Server Settings ######################## <% if @security_inter_broker_protocol -%> security.inter.broker.protocol=<%= @security_inter_broker_protocol %> @@ -151,12 +160,6 @@ # to catch up on messages to get back into the ISR. num.replica.fetchers=<%= @num_replica_fetchers %> -<% if @replica_fetch_max_bytes -%> -# The number of byes of messages to attempt to fetch for each partition in the -# fetch requests the replicas send to the leader. 
-replica.fetch.max.bytes=<%= @replica_fetch_max_bytes %> - -<% end -%> <% if @log_flush_interval_messages or @log_flush_interval_ms -%> ############################# Log Flush Policy ############################# diff --git a/modules/role/manifests/changeprop.pp b/modules/role/manifests/changeprop.pp index 789eb5e..d25615f 100644 --- a/modules/role/manifests/changeprop.pp +++ b/modules/role/manifests/changeprop.pp @@ -11,8 +11,11 @@ } class { '::changeprop': - broker_list => $kafka_config['brokers']['string'], - redis_pass => $::passwords::redis::main_password + broker_list => $kafka_config['brokers']['string'], + redis_pass => $::passwords::redis::main_password, + # FIXME: this needs to be refactored when the role + # is moved to profiles. + kafka_msg_max_bytes => hiera('kafka_message_max_bytes', 1048576), } } diff --git a/modules/role/manifests/eventbus/eventbus.pp b/modules/role/manifests/eventbus/eventbus.pp index dfb70d5..83ef472 100644 --- a/modules/role/manifests/eventbus/eventbus.pp +++ b/modules/role/manifests/eventbus/eventbus.pp @@ -50,6 +50,11 @@ undef => '', default => "&api_version=${kafka_api_version}" } + $kafka_message_max_bytes = hiera('kafka_message_max_bytes', 1048576) + # The requests not only contain the message but also a small metadata overhead. + # So if we want to produce a kafka_message_max_bytes payload the max request size should be a bit higher. + # The 48564 value isn't arbitrary - it's the difference between the default producer max.request.size (1048576) and the default broker message.max.bytes (1000012) + $producer_request_max_size = $kafka_message_max_bytes + 48564 $outputs = [ # When events are produced to kafka, the @@ -65,7 +70,7 @@ # In normal cases, this will be much much faster than 10 seconds, but during # broker restarts, it can take a few seconds for meta data and leadership # info to propagate to the kafka client. 
- "${kafka_base_uri}?async=False&sync_timeout=10.0&topic=${::site}.{meta[topic]}${kafka_api_version_param}" + "${kafka_base_uri}?async=False&sync_timeout=10.0&topic=${::site}.{meta[topic]}${kafka_api_version_param}&max_request_size=${producer_request_max_size}" ] $access_log_level = $::realm ? { diff --git a/modules/role/manifests/kafka/main/broker.pp b/modules/role/manifests/kafka/main/broker.pp index df7fad3..c28edd3 100644 --- a/modules/role/manifests/kafka/main/broker.pp +++ b/modules/role/manifests/kafka/main/broker.pp @@ -68,6 +68,11 @@ log_cleanup_policy => 'delete', zookeeper_connection_timeout_ms => 6000, zookeeper_session_timeout_ms => 6000, + + # JobQueues-Eventbus needs a bigger msg size + # FIXME: this needs to be refactored when the role + # is moved to profiles. + message_max_bytes => hiera('kafka_message_max_bytes', 1048576), } # Include Kafka Broker Jmxtrans class to -- To view, visit https://gerrit.wikimedia.org/r/372179 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ice77027db74e67c519ce305e99ea22c3ff2c7b07 Gerrit-PatchSet: 32 Gerrit-Project: operations/puppet Gerrit-Branch: production Gerrit-Owner: Ppchelko <[email protected]> Gerrit-Reviewer: Elukey <[email protected]> Gerrit-Reviewer: Mobrovac <[email protected]> Gerrit-Reviewer: Ottomata <[email protected]> Gerrit-Reviewer: Ppchelko <[email protected]> Gerrit-Reviewer: jenkins-bot <> _______________________________________________ MediaWiki-commits mailing list [email protected] https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
