Happy Friday everyone,

We run a moderately large Kafka 4.1.0 Streams topology. Over time we've started 
receiving bug reports from users about seemingly "impossible" aggregate values. 
After investigating, we hypothesize that this is caused by state stores 
(particularly aggregates) double-counting some messages, as described in KIP-892 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores).
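
To make the failure mode concrete, one of the affected aggregations is shaped 
roughly like the sketch below (topic, store, and class names are simplified 
placeholders rather than our real topology):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class TagCountTopologySketch {

    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Count tag events per company into a RocksDB-backed store. Under
        // at-least-once, a crash after the RocksDB store has been updated but
        // before the input offsets are committed means the same records get
        // re-processed into a store that already includes them, which would
        // explain the "impossible" totals users are reporting.
        builder
            .stream("indexed-tag-events", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("tag-counts-per-company"))
            .toStream()
            .to("tag-counts-by-company", Produced.with(Serdes.String(), Serdes.Long()));

        return builder;
    }
}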

So we've started experimenting with exactly-once semantics. As a first step, I 
tried to launch a single-node Streams app with the configuration below.

StreamsConfig values:
 acceptable.recovery.lag = 10000 
 application.id = search-indexing-prod-scs-tmp-145 
 application.server = localhost:8080 
 bootstrap.servers = [kafka-broker-0.kafka-broker.default:9093, kafka-broker-1.kafka-broker.default:9093, kafka-broker-2.kafka-broker.default:9093] 
 buffered.records.per.partition = 500 
 built.in.metrics.version = latest 
 cache.max.bytes.buffering = 10485760 
 client.id = 
 commit.interval.ms = 100 
 connections.max.idle.ms = 540000 
 default.client.supplier = class org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier 
 default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler 
 default.dsl.store = rocksDB 
 default.key.serde = null 
 default.list.key.serde.inner = null 
 default.list.key.serde.type = null 
 default.list.value.serde.inner = null 
 default.list.value.serde.type = null 
 default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler 
 default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp 
 default.value.serde = null 
 deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler 
 dsl.store.suppliers.class = class org.apache.kafka.streams.state.BuiltInDslStoreSuppliers$RocksDBDslStoreSuppliers 
 enable.metrics.push = true 
 ensure.explicit.internal.resource.naming = true 
 group.protocol = classic 
 log.summary.interval.ms = 120000 
 max.task.idle.ms = 0 
 max.warmup.replicas = 2 
 metadata.max.age.ms = 300000 
 metadata.recovery.rebootstrap.trigger.ms = 300000 
 metadata.recovery.strategy = rebootstrap 
 metric.reporters = [org.apache.kafka.common.metrics.JmxReporter] 
 metrics.num.samples = 2 
 metrics.recording.level = INFO 
 metrics.sample.window.ms = 30000 
 num.standby.replicas = 0 
 num.stream.threads = 8 
 poll.ms = 100 
 probing.rebalance.interval.ms = 600000 
 processing.exception.handler = class org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler 
 processing.guarantee = exactly_once_v2 
 processor.wrapper.class = class com.paywholesail.service.search.core.streams.NamedFrameProcessorWrapper 
 production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler 
 rack.aware.assignment.non_overlap_cost = null 
 rack.aware.assignment.strategy = balance_subtopology 
 rack.aware.assignment.tags = [zone] 
 rack.aware.assignment.traffic_cost = null 
 receive.buffer.bytes = 32768 
 reconnect.backoff.max.ms = 1000 
 reconnect.backoff.ms = 50 
 repartition.purge.interval.ms = 30000 
 replication.factor = 2 
 request.timeout.ms = 40000 
 retry.backoff.ms = 100 
 rocksdb.config.setter = class com.paywholesail.service.search.core.streams.SearchRocksDBConfigSetter 
 security.protocol = SASL_PLAINTEXT 
 send.buffer.bytes = 131072 
 state.cleanup.delay.ms = 600000 
 state.dir = /volumes/search-index/40-streams-data 
 statestore.cache.max.bytes = 21474836480 
 task.assignor.class = null 
 task.timeout.ms = 300000 
 topology.optimization = all 
 upgrade.from = null 
 window.size.ms = null 
 windowed.inner.class.serde = null 
 windowstore.changelog.additional.retention.ms = 86400000
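
For what it's worth, the exactly-once-related parts of that configuration are set 
in code roughly as in this simplified sketch (the class name and the way the 
topology is passed in are made up for the example; the remaining properties above 
are populated the same way and omitted for brevity):

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class SearchIndexingStreamsLauncher {

    public static KafkaStreams start(Topology topology) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "search-indexing-prod-scs-tmp-145");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
            "kafka-broker-0.kafka-broker.default:9093,"
                + "kafka-broker-1.kafka-broker.default:9093,"
                + "kafka-broker-2.kafka-broker.default:9093");

        // Switch from the default at_least_once guarantee to EOS v2.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        // Under EOS the commit interval defaults to 100 ms (vs. 30 s for
        // at-least-once), which is where commit.interval.ms = 100 above comes from.
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);

        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        return streams;
    }
}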

Almost immediately, the application starts spamming logs like:

[Producer clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-5-producer, transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-5] Got error produce response with correlation id 12 on topic-partition search-indexing-prod-scs-tmp-145-current-date-per-tz-eoc-table-changelog-28, retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer, transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8] Got error produce response with correlation id 15 on topic-partition search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-13, retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer, transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8] Got error produce response with correlation id 15 on topic-partition search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-2, retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer, transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8] Got error produce response with correlation id 15 on topic-partition search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-6, retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer, transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8] Got error produce response with correlation id 15 on topic-partition search-indexing-prod-scs-tmp-145-user-access-group-by-company-repartition-13, retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-7-producer, transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-7] Got error produce response with correlation id 10 on topic-partition search-indexing-prod-scs-tmp-145-payment-account-share-materialized-changelog-14, retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS

(continues about 60,000 times)

After about 20 minutes of this, it gives up:
org.apache.kafka.streams.errors.StreamsException: Error encountered trying to commit a transaction
  at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:262)
  at org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:184)
  at org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:152)
  at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1957)
  at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1924)
  at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1782)
  at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1276)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:926)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:886)
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.

Around the same time, the three brokers (also 4.1.0) start logging lots of:
ERROR [ReplicaManager broker=102] Error processing append operation on partition x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-3 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid sequence number for producer 370010 at offset 2827 in partition search-indexing-prod-scs-tmp-145-notification-repartition-29: 1 (incoming seq. number). Expected sequence 0 for transactions v2 idempotent producer with no existing state
[2025-11-08 00:06:56,628] ERROR [ReplicaManager broker=102] Error processing append operation on partition x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-17 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid sequence number for producer 370012 at offset 2742 in partition search-indexing-prod-scs-tmp-145-notification-repartition-17: 1 (incoming seq. number). Expected sequence 0 for transactions v2 idempotent producer with no existing state.
[2025-11-08 00:06:57,750] ERROR [ReplicaManager broker=100] Error processing append operation on partition x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-20 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producer 368019 at offset 3196 in partition search-indexing-prod-scs-tmp-145-notification-repartition-20: 2 (incoming seq. number), 0 (current end sequence number)

This is our first experience with Streams EOS v2 and Kafka transactions in 
general, so I hope we haven't misconfigured something badly, but I'm not sure 
where we've gone wrong.
If anyone can offer any advice, it would be much appreciated.

Thanks,
Steven


