Sharing Producer Config on Spark Startup

25/04/16 15:11:59 INFO ProducerConfig: ProducerConfig values:
	acks = -1
	batch.size = 10000000
	bootstrap.servers = [localhost:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	internal.auto.downgrade.txn.commit = false
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 10000
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
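For reference, a minimal sketch of how the options map would be built so that producer settings reach the connector, per the advice below: producer configs must carry the "kafka." prefix, which the connector strips before handing them to the underlying KafkaProducer (the log above shows linger.ms and batch.size applied). The broker address and topic name here are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaSinkOptions {
    // Producer configs need the "kafka." prefix; unprefixed producer keys
    // are ignored by the spark-sql-kafka connector.
    public static Map<String, String> producerOptions() {
        Map<String, String> options = new HashMap<>();
        options.put("kafka.bootstrap.servers", "localhost:9092"); // assumed broker address
        options.put("kafka.linger.ms", "1000");                   // values from the thread below
        options.put("kafka.batch.size", "100000");
        options.put("topic", "events");                           // assumed sink topic; "topic" itself is not prefixed
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> options = producerOptions();
        // Passed to the sink as in the thread:
        // dataset.write().format("kafka").options(options).save();
        System.out.println(options.get("kafka.linger.ms"));
    }
}
```

Note that connector-level options such as "topic" are not prefixed; only settings destined for the producer itself take "kafka.".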
On Wed, Apr 16, 2025 at 5:55 PM Abhishek Singla <abhisheksingla...@gmail.com> wrote:

> Hi Daniel and Jungtaek,
>
> I am using Spark in batch mode. I tried the kafka.<option> prefix, and I can
> now see the options being set in the ProducerConfig logged at Spark startup,
> but they are still not being honored. I set "linger.ms": "1000" and
> "batch.size": "100000", published 10 records, and they were flushed to the
> Kafka server immediately. However, the producer behaves as expected when
> publishing via kafka-clients in foreachPartition. Am I missing something
> here, or is throttling not supported in the Kafka connector?
>
> Regards,
> Abhishek Singla
>
> On Thu, Mar 27, 2025 at 4:56 AM daniel williams <daniel.willi...@gmail.com> wrote:
>
>> If you're using Structured Streaming, you can pass options in as
>> kafka.<option>, as documented. If you're using Spark in batch form, you'll
>> want to do a foreach on a KafkaProducer via a Broadcast.
>>
>> All KafkaProducer-specific options
>> <https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html>
>> will need to be prefixed with kafka.
>>
>> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>>
>> On Wed, Mar 26, 2025 at 4:11 PM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:
>>
>>> Sorry I missed this. Did you make sure to add the "kafka." prefix to the
>>> Kafka-side configs when specifying Kafka source/sink options?
>>>
>>> On Mon, Feb 24, 2025 at 10:31 PM Abhishek Singla <abhisheksingla...@gmail.com> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I am using Spark to read from S3 and write to Kafka.
>>>>
>>>> Spark version: 3.1.2
>>>> Scala version: 2.12
>>>> Spark Kafka connector: spark-sql-kafka-0-10_2.12
>>>>
>>>> I want to throttle the Kafka producer. I tried the linger.ms and
>>>> batch.size configs, but I can see in the ProducerConfig values logged at
>>>> runtime that they are not being set. Is there something I am missing? Is
>>>> there any other way to throttle Kafka writes?
>>>>
>>>> dataset.write().format("kafka").options(options).save();
>>>>
>>>> Regards,
>>>> Abhishek Singla
>>
>> --
>> -dan
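A rough sketch of the foreachPartition-with-Broadcast approach daniel describes for batch jobs, where the producer is constructed directly so its configs are honored without the "kafka." prefix. This needs a running Spark cluster and Kafka broker; the S3 path, topic name, and use of the JSON representation of each row are assumptions for illustration.

```java
import java.util.HashMap;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ForeachPartitionKafkaWrite {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("s3-to-kafka").getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // Plain producer configs here -- no "kafka." prefix, since we build
        // the KafkaProducer ourselves rather than going through the connector.
        HashMap<String, Object> producerConf = new HashMap<>();
        producerConf.put("bootstrap.servers", "localhost:9092"); // assumed broker
        producerConf.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerConf.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerConf.put("linger.ms", "1000");   // values from the thread
        producerConf.put("batch.size", "100000");

        // Broadcast the config so each executor can build its own producer.
        Broadcast<HashMap<String, Object>> conf = jsc.broadcast(producerConf);

        Dataset<Row> dataset = spark.read().json("s3a://my-bucket/input/"); // assumed path
        dataset.toJSON().foreachPartition(rows -> {
            // One producer per partition; batching/lingering now behaves as
            // it does with plain kafka-clients.
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(conf.value());
            while (rows.hasNext()) {
                producer.send(new ProducerRecord<>("events", rows.next().getBytes())); // assumed topic
            }
            producer.close(); // flushes any batch still waiting on linger.ms
        });

        spark.stop();
    }
}
```

One design note: creating the producer inside foreachPartition (rather than broadcasting the producer itself) matters, because KafkaProducer is not serializable; only the config map is shipped to executors.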