Sharing Producer Config on Spark Startup

25/04/16 15:11:59 INFO ProducerConfig: ProducerConfig values:
acks = -1
batch.size = 10000000
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 10000
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

On Wed, Apr 16, 2025 at 5:55 PM Abhishek Singla <abhisheksingla...@gmail.com>
wrote:

> Hi Daniel and Jungtaek,
>
> I am using Spark in batch mode. I tried the kafka.<option> prefix, and I can
> now see the options being set in the ProducerConfig logged on Spark startup,
> but they are still not being honored. I have set "linger.ms": "1000" and
> "batch.size": "100000". When I publish 10 records, they are flushed to the
> Kafka server immediately. However, the producer behaves as expected when I
> publish via kafka-clients directly using foreachPartition. Am I missing
> something here, or is throttling not supported in the kafka connector?
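>
> For reference, a minimal sketch of how I am now passing the options (the
> topic name is a placeholder):
>
>     import java.util.HashMap;
>     import java.util.Map;
>
>     Map<String, String> options = new HashMap<>();
>     options.put("kafka.bootstrap.servers", "localhost:9092");
>     options.put("topic", "my-topic");
>     options.put("kafka.linger.ms", "1000");      // wait up to 1s per batch
>     options.put("kafka.batch.size", "100000");   // batch size in bytes
>
>     dataset.write().format("kafka").options(options).save();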
>
> Regards,
> Abhishek Singla
>
> On Thu, Mar 27, 2025 at 4:56 AM daniel williams <daniel.willi...@gmail.com>
> wrote:
>
>> If you're using Structured Streaming, you can pass producer settings into
>> the options as kafka.<option>, as documented. If you're using Spark in
>> batch form, you'll want to do a foreachPartition with a KafkaProducer
>> configured via a Broadcast.
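>>
>> A rough sketch of that pattern, assuming a SparkSession named spark, a
>> Dataset<Row> named dataset, a placeholder topic, and String serializers;
>> note only the config map is broadcast, since a KafkaProducer itself is
>> not serializable:
>>
>>     import java.util.HashMap;
>>     import org.apache.kafka.clients.producer.KafkaProducer;
>>     import org.apache.kafka.clients.producer.ProducerRecord;
>>     import org.apache.spark.api.java.JavaSparkContext;
>>     import org.apache.spark.broadcast.Broadcast;
>>
>>     // Broadcast only the producer config; build the producer on executors.
>>     HashMap<String, Object> props = new HashMap<>();
>>     props.put("bootstrap.servers", "localhost:9092");
>>     props.put("linger.ms", "1000");      // honored here: plain kafka-clients
>>     props.put("batch.size", "100000");
>>     props.put("key.serializer",
>>         "org.apache.kafka.common.serialization.StringSerializer");
>>     props.put("value.serializer",
>>         "org.apache.kafka.common.serialization.StringSerializer");
>>
>>     Broadcast<HashMap<String, Object>> propsBc =
>>         JavaSparkContext.fromSparkContext(spark.sparkContext())
>>             .broadcast(props);
>>
>>     dataset.toJavaRDD().foreachPartition(rows -> {
>>       // One producer per partition, configured from the broadcast map.
>>       try (KafkaProducer<String, String> producer =
>>               new KafkaProducer<>(propsBc.value())) {
>>         while (rows.hasNext()) {
>>           producer.send(
>>               new ProducerRecord<>("my-topic", rows.next().mkString()));
>>         }
>>       } // close() flushes anything linger.ms/batch.size were still buffering
>>     });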
>>
>> All KafkaProducer-specific options
>> <https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html>
>> will need to be prefixed with *kafka.*
>>
>>
>> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>>
>>
>> On Wed, Mar 26, 2025 at 4:11 PM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> Sorry I missed this. Did you make sure to add the "kafka." prefix to the
>>> Kafka-side configs when specifying the Kafka source/sink options?
>>>
>>> On Mon, Feb 24, 2025 at 10:31 PM Abhishek Singla <
>>> abhisheksingla...@gmail.com> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I am using spark to read from S3 and write to Kafka.
>>>>
>>>> Spark Version: 3.1.2
>>>> Scala Version: 2.12
>>>> Spark Kafka connector: spark-sql-kafka-0-10_2.12
>>>>
>>>> I want to throttle the kafka producer. I tried using the *linger.ms* and
>>>> *batch.size* configs, but I can see in the *ProducerConfig: ProducerConfig
>>>> values* logged at runtime that they are not being set. Is there something
>>>> I am missing? Is there any other way to throttle kafka writes?
>>>>
>>>> *dataset.write().format("kafka").options(options).save();*
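>>>>
>>>> For context, the options map looks roughly like this (the topic name is a
>>>> placeholder); the producer configs are passed as plain, unprefixed keys:
>>>>
>>>>     Map<String, String> options = new HashMap<>();
>>>>     options.put("kafka.bootstrap.servers", "localhost:9092");
>>>>     options.put("topic", "my-topic");
>>>>     options.put("linger.ms", "1000");     // never appears in ProducerConfig
>>>>     options.put("batch.size", "100000");  // never appears in ProducerConfig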
>>>>
>>>> Regards,
>>>> Abhishek Singla
>>>>
>>>>
>>
>> --
>> -dan
>>
>
