Hello Steven,

Hard to say.

To give some background on the "concurrent TX" error: when a client calls commitTransaction(), a two-phase commit is executed, and only the first phase is synchronous, i.e., commitTransaction() blocks only until the first phase has completed (i.e., until the commit itself was successful). However, after a TX is committed, it is not yet completed; completion happens in the second phase, which runs asynchronously on the broker.

As long as the second phase is not completed, the same client cannot start a new TX, and if it tries to do so, the broker returns the "concurrent TX" error you observe.

If the broker is healthy and completes the second phase quickly, the error should be rare and transient; as you have noted, it's a retriable error. The client just needs to wait until the broker has completed the TX before it can resume.

Given the issue you describe, it seems the broker is slow to complete a committed transaction (or not able to complete it at all). Why this is the case is hard to say from the information you provided, but I would inspect the broker logs in more detail (especially on the broker hosting the corresponding TX coordinator) to find out more. Collecting DEBUG logs could also help.

The only suspicious config I found is `replication.factor`, which you set to 2. For EOS, the recommended configuration is a replication factor of 3, plus min-in-sync-replicas of 2. I am not sure what your current min-in-sync-replicas config is. It would also be good to verify the corresponding configs of the broker-side internal __transaction_state topic.
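
As a rough sketch of the settings mentioned above: the property names are the standard Kafka Streams and broker configs, while the values are just the usual EOS recommendation and not something I verified against your cluster. Setting min-in-sync-replicas via the Streams `topic.` prefix is one way to do it and is an assumption about how you manage your internal topics.

```properties
# Kafka Streams application config (sketch, values are the general EOS recommendation)
processing.guarantee=exactly_once_v2
# Replication factor for the internal changelog/repartition topics created by Streams
replication.factor=3
# Topic-level config passed to Streams-created internal topics via the "topic." prefix
topic.min.insync.replicas=2

# Broker config (server.properties) for the internal __transaction_state topic
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
```

Note that, as far as I know, the two broker settings are only applied when __transaction_state is first created, so for an existing cluster you would need to check the actual replication factor and min.insync.replicas of the topic itself.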

HTH to get you started with troubleshooting. Let us know what you find.


-Matthias


On 11/7/25 8:51 PM, Steven Schlansker wrote:
Happy Friday everyone,

We run a moderately large Kafka 4.1.0 Streams topology. Over time we've started to 
receive bug reports from users about seemingly "impossible" aggregate values.
After investigation, we hypothesize this is due to stores (particularly 
aggregates) double-counting some messages
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores)

So, we're starting to experiment with exactly-once semantics. I try to launch a 
single node Streams app

StreamsConfig values:
  acceptable.recovery.lag = 10000
  application.id = search-indexing-prod-scs-tmp-145
  application.server = localhost:8080
  bootstrap.servers = [kafka-broker-0.kafka-broker.default:9093, 
kafka-broker-1.kafka-broker.default:9093, 
kafka-broker-2.kafka-broker.default:9093]
  buffered.records.per.partition = 500
  built.in.metrics.version = latest
  cache.max.bytes.buffering = 10485760
  client.id =
  commit.interval.ms = 100
  connections.max.idle.ms = 540000
  default.client.supplier = class 
org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier
  default.deserialization.exception.handler = class
  org.apache.kafka.streams.errors.LogAndFailExceptionHandler
  default.dsl.store = rocksDB
  default.key.serde = null
  default.list.key.serde.inner = null
  default.list.key.serde.type = null
  default.list.value.serde.inner = null
  default.list.value.serde.type = null
  default.production.exception.handler = class 
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
  default.timestamp.extractor = class 
org.apache.kafka.streams.processor.FailOnInvalidTimestamp
  default.value.serde = null
  deserialization.exception.handler = class 
org.apache.kafka.streams.errors.LogAndFailExceptionHandler
  dsl.store.suppliers.class = class 
org.apache.kafka.streams.state.BuiltInDslStoreSuppliers$RocksDBDslStoreSuppliers
  enable.metrics.push = true
  ensure.explicit.internal.resource.naming = true
  group.protocol = classic
  log.summary.interval.ms = 120000
  max.task.idle.ms = 0
  max.warmup.replicas = 2
  metadata.max.age.ms = 300000
  metadata.recovery.rebootstrap.trigger.ms = 300000
  metadata.recovery.strategy = rebootstrap
  metric.reporters = [org.apache.kafka.common.metrics.JmxReporter]
  metrics.num.samples = 2
  metrics.recording.level = INFO
  metrics.sample.window.ms = 30000
  num.standby.replicas = 0
  num.stream.threads = 8
  poll.ms = 100
  probing.rebalance.interval.ms = 600000
  processing.exception.handler = class 
org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler
  processing.guarantee = exactly_once_v2
  processor.wrapper.class = class 
com.paywholesail.service.search.core.streams.NamedFrameProcessorWrapper
  production.exception.handler = class 
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
  rack.aware.assignment.non_overlap_cost = null
  rack.aware.assignment.strategy = balance_subtopology
  rack.aware.assignment.tags = [zone]
  rack.aware.assignment.traffic_cost = null
  receive.buffer.bytes = 32768
  reconnect.backoff.max.ms = 1000
  reconnect.backoff.ms = 50
  repartition.purge.interval.ms = 30000
  replication.factor = 2
  request.timeout.ms = 40000
  retry.backoff.ms = 100
  rocksdb.config.setter = class 
com.paywholesail.service.search.core.streams.SearchRocksDBConfigSetter
  security.protocol = SASL_PLAINTEXT
  send.buffer.bytes = 131072
  state.cleanup.delay.ms = 600000
  state.dir = /volumes/search-index/40-streams-data
  statestore.cache.max.bytes = 21474836480
  task.assignor.class = null
  task.timeout.ms = 300000
  topology.optimization = all
  upgrade.from = null
  window.size.ms = null
  windowed.inner.class.serde = null
  windowstore.changelog.additional.retention.ms = 86400000

Almost immediately, the application starts spamming logs like:

[Producer 
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-5-producer,
 
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-5]
 Got error produce response with correlation id 12 on topic-partition 
search-indexing-prod-scs-tmp-145-current-date-per-tz-eoc-table-changelog-28, 
retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer 
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer,
 
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8]
 Got error produce response with correlation id 15 on topic-partition 
search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-13, 
retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer 
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer,
 
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8]
 Got error produce response with correlation id 15 on topic-partition 
search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-2, 
retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer 
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer,
 
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8]
 Got error produce response with correlation id 15 on topic-partition 
search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-6, 
retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer 
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer,
 
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8]
 Got error produce response with correlation id 15 on topic-partition 
search-indexing-prod-scs-tmp-145-user-access-group-by-company-repartition-13, 
retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer 
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-7-producer,
 
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-7]
 Got error produce response with correlation id 10 on topic-partition 
search-indexing-prod-scs-tmp-145-payment-account-share-materialized-changelog-14,
 retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS

(continues about 60,000 times)

After about 20 minutes of this, it gives up:
org.apache.kafka.streams.errors.StreamsException: Error encountered trying to 
commit a transaction
   at 
org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:262)
   at 
org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:184)
   at 
org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:152)
 at 
org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1957)
   at 
org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1924)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1782)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1276)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:926)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:886)
     Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The 
producer attempted a transactional operation in an invalid state.

Around the same time, the 3 brokers (also 4.1.0) start logging lots of:
ERROR [ReplicaManager broker=102] Error processing append operation on 
partition 
x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-3
 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid sequence 
number for producer 370010 at offset 2827 in partition 
search-indexing-prod-scs-tmp-145-notification-repartition-29: 1 (incoming seq. 
number). Expected sequence 0 for transactions v2 idempotent producer with no 
existing state
[2025-11-08 00:06:56,628] ERROR [ReplicaManager broker=102] Error processing 
append operation on partition 
x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-17
 (kafka.server.ReplicaManager).
org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid sequence 
number for producer 370012 at offset 2742 in partition 
search-indexing-prod-scs-tmp-145-notification-repartition-17: 1 (incoming seq. 
number). Expected sequence 0 for transactions v2 idempotent producer with no 
existing state.
[2025-11-08 00:06:57,750] ERROR [ReplicaManager broker=100] Error processing 
append operation on partition 
x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-20
 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order 
sequence number for producer 368019 at offset 3196 in partition 
search-indexing-prod-scs-tmp-145-notification-repartition-20: 2 (incoming seq. 
number), 0 (current end sequence number)

This is our first experience with Streams EOSv2 and Kafka transactions in 
general, so I hope I didn't configure something horribly badly, but I am not 
sure where we've gone wrong.
If anyone can offer any advice that'd be much appreciated.

Thanks,
Steven



