We haven't entirely solved our problem, but I thought I'd write down some notes in case they help someone else. It seems we ran into an unintended interaction between our configuration of large producer buffers and producer quotas.

In at-least-once mode, producer stalls due to quota throttling are (mostly) harmless: as long as you raise the consumer poll interval, you're fine, and the stalls resolve themselves. In exactly-once mode, producer throttling with large buffers can be fatal if the buffer grows large enough that it can't be emptied within the (now relatively short) transaction timeout. The app will very easily pack gigabytes of data into producer buffers, and that data can take many seconds to flush to the Kafka brokers and be acknowledged when throttled by an insufficient quota.
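To make the failure mode concrete, here is a back-of-the-envelope sketch. The buffer size and quota below are hypothetical placeholders, not our actual settings; the 10-second figure is the Kafka Streams default transaction.timeout.ms discussed later in this thread.

// Rough check: can a full producer buffer be flushed within the transaction
// timeout when a producer byte-rate quota is the bottleneck? All numbers are
// illustrative placeholders, not our real settings.
public class TxDrainTimeCheck {
    public static void main(String[] args) {
        long bufferMemoryBytes = 2L * 1024 * 1024 * 1024; // producer buffer.memory, assumed 2 GiB
        long quotaBytesPerSec  = 10L * 1024 * 1024;       // broker producer_byte_rate quota, assumed 10 MB/s
        long txTimeoutMs       = 10_000;                  // Kafka Streams' default transaction.timeout.ms

        double worstCaseDrainMs = 1000.0 * bufferMemoryBytes / quotaBytesPerSec; // ~204,800 ms here
        System.out.printf("worst-case drain: %.0f ms, transaction timeout: %d ms%n",
                worstCaseDrainMs, txTimeoutMs);
        if (worstCaseDrainMs > txTimeoutMs) {
            System.out.println("A throttled producer cannot empty its buffer before the broker aborts the transaction.");
        }
    }
}

With numbers anywhere in that ballpark, the transaction coordinator aborts the transaction long before the buffered records are acked, which matches the timeouts we were seeing.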
By removing the producer quota (for now, until we can re-evaluate our requirements), the app is much better behaved. I love Kafka, but the huge number of configuration options sure leaves a lot of room for misguided tuning, and the failure mode is so often just "it timed out"...

Happy holidays all, and thanks for the advice,
Steven

> On Nov 18, 2025, at 12:12 AM, Matthias J. Sax <[email protected]> wrote:
>
> Some progress.
>
> For the TX timeout, the producer default is 60 seconds; however, Kafka Streams reduces it to 10 seconds out of the box. Cf.
> https://kafka.apache.org/41/documentation/streams/developer-guide/config-streams.html#default-values
>
> Not sure why you believe the actual timeout is more like 90 seconds? Note, a TX starts basically on the first send / first AddPartitionsToTxn request.
>
> As the broker does time out transactions, the first question to ask would be: does KS try to commit on time?
>
> In general, the commit interval should be set to a smaller value than the transaction timeout, and KS verifies this. What are both values?
>
> Even if both values are reasonable (i.e., commit interval significantly smaller than TX timeout), there could be other reasons why KS might not commit on time. I would recommend checking client-side (KS and producer) logs to see whether KS attempts to commit or not.
>
>
> -Matthias
>
>
> On 11/17/25 3:07 PM, Steven Schlansker wrote:
>> Thanks so much for replying.
>> I tried turning on DEBUG logging and tracking down transaction-specific details.
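To make Matthias's point above about commit interval versus transaction timeout concrete, here is a minimal sketch of setting both explicitly in a Streams app; the values shown are illustrative, not our production settings.

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Illustrative only: commit.interval.ms must stay well below the producer's
// transaction.timeout.ms, which Kafka Streams lowers to 10 seconds under EOS.
public class EosTimeoutConfig {
    public static Properties eosTimeoutProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // Commit often so each open transaction stays small...
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        // ...and/or give the broker longer before it aborts an open transaction.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 60_000);
        return props;
    }
}

Per Matthias's note, Streams itself verifies that the commit interval is smaller than the transaction timeout.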
>> For about a minute and a half, there's a stream of logs like: >> [2025-11-13 00:13:50,355] DEBUG TransactionalId >> search-indexing-qa-scs-tmp-150-1ad3ea89-ebbf-4bec-af13-e8e664c41a9b-1 >> prepare transition from EMPTY to TxnTransitMetadata[producerId=380008, >> prevProducerId=380008, nextProducerId=-1, producerEpoch=0, >> lastProducerEpoch=-1, txnTimeoutMs=10000, txnState=EMPTY, >> topicPartitions=[], txnStartTimestamp=-1, >> txnLastUpdateTimestamp=1762992830073, clientTransactionVersion=TV_0] >> (kafka.coordinator.transaction.TransactionMetadata) >> INFO [TransactionCoordinator id=102] Initialized transactionalId >> search-indexing-qa-scs-tmp-150-1ad3ea89-ebbf-4bec-af13-e8e664c41a9b-1 with >> producerId 380008 and producer epoch 0 on partition __transaction_state-34 >> (kafka.coordinator.transaction.TransactionCoordinator) >> [2025-11-13 00:15:12,222] DEBUG [Transaction State Manager 102]: Updating >> search-indexing-qa-scs-tmp-150-1ad3ea89-ebbf-4bec-af13-e8e664c41a9b-1's >> transaction state to TxnTransitMetadata[producerId=380008, >> prevProducerId=380008, nextProducerId=-1, producerEpoch=1, >> lastProducerEpoch=0, txnTimeoutMs=10000, txnState=ONGOING, >> topicPartitions=[search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-21, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-19, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-15, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-27, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-26, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-25, >> search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-29, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-24, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-23, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-3, >> search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-7, >> search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-12, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-9, >> search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-14], >> txnStartTimestamp=1762992902983, txnLastUpdateTimestamp=1762992911740, >> clientTransactionVersion=TV_2] with coordinator epoch 13 for >> search-indexing-qa-scs-tmp-150-1ad3ea89-ebbf-4bec-af13-e8e664c41a9b-1 >> succeeded (kafka.coordinator.transaction.TransactionStateManager) >> (lots of repeated ADD_PARTITIONS) >> [2025-11-13 00:15:20,102] DEBUG [Transaction State Manager 102]: Updating >> search-indexing-qa-scs-tmp-150-1ad3ea89-ebbf-4bec-af13-e8e664c41a9b-1's >> transaction state to TxnTransitMetadata[producerId=380008, >> prevProducerId=380008, nextProducerId=-1, producerEpoch=2, >> lastProducerEpoch=1, txnTimeoutMs=10000, txnState=PREPARE_ABORT, >> topicPartitions=[search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-22, >> search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-16, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-21, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-19, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-15, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-29, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-28, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-27, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-26, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-25, >> search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-29, >> 
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-24, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-23, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-3, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-0, >> search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-7, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-13, >> search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-10, >> search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-12, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-9, >> search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-8, >> search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-14], >> txnStartTimestamp=1762992902983, txnLastUpdateTimestamp=1762992918599, >> clientTransactionVersion=TV_2] with coordinator epoch 13 for >> search-indexing-qa-scs-tmp-150-1ad3ea89-ebbf-4bec-af13-e8e664c41a9b-1 >> succeeded (kafka.coordinator.transaction.TransactionStateManager) >> [2025-11-13 00:15:20,102] DEBUG TransactionalId >> search-indexing-qa-scs-tmp-150-1ad3ea89-ebbf-4bec-af13-e8e664c41a9b-1 >> prepare transition from PREPARE_ABORT to >> TxnTransitMetadata[producerId=380008, prevProducerId=380008, >> nextProducerId=-1, producerEpoch=2, lastProducerEpoch=1, txnTimeoutMs=10000, >> txnState=COMPLETE_ABORT, topicPartitions=[], >> txnStartTimestamp=1762992902983, txnLastUpdateTimestamp=1762992920102, >> clientTransactionVersion=TV_2] >> (kafka.coordinator.transaction.TransactionMetadata) >> [2025-11-13 00:15:20,121] INFO [TransactionCoordinator id=102] Completed >> rollback of ongoing transaction for transactionalId >> search-indexing-qa-scs-tmp-150-1ad3ea89-ebbf-4bec-af13-e8e664c41a9b-1 due to >> timeout (kafka.coordinator.transaction.TransactionCoordinator) >> So, it seems all the transactions end up getting aborted due to timeout. I'm >> a little surprised to see "txnTimeoutMs=10000" - the documented default is >> 60000, and we didn't reduce it. But the overall actual timeout seems closer >> to 90s. >> I'm not sure why our brokers would be struggling to commit transactions so >> badly. In at least once mode, we can push 50MB/s+ pretty easily, but in >> exactly once mode we see only a few KB/s actually getting through. >> We tried upgrading the broker disks to a higher class (SSD backed) but that >> didn't seem to change much, the disks remain mostly idle as transactions >> timeout left and right. Network processor threads metric remains mostly idle >> as well. >> I tried attaching a profiler to the Kafka broker and see if I could identify >> any place it gets "stuck", but unfortunately perhaps due to the asynchronous >> nature of the system, stack trace profiling wasn't particularly helpful. >> I'll keep playing with it... I've attached a broker log dump filtered to >> this single transaction, in case there's anything helpful there. >> Best, >> Steven >>> On Nov 8, 2025, at 10:18 AM, Matthias J. Sax <[email protected]> wrote: >>> >>> Hello Steven, >>> >>> Hard to say. >>> >>> To give some background around "concurrent TX" error. When a client calls >>> commitTransaction(), a 2-phase commit is executed, and only the first phase >>> is sync, ie, commitTransaction() blocks only until the first phase is >>> completed (ie, after the commit itself was successful). However, after a TX >>> is committed, it is no completed yet, and the completion of the TX happens >>> in the second phase which happens async on the broker. 
>>>
>>> As long as the second phase is not completed, the same client cannot start a new TX, and if it tries to do so, the broker returns the "concurrent TX" error you observe.
>>>
>>> If the broker is healthy and completes the second phase quickly, the error should be rare and transient; as you have noted, it's a retriable error. The client just needs to wait until the broker has completed the TX before it can resume.
>>>
>>> Given the issue you describe, it seems the broker is slow (or not able at all) to complete a committed transaction. Why this is the case is hard to say from the information you provided, but I would inspect the broker logs in more detail (especially on the broker hosting the corresponding TX coordinator) to find out more. Maybe collecting DEBUG logs could also help.
>>>
>>> The only suspicious config I found is `replication.factor`, which you set to 2. For EOS, the recommended configuration is a replication factor of 3, plus min.insync.replicas of 2. Not sure what your current min.insync.replicas config is. It would also be good to verify the corresponding configs of the broker-side internal __transaction_state topic.
>>>
>>> HTH to get you started with troubleshooting. Let us know what you find.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 11/7/25 8:51 PM, Steven Schlansker wrote:
>>>> Happy Friday everyone,
>>>> We run a moderately large Kafka 4.1.0 Streams topology. Over time we've started to receive bug reports from users about seemingly "impossible" aggregate values.
>>>> After investigation, we hypothesize this is due to stores (particularly aggregates) double-counting some messages
>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores)
>>>> So, we're starting to experiment with exactly-once semantics.
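One way to do the check Matthias suggests above (reading back the replication factor and min.insync.replicas of the internal __transaction_state topic) is via the Kafka Admin client. A minimal sketch, assuming the bootstrap address from our Streams config below; a real run against our cluster would also need the SASL client properties omitted here.

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.config.ConfigResource;

// Prints the replication factor and min.insync.replicas of __transaction_state.
public class CheckTxnStateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-0.kafka-broker.default:9093");
        try (Admin admin = Admin.create(props)) {
            TopicDescription desc = admin.describeTopics(List.of("__transaction_state"))
                    .allTopicNames().get().get("__transaction_state");
            int replicationFactor = desc.partitions().get(0).replicas().size();

            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "__transaction_state");
            Map<ConfigResource, Config> configs = admin.describeConfigs(List.of(topic)).all().get();
            String minIsr = configs.get(topic).get("min.insync.replicas").value();

            System.out.println("replication factor  = " + replicationFactor);
            System.out.println("min.insync.replicas = " + minIsr);
        }
    }
}

Per Matthias's recommendation above, for EOS you'd want to see a replication factor of 3 and min.insync.replicas of 2 there.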
I try to >>>> launch a single node Streams app >>>> StreamsConfig values: >>>> acceptable.recovery.lag = 10000 >>>> application.id = search-indexing-prod-scs-tmp-145 >>>> application.server = localhost:8080 >>>> bootstrap.servers = [kafka-broker-0.kafka-broker.default:9093, >>>> kafka-broker-1.kafka-broker.default:9093, >>>> kafka-broker-2.kafka-broker.default:9093] >>>> buffered.records.per.partition = 500 >>>> built.in.metrics.version = latest >>>> cache.max.bytes.buffering = 10485760 >>>> client.id = >>>> commit.interval.ms = 100 >>>> connections.max.idle.ms = 540000 >>>> default.client.supplier = class >>>> org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier >>>> default.deserialization.exception.handler = class >>>> org.apache.kafka.streams.errors.LogAndFailExceptionHandler >>>> default.dsl.store = rocksDB >>>> default.key.serde = null >>>> default.list.key.serde.inner = null >>>> default.list.key.serde.type = null >>>> default.list.value.serde.inner = null >>>> default.list.value.serde.type = null >>>> default.production.exception.handler = class >>>> org.apache.kafka.streams.errors.DefaultProductionExceptionHandler >>>> default.timestamp.extractor = class >>>> org.apache.kafka.streams.processor.FailOnInvalidTimestamp >>>> default.value.serde = null >>>> deserialization.exception.handler = class >>>> org.apache.kafka.streams.errors.LogAndFailExceptionHandler >>>> dsl.store.suppliers.class = class >>>> org.apache.kafka.streams.state.BuiltInDslStoreSuppliers$RocksDBDslStoreSuppliers >>>> enable.metrics.push = true >>>> ensure.explicit.internal.resource.naming = true >>>> group.protocol = classic >>>> log.summary.interval.ms = 120000 >>>> max.task.idle.ms = 0 >>>> max.warmup.replicas = 2 >>>> metadata.max.age.ms = 300000 >>>> metadata.recovery.rebootstrap.trigger.ms = 300000 >>>> metadata.recovery.strategy = rebootstrap >>>> metric.reporters = [org.apache.kafka.common.metrics.JmxReporter] >>>> metrics.num.samples = 2 >>>> metrics.recording.level = INFO >>>> metrics.sample.window.ms = 30000 >>>> num.standby.replicas = 0 >>>> num.stream.threads = 8 >>>> poll.ms = 100 >>>> probing.rebalance.interval.ms = 600000 >>>> processing.exception.handler = class >>>> org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler >>>> processing.guarantee = exactly_once_v2 >>>> processor.wrapper.class = class >>>> com.paywholesail.service.search.core.streams.NamedFrameProcessorWrapper >>>> production.exception.handler = class >>>> org.apache.kafka.streams.errors.DefaultProductionExceptionHandler >>>> rack.aware.assignment.non_overlap_cost = null >>>> rack.aware.assignment.strategy = balance_subtopology >>>> rack.aware.assignment.tags = [zone] >>>> rack.aware.assignment.traffic_cost = null >>>> receive.buffer.bytes = 32768 >>>> reconnect.backoff.max.ms = 1000 >>>> reconnect.backoff.ms = 50 >>>> repartition.purge.interval.ms = 30000 >>>> replication.factor = 2 >>>> request.timeout.ms = 40000 >>>> retry.backoff.ms = 100 >>>> rocksdb.config.setter = class >>>> com.paywholesail.service.search.core.streams.SearchRocksDBConfigSetter >>>> security.protocol = SASL_PLAINTEXT >>>> send.buffer.bytes = 131072 >>>> state.cleanup.delay.ms = 600000 >>>> state.dir = /volumes/search-index/40-streams-data >>>> statestore.cache.max.bytes = 21474836480 >>>> task.assignor.class = null >>>> task.timeout.ms = 300000 >>>> topology.optimization = all >>>> upgrade.from = null >>>> window.size.ms = null >>>> windowed.inner.class.serde = null >>>> windowstore.changelog.additional.retention.ms = 86400000 
>>>> Almost immediately, the application starts spamming logs like: >>>> [Producer >>>> clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-5-producer, >>>> >>>> transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-5] >>>> Got error produce response with correlation id 12 on topic-partition >>>> search-indexing-prod-scs-tmp-145-current-date-per-tz-eoc-table-changelog-28, >>>> retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS >>>> [Producer >>>> clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer, >>>> >>>> transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8] >>>> Got error produce response with correlation id 15 on topic-partition >>>> search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-13, >>>> retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS >>>> [Producer >>>> clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer, >>>> >>>> transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8] >>>> Got error produce response with correlation id 15 on topic-partition >>>> search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-2, >>>> retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS >>>> [Producer >>>> clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer, >>>> >>>> transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8] >>>> Got error produce response with correlation id 15 on topic-partition >>>> search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-6, >>>> retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS >>>> [Producer >>>> clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer, >>>> >>>> transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8] >>>> Got error produce response with correlation id 15 on topic-partition >>>> search-indexing-prod-scs-tmp-145-user-access-group-by-company-repartition-13, >>>> retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS >>>> [Producer >>>> clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-7-producer, >>>> >>>> transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-7] >>>> Got error produce response with correlation id 10 on topic-partition >>>> search-indexing-prod-scs-tmp-145-payment-account-share-materialized-changelog-14, >>>> retrying (2147483646 attempts left). 
Error: CONCURRENT_TRANSACTIONS >>>> (continues about 60,000 times) >>>> After about 20 minutes of this, it gives up: >>>> org.apache.kafka.streams.errors.StreamsException: Error encountered trying >>>> to commit a transaction >>>> at >>>> org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:262) >>>> at >>>> org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:184) >>>> at >>>> org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:152) >>>> at >>>> org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1957) >>>> at >>>> org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1924) >>>> at >>>> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1782) >>>> at >>>> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1276) >>>> at >>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:926) >>>> at >>>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:886) >>>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: >>>> The producer attempted a transactional operation in an invalid state. >>>> Around the same time, the 3 brokers (also 4.1.0) starts logging lots of: >>>> ERROR [ReplicaManager broker=102] Error processing append operation on >>>> partition >>>> x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-3 >>>> (kafka.server.ReplicaManager) >>>> org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid >>>> sequence number for producer 370010 at offset 2827 in partition >>>> search-indexing-prod-scs-tmp-145-notification-repartition-29: 1 (incoming >>>> seq. number). Expected sequence 0 for transactions v2 idempotent producer >>>> with no existing state >>>> [2025-11-08 00:06:56,628] ERROR [ReplicaManager broker=102] Error >>>> processing append operation on partition >>>> x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-17 >>>> (kafka.server.ReplicaManager). >>>> org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid >>>> sequence number for producer 370012 at offset 2742 in partition >>>> search-indexing-prod-scs-tmp-145-notification-repartition-17: 1 (incoming >>>> seq. number). Expected sequence 0 for transactions v2 idempotent producer >>>> with no existing state. >>>> [2025-11-08 00:06:57,750] ERROR [ReplicaManager broker=100] Error >>>> processing append operation on partition >>>> x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-20 >>>> (kafka.server.ReplicaManager) >>>> org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order >>>> sequence number for producer 368019 at offset 3196 in partition >>>> search-indexing-prod-scs-tmp-145-notification-repartition-20: 2 (incoming >>>> seq. number), 0 (current end sequence number) >>>> This is our first experience with Streams EOSv2 and Kafka transactions in >>>> general, so I hope I didn't configure something horribly badly, but I am >>>> not sure where we've gone wrong. >>>> If anyone can offer any advice that'd be much appreciated. >>>> Thanks, >>>> Steven >>> >
