Thank you John for the explanation! I confirm that by introducing a full integration test I have reproduced the problem.

We have reviewed our pipelines using your suggestion (a ValueTransformer and a state store) and now it seems to work correctly! If anyone is interested, here is the improved version of the same pipeline:
https://github.com/davideicardi/es4kafka/blob/ca6f27a9db5e38ac029493ae4e1ddd47ade8266e/examples/bank-account/src/main/scala/bank/StreamingPipeline.scala
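If you don't want to open the link, the change boils down to something like the sketch below. This is simplified, not the exact code from the repository: the names (OperationTransformer, "account-store", accountSerde) are illustrative, and it assumes a Serde[Account] is available. The key point is that the balance is read, checked and updated in one synchronous step against a local state store, so there is no re-entrant topic and no race:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{ValueTransformerWithKey, ValueTransformerWithKeySupplier}
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.{KeyValueStore, Stores}

val accountStoreName = "account-store"

// Turns each Operation into a Movement, keeping the running Account
// per key in a local key/value store.
class OperationTransformer
    extends ValueTransformerWithKey[String, Operation, Movement] {
  private var store: KeyValueStore[String, Account] = _

  override def init(context: ProcessorContext): Unit =
    store = context
      .getStateStore(accountStoreName)
      .asInstanceOf[KeyValueStore[String, Account]]

  override def transform(key: String, operation: Operation): Movement = {
    val account = Option(store.get(key)).getOrElse(Account(0))
    if (account.balance >= -operation.amount) {
      // read-check-update happens synchronously, before the next record
      // on this task is processed: no re-entrant topic, no race
      store.put(key, account.copy(balance = account.balance + operation.amount))
      Movement(operation.amount)
    } else {
      Movement(0, error = "insufficient funds")
    }
  }

  override def close(): Unit = ()
}

streamBuilder.addStateStore(
  Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore(accountStoreName),
    Serdes.String(),
    accountSerde // assumed: a Serde[Account], e.g. a JSON serde
  )
)

operationsStream
  .transformValues(
    new ValueTransformerWithKeySupplier[String, Operation, Movement] {
      override def get(): ValueTransformerWithKey[String, Operation, Movement] =
        new OperationTransformer
    },
    accountStoreName
  )
  .to(Config.topicMovements)

The sketch leaves out how the "accounts" snapshot topic is populated from the same store; see the linked StreamingPipeline.scala for the complete wiring.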
regards
Davide

On Sun, Jan 31, 2021 at 9:23 PM John Roesler <[email protected]> wrote:

> Hi Davide,
>
> Thank you for the question.
>
> If I can confirm, it looks like the "operations" topic is the only input
> to the topology, and the topology reads the "operations" topic joined
> with the "account" table and generates a "movements" stream. It reads
> (and aggregates) the "movements" stream to create the "account" table.
>
> I think your concern is dead-on. The processing of incoming records from
> either the "operations" topic or the "movements" topic is synchronous,
> BUT the production of messages to the "movements" topic and the
> subsequent consumption of those messages in the join is asynchronous.
> In other words, you can indeed get "insufficient funds" in your
> application.
>
> Following your scenario, this can happen:
>
> 1. consume Operation(10)
> 1. join (balance is $0)
> 1. produce Movement(10)
>
> 2. consume Operation(10)
> 2. join (balance is $0)
> 2. produce Movement(10)
>
> 3. consume Movement(10) // the first one
> 3. update table to value $10
>
> 4. consume Operation(-20)
> 4. join (balance is $10) // only seen the first one
> 4. produce Movement(0, "insufficient funds")
>
> 5. consume Movement(10) // the second one (too late)
> 5. update table to value $20
>
> 6. consume Movement(0, "insufficient funds")
> ...
>
> Other interleavings are also possible.
>
> To get synchronous processing, what you want is a single subtopology
> where all the data flows are internal (i.e., that re-entrant topic is
> the source of the race condition).
>
> If you don't know about it already, you can print out your topology and
> visualize it with https://zz85.github.io/kafka-streams-viz/ .
>
> Technically, you can actually model this as a single aggregation:
>
> operationStream
>   .groupByKey
>   .aggregate(Account(0)){ (_, operation, account) =>
>     if (account.balance >= -operation.amount) {
>       account.copy(balance = account.balance + operation.amount)
>     } else {
>       account.copy(error = "Insufficient funds") // assumes an error field on Account
>     }
>   }
>   .toStream.to(topicAccounts)
>
> But if you want to decouple the "insufficient funds" error handling from
> the account maintenance, you might look at a ValueTransformer, in which
> you maintain the Accounts in a key/value store and then forward an
> Either[Account, Error] result, which you can then direct however you
> please.
>
> Either way, maintaining the table and the balance-checking logic in a
> single operation guarantees you won't have race conditions.
>
> A final note: the reason your tests don't show the race condition is
> that they are using TopologyTestDriver, which synchronously propagates
> each individual input record all the way through the topology. If you
> also set up a full integration test, I suspect that you'll quickly see
> the race condition surface.
>
> I hope this helps,
> -John
>
>
> On Sun, 2021-01-31 at 11:37 +0100, Davide Icardi wrote:
> > I'm working on a project where I want to use Kafka Streams for Event
> > Sourcing.
> >
> > The general idea is that I have a "commands" topic/KStream, an "events"
> > topic/KStream and a "snapshots" topic/KTable.
> > The "snapshots" table contains the current state of the entities.
> > Commands are validated using the "snapshots" and transformed to
> > "events".
> >
> > Group the EVENTS stream by key and aggregate it to a SNAPSHOTS table.
> > Left join the COMMANDS stream with the SNAPSHOTS table and output new
> > EVENTS.
> >
> > For example, to apply this pattern to a simple bank-account scenario I
> > can have:
> > - operations stream as "commands" (requests to deposit or withdraw an
> >   amount of money, e.g. "deposit $10" => Operation(+10))
> > - movements stream as "events" (actual deposit or withdraw events,
> >   e.g. "$10 deposited" => Movement(+10))
> > - account table as "snapshots" (account balance, e.g. "$20 in account
> >   balance" => Account(20))
> > - account id is used as the key for all topics and tables
> >
> > The topology can be written like:
> >
> > case class Operation(amount: Int)
> > case class Movement(amount: Int, error: String = "")
> > case class Account(balance: Int)
> >
> > // events
> > val movementsStream = streamBuilder.stream[String, Movement](Config.topicMovements)
> > // snapshots
> > val accountTable = movementsStream
> >   .groupByKey
> >   .aggregate(Account(0)){ (_, movement, account) =>
> >     account.copy(balance = account.balance + movement.amount)
> >   }
> > accountTable.toStream.to(Config.topicAccounts)
> > // commands
> > val operationsStream = streamBuilder.stream[String, Operation](Config.topicOperations)
> > operationsStream
> >   .leftJoin(accountTable) { (operation, accountOrNull) =>
> >     val account = Option(accountOrNull).getOrElse(Account(0))
> >     if (account.balance >= -operation.amount) {
> >       Movement(operation.amount)
> >     } else {
> >       Movement(0, error = "insufficient funds")
> >     }
> >   }
> >   .to(Config.topicMovements)
> >
> > (see the full code here:
> > https://github.com/davideicardi/es4kafka/blob/master/examples/bank-account/src/main/scala/bank/StreamingPipeline.scala
> > )
> >
> > Now let's imagine a scenario where I deposit $10, then I deposit
> > another $10 and then I withdraw $20:
> >
> > inOperations.pipeInput("alice", Operation(10))
> > inOperations.pipeInput("alice", Operation(10))
> > inOperations.pipeInput("alice", Operation(-20))
> >
> > Can I assume that when processing the third message (withdraw $20) the
> > account table has already processed the previous two movements (10+10)?
> > In other words, can I assume that:
> > - if the operations are valid, I never receive an "insufficient funds"
> >   error event?
> > - in the above topology, the account KTable is always updated before
> >   the next operation from the KStream is processed?
> >
> > From my tests it seems to work, but I would like some advice on whether
> > this is a safe assumption.
> > (see the test here:
> > https://github.com/davideicardi/es4kafka/blob/master/examples/bank-account/src/test/scala/bank/StreamingPipelineSpec.scala
> > )
> >
> > thanks
> > Davide Icardi
