Hello Steven,

Hard to say.
To give some background on the "concurrent TX" error: when a client calls commitTransaction(), a two-phase commit is executed, and only the first phase is synchronous, i.e., commitTransaction() blocks only until the first phase is completed (i.e., after the commit itself was successful). However, after a TX is committed, it is not completed yet; the completion of the TX happens in the second phase, which runs asynchronously on the broker.
As long as the second phase is not completed, the same client cannot start a new TX, and if it tries to do so, the broker returns the "concurrent TX" error you observe.
If the broker is healthy and completes the second phase quickly, the error should be rare and transient; as you have noted, it's a retriable error. The client just needs to wait until the broker has completed the TX before it can resume.
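To make the sequence concrete, here is a minimal plain-Java sketch of the lifecycle that the Streams-managed producer drives internally (the topic name and transactional.id are made up for illustration):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TxnLifecycleSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn-0"); // made-up id
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();

            producer.beginTransaction();
            producer.send(new ProducerRecord<>("demo-output", "key", "value")); // made-up topic
            // Phase one: blocks until the coordinator has durably decided "commit".
            producer.commitTransaction();

            // Phase two (writing the commit markers to the data partitions) still runs
            // asynchronously on the broker. If the next transaction's first produce
            // request arrives before phase two is done, the broker answers with the
            // retriable CONCURRENT_TRANSACTIONS error, which the producer retries
            // internally; that is the log line you see.
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("demo-output", "key", "value2"));
            producer.commitTransaction();
        }
    }
}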
Given the issue you describe, it seems the broker is slow (or entirely unable) to complete a committed transaction. Why this is the case is hard to say from the information you provided, but I would inspect the broker logs in more detail (especially on the broker hosting the corresponding TX coordinator) to find out more. Collecting DEBUG logs could also help.
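One way to get the coordinator's view of a stuck transactional ID is the admin API (Admin#describeTransactions, also exposed via the kafka-transactions.sh tool). A rough sketch, using one of the transactional IDs from your log output:

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TransactionDescription;

public class DescribeStuckTxn {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-0.kafka-broker.default:9093");
        // plus your SASL_PLAINTEXT settings

        String txnId = "search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-5";
        try (Admin admin = Admin.create(props)) {
            TransactionDescription desc =
                    admin.describeTransactions(List.of(txnId)).description(txnId).get();
            // Shows which broker hosts the coordinator for this transactional ID and
            // what state the transaction is in (e.g. Ongoing, PrepareCommit, CompleteCommit).
            System.out.println("coordinator=" + desc.coordinatorId() + ", state=" + desc.state());
        }
    }
}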
The only suspicious config I found is `replication.factor`, which you set to 2. For EOS, the recommended configuration is a replication factor of 3 plus min.insync.replicas of 2. I am not sure what your current min.insync.replicas config is. It would also be good to verify the corresponding configs of the broker-side internal __transaction_state topic.
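For reference, that would look roughly like this on the Streams side (the broker-side values below are the recommended settings to double-check, not necessarily what you currently have):

Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// replication for the internal changelog/repartition topics Streams creates:
streamsProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
streamsProps.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), "2");

// And on the broker side (server.properties), for the internal transaction log:
//   transaction.state.log.replication.factor=3
//   transaction.state.log.min.isr=2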
HTH to get you started with troubleshooting. Let us know what you find.

-Matthias

On 11/7/25 8:51 PM, Steven Schlansker wrote:
Happy Friday everyone,

We run a moderately large Kafka 4.1.0 Streams topology. Over time we've started to receive bug reports from users about seemingly "impossible" aggregate values. After investigation, we hypothesize this is due to stores (particularly aggregates) double-counting some messages (https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores). So, we're starting to experiment with exactly-once semantics.

I tried to launch a single-node Streams app with these StreamsConfig values:

acceptable.recovery.lag = 10000
application.id = search-indexing-prod-scs-tmp-145
application.server = localhost:8080
bootstrap.servers = [kafka-broker-0.kafka-broker.default:9093, kafka-broker-1.kafka-broker.default:9093, kafka-broker-2.kafka-broker.default:9093]
buffered.records.per.partition = 500
built.in.metrics.version = latest
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 100
connections.max.idle.ms = 540000
default.client.supplier = class org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.dsl.store = rocksDB
default.key.serde = null
default.list.key.serde.inner = null
default.list.key.serde.type = null
default.list.value.serde.inner = null
default.list.value.serde.type = null
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = null
deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
dsl.store.suppliers.class = class org.apache.kafka.streams.state.BuiltInDslStoreSuppliers$RocksDBDslStoreSuppliers
enable.metrics.push = true
ensure.explicit.internal.resource.naming = true
group.protocol = classic
log.summary.interval.ms = 120000
max.task.idle.ms = 0
max.warmup.replicas = 2
metadata.max.age.ms = 300000
metadata.recovery.rebootstrap.trigger.ms = 300000
metadata.recovery.strategy = rebootstrap
metric.reporters = [org.apache.kafka.common.metrics.JmxReporter]
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 8
poll.ms = 100
probing.rebalance.interval.ms = 600000
processing.exception.handler = class org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler
processing.guarantee = exactly_once_v2
processor.wrapper.class = class com.paywholesail.service.search.core.streams.NamedFrameProcessorWrapper
production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
rack.aware.assignment.non_overlap_cost = null
rack.aware.assignment.strategy = balance_subtopology
rack.aware.assignment.tags = [zone]
rack.aware.assignment.traffic_cost = null
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
repartition.purge.interval.ms = 30000
replication.factor = 2
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = class com.paywholesail.service.search.core.streams.SearchRocksDBConfigSetter
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /volumes/search-index/40-streams-data
statestore.cache.max.bytes = 21474836480
task.assignor.class = null
task.timeout.ms = 300000
topology.optimization = all
upgrade.from = null
window.size.ms = null
windowed.inner.class.serde = null
windowstore.changelog.additional.retention.ms = 86400000

Almost immediately, the application starts spamming logs like:

[Producer clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-5-producer, transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-5] Got error produce response with correlation id 12 on topic-partition search-indexing-prod-scs-tmp-145-current-date-per-tz-eoc-table-changelog-28, retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer, transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8] Got error produce response with correlation id 15 on topic-partition search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-13, retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer, transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8] Got error produce response with correlation id 15 on topic-partition search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-2, retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer, transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8] Got error produce response with correlation id 15 on topic-partition search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-6, retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer, transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8] Got error produce response with correlation id 15 on topic-partition search-indexing-prod-scs-tmp-145-user-access-group-by-company-repartition-13, retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-7-producer, transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-7] Got error produce response with correlation id 10 on topic-partition search-indexing-prod-scs-tmp-145-payment-account-share-materialized-changelog-14, retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS

(continues about 60,000 times)

After about 20 minutes of this, it gives up:

org.apache.kafka.streams.errors.StreamsException: Error encountered trying to commit a transaction
    at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:262)
    at org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:184)
    at org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:152)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1957)
    at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1924)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1782)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1276)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:926)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:886)
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.

Around the same time, the 3 brokers (also 4.1.0) start logging lots of:

ERROR [ReplicaManager broker=102] Error processing append operation on partition x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-3 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid sequence number for producer 370010 at offset 2827 in partition search-indexing-prod-scs-tmp-145-notification-repartition-29: 1 (incoming seq. number). Expected sequence 0 for transactions v2 idempotent producer with no existing state
[2025-11-08 00:06:56,628] ERROR [ReplicaManager broker=102] Error processing append operation on partition x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-17 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid sequence number for producer 370012 at offset 2742 in partition search-indexing-prod-scs-tmp-145-notification-repartition-17: 1 (incoming seq. number). Expected sequence 0 for transactions v2 idempotent producer with no existing state.
[2025-11-08 00:06:57,750] ERROR [ReplicaManager broker=100] Error processing append operation on partition x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-20 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producer 368019 at offset 3196 in partition search-indexing-prod-scs-tmp-145-notification-repartition-20: 2 (incoming seq. number), 0 (current end sequence number)

This is our first experience with Streams EOSv2 and Kafka transactions in general, so I hope I didn't configure something horribly badly, but I am not sure where we've gone wrong. If anyone can offer any advice, that'd be much appreciated.

Thanks,
Steven
