Hi Davide,

Thank you for the question.
If I can confirm: it looks like the "operations" topic is the only input to the topology. The topology joins the "operations" stream with the "account" table and produces a "movements" stream, and it also reads (and aggregates) the "movements" stream to build the "account" table.

I think your concern is dead-on. The processing of incoming records from either the "operations" topic or the "movements" topic is synchronous, BUT the production of messages to the "movements" topic and the subsequent consumption of those messages in the join is asynchronous. In other words, you can indeed get "insufficient funds" in your application.

Following your scenario, this can happen:

1. consume Operation(10)
   join (balance is $0)
   produce Movement(10)
2. consume Operation(10)
   join (balance is $0)
   produce Movement(10)
3. consume Movement(10)        // the first one
   update table to value $10
4. consume Operation(-20)
   join (balance is $10)       // only seen the first one
   produce Movement(0, "insufficient funds")
5. consume Movement(10)        // the second one (too late)
   update table to value $20
6. consume Movement(0, "insufficient funds")
...

Other interleavings are also possible.

To get synchronous processing, what you want is a single subtopology where all the data flows are internal (i.e., that re-entrant topic is the source of the race condition). If you don't know about it already, you can print out your topology and visualize it with https://zz85.github.io/kafka-streams-viz/ .
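To make the interleaving concrete, here is a minimal pure-Scala sketch (no Kafka involved; the names just mirror the topology above) that replays those six steps with an explicit queue standing in for the re-entrant "movements" topic. It shows how the join reads a stale balance when production and re-consumption are decoupled:

```scala
import scala.collection.mutable

case class Operation(amount: Int)
case class Movement(amount: Int, error: String = "")

// The "account" table: only updated when a Movement is consumed.
var balance = 0

// The re-entrant "movements" topic: produced-to immediately,
// but consumed asynchronously (here: only when we explicitly poll).
val movements = mutable.Queue.empty[Movement]

def processOperation(op: Operation): Unit = {
  // The join sees the *current* table value, which may be stale.
  if (balance >= -op.amount) movements.enqueue(Movement(op.amount))
  else movements.enqueue(Movement(0, error = "insufficient funds"))
}

def consumeOneMovement(): Unit = {
  val m = movements.dequeue()
  if (m.error.isEmpty) balance += m.amount
}

// Replay the interleaving from the scenario above:
processOperation(Operation(10))  // join sees $0, produce Movement(10)
processOperation(Operation(10))  // join sees $0, produce Movement(10)
consumeOneMovement()             // table becomes $10
processOperation(Operation(-20)) // join sees only $10 -> spurious error
consumeOneMovement()             // table becomes $20 (too late)

val result = movements.dequeue()
println(result) // Movement(0,insufficient funds)
```

The funds were sufficient overall ($20 deposited before the $20 withdrawal), but the stale join still rejects the withdrawal, just as in the record-by-record trace above.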
Technically, you can actually model this as a single aggregation:

operationStream
  .groupByKey
  .aggregate(Account(0)){ (_, operation, account) =>
    if (account.balance >= -operation.amount) {
      account.copy(balance = account.balance + operation.amount)
    } else {
      account.copy(error = "insufficient funds")
    }
  }
  .toStream.to(topicAccounts)

But if you want to decouple the "insufficient funds" error handling from the account maintenance, you might look at a ValueTransformer, in which you maintain the Accounts in a key/value store and then forward an Either[Account, Error] result, which you can then direct however you please. Either way, maintaining the table and the balance-checking logic in a single operation guarantees you won't have race conditions.

A final note: the reason your tests don't show the race condition is that they use TopologyTestDriver, which synchronously propagates each individual input record all the way through the topology. If you also set up a full integration test, I suspect you'll quickly see the race condition surface.

I hope this helps,
-John

On Sun, 2021-01-31 at 11:37 +0100, Davide Icardi wrote:
> I'm working on a project where I want to use Kafka Streams for Event
> Sourcing.
>
> The general idea is that I have a "commands" topic/KStream, an "events"
> topic/KStream and a "snapshots" topic/KTable.
> Snapshots contain the current state of the entities. Commands are
> validated using the "snapshots" and transformed into "events".
>
> Group the EVENTS stream by key and aggregate it into a SNAPSHOTS table.
> Left-join the COMMANDS stream with the SNAPSHOTS table and output new
> EVENTS.
>
> For example, applying this pattern to a simple bank-account scenario I can
> have:
> - operations stream as "commands" (requests to deposit or withdraw an
>   amount of money, e.g. "deposit $10" => Operation(+10) )
> - movements stream as "events" (actual deposit or withdraw event, e.g.
>   "$10 deposited" => Movement(+10) )
> - account table as "snapshots" (account balance, e.g. "$20 in account
>   balance" => Account(20) )
> - account id is used as the key for all topics and tables
>
> The topology can be written like:
>
> case class Operation(amount: Int)
> case class Movement(amount: Int, error: String = "")
> case class Account(balance: Int)
>
> // events
> val movementsStream =
>   streamBuilder.stream[String, Movement](Config.topicMovements)
>
> // snapshots
> val accountTable = movementsStream
>   .groupByKey
>   .aggregate(Account(0)){ (_, movement, account) =>
>     account.copy(balance = account.balance + movement.amount)
>   }
> accountTable.toStream.to(Config.topicAccounts)
>
> // commands
> val operationsStream =
>   streamBuilder.stream[String, Operation](Config.topicOperations)
> operationsStream
>   .leftJoin(accountTable) { (operation, accountOrNull) =>
>     val account = Option(accountOrNull).getOrElse(Account(0))
>     if (account.balance >= -operation.amount) {
>       Movement(operation.amount)
>     } else {
>       Movement(0, error = "insufficient funds")
>     }
>   }
>   .to(Config.topicMovements)
>
> (see full code here:
> https://github.com/davideicardi/es4kafka/blob/master/examples/bank-account/src/main/scala/bank/StreamingPipeline.scala
> )
>
> Now let's imagine a scenario where I deposit $10, then I deposit another
> $10, and then I withdraw $20:
>
> inOperations.pipeInput("alice", Operation(10))
> inOperations.pipeInput("alice", Operation(10))
> inOperations.pipeInput("alice", Operation(-20))
>
> Can I assume that when processing the third message (withdraw $20) the
> account table has already processed the previous two movements (10+10)?
> In other words, can I assume that:
> - if operations are valid, I never receive an "insufficient funds" error
>   event?
> - in the above topology, the account KTable is always updated before
>   processing the next operation from the KStream?
>
> From my tests it seems to work, but I would like some advice on whether
> this is a safe assumption.
>
> (see test here:
> https://github.com/davideicardi/es4kafka/blob/master/examples/bank-account/src/test/scala/bank/StreamingPipelineSpec.scala
> )
>
> thanks
> Davide Icardi
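For completeness, the aggregator body from the single-aggregation suggestion in the reply can be sanity-checked in plain Scala (no Kafka; Account here is assumed to carry an error field, as the sketch in the reply implies) by folding the operations into the account state. Because the balance check and the state update happen in one step, each operation always sees the effect of every earlier operation:

```scala
case class Operation(amount: Int)
case class Account(balance: Int, error: String = "")

// The aggregator: validate and update the state in a single operation,
// so no stale balance can ever be observed.
def aggregate(account: Account, op: Operation): Account =
  if (account.balance >= -op.amount)
    account.copy(balance = account.balance + op.amount, error = "")
  else
    account.copy(error = "insufficient funds")

// The sequence from the question: deposit $10, deposit $10, withdraw $20.
val ops = List(Operation(10), Operation(10), Operation(-20))
val result = ops.foldLeft(Account(0))(aggregate)
println(result) // Account(0,)
```

The valid withdrawal now succeeds (final balance $0, no error), which is exactly the behavior the asynchronous two-subtopology version cannot guarantee.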