Hi Davide,

Thank you for the question.

To confirm my understanding: it looks like the "operations"
topic is the only external input to the topology. The
topology joins the "operations" stream with the "account"
table to generate a "movements" stream, and it reads (and
aggregates) the "movements" stream to create the "account"
table.

I think your concern is dead-on. The processing of incoming
records from either the "operations" topic or the
"movements" topic is synchronous, BUT the production of
messages to the "movements" topic and the subsequent
consumption of those messages in the join happen
asynchronously. In other words, you can indeed get an
unwarranted "insufficient funds" error in your application.

Following your scenario, this can happen:

1. consume Operation(10)
1. join (balance is $0)
1. produce Movement(10)

2. consume Operation(10)
2. join (balance is $0)
2. produce Movement(10)

3. consume Movement(10) // the first one
3. update table to value $10

4. consume Operation(-20)
4. join (balance is $10) // only seen the first one
4. produce Movement(0, "insufficient funds")

5. consume Movement(10) // the second one (too late)
5. update table to value $20

6. consume Movement(0, "insufficient funds")
...

Other interleavings are also possible.

To get synchronous processing, what you want is a single
subtopology where all the data flows are internal. The
re-entrant "movements" topic is what splits your topology
into two subtopologies, and it is the source of the race
condition.

If you don't know about it already, you can print out your
topology and visualize it with
https://zz85.github.io/kafka-streams-viz/ .
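
For example, something like this (assuming the streamBuilder
from your code) prints the description you can paste into
that tool:

// build the topology and print its description; paste the
// output into the visualizer above
println(streamBuilder.build().describe())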

Technically, you can actually model this as a single
aggregation:

operationsStream
  .groupByKey
  .aggregate(Account(0)) { (_, operation, account) =>
    // note: this assumes Account gains an error field, e.g.
    // case class Account(balance: Int, error: String = "")
    if (account.balance >= -operation.amount) {
      account.copy(balance = account.balance + operation.amount)
    } else {
      account.copy(error = "insufficient funds")
    }
  }
  .toStream.to(Config.topicAccounts)

But if you want to decouple the "insufficient funds" error
handling from the account maintenance, you might look at a
ValueTransformer, in which you maintain the Accounts in a
key/value store and then forward an Either[Account, Error]
result, which you can then direct however you please.
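
Just as a rough sketch of that approach (not a drop-in
implementation): it uses ValueTransformerWithKey so the
account id is available inside the transformer, and the
store name "accounts-store", the Rejection type, and the
serde names are assumptions for the example:

import org.apache.kafka.streams.kstream.{ValueTransformerWithKey, ValueTransformerWithKeySupplier}
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.{KeyValueStore, Stores}

case class Rejection(reason: String)

// checks the balance and updates the store in one step, so
// there is no re-entrant topic to race on
class AccountTransformer
    extends ValueTransformerWithKey[String, Operation, Either[Rejection, Account]] {
  private var store: KeyValueStore[String, Account] = _

  override def init(context: ProcessorContext): Unit =
    store = context
      .getStateStore("accounts-store")
      .asInstanceOf[KeyValueStore[String, Account]]

  override def transform(accountId: String, operation: Operation): Either[Rejection, Account] = {
    val account = Option(store.get(accountId)).getOrElse(Account(0))
    if (account.balance >= -operation.amount) {
      val updated = account.copy(balance = account.balance + operation.amount)
      store.put(accountId, updated)
      Right(updated)
    } else {
      Left(Rejection("insufficient funds"))
    }
  }

  override def close(): Unit = ()
}

// wiring: register the store and attach the transformer
// (stringSerde and accountSerde stand in for your existing serdes)
streamBuilder.addStateStore(
  Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("accounts-store"),
    stringSerde,
    accountSerde))

operationsStream
  .transformValues(
    new ValueTransformerWithKeySupplier[String, Operation, Either[Rejection, Account]] {
      override def get(): ValueTransformerWithKey[String, Operation, Either[Rejection, Account]] =
        new AccountTransformer
    },
    "accounts-store")
  // from here, branch on the Either: send the successful
  // Accounts to the accounts topic and the Rejections to
  // wherever you want the errors to go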

Either way, maintaining the table and the balance checking
logic in a single operation guarantees you won't have race
conditions.

A final note: the reason your tests don't show the race
condition is that they are using TopologyTestDriver, which
synchronously propagates each individual input record all
the way through the topology. If you also set up a full
integration test, I suspect that you'll quickly see the race
condition surface.

I hope this helps,
-John


On Sun, 2021-01-31 at 11:37 +0100, Davide Icardi wrote:
> I'm working on a project where I want to use Kafka Streams for Event
> Sourcing.
> 
> General idea is that I have a "commands" topic/KStream, an "events"
> topic/KStream and a "snapshots" topic/KTable.
> Snapshots contains the current state of the entities. Commands are
> validated using the "snapshots" and transformed to "events".
> 
>     Group EVENTS stream by key and aggregate them to a SNAPSHOTS table.
>     Left join COMMANDS stream with the SNAPSHOTS table and output new
> EVENTS.
> 
> For example, to apply this pattern to a simple bank-account scenario I can
> have:
> - operations stream as "commands" (requests to deposit or withdraw an
> amount of money, eg. "deposit $10" => Operation(+10) )
> - movements stream as "events" (actual deposit or withdraw event, eg. "$10
> deposited" => Movement(+10) )
> - account table as a "snapshots" (account balance, eg. "$20 in account
> balance" => Account(20) )
> - account id is used as key for all topics and tables
> 
> The topology can be written like:
> 
>     case class Operation(amount: Int)
>     case class Movement(amount: Int, error: String = "")
>     case class Account(balance: Int)
> 
>     // events
>     val movementsStream = streamBuilder.stream[String, Movement](Config.topicMovements)
>     // snapshots
>     val accountTable = movementsStream
>       .groupByKey
>       .aggregate(Account(0)){ (_, movement, account) =>
>         account.copy(balance = account.balance + movement.amount)
>       }
>     accountTable.toStream.to(Config.topicAccounts)
>     // commands
>     val operationsStream = streamBuilder.stream[String, Operation](Config.topicOperations)
>     operationsStream
>       .leftJoin(accountTable) { (operation, accountOrNull) =>
>         val account = Option(accountOrNull).getOrElse(Account(0))
>         if (account.balance >= -operation.amount) {
>           Movement(operation.amount)
>         } else {
>           Movement(0, error = "insufficient funds")
>         }
>       }
>       .to(Config.topicMovements)
> 
> (see full code here:
> https://github.com/davideicardi/es4kafka/blob/master/examples/bank-account/src/main/scala/bank/StreamingPipeline.scala
> )
> 
> Now let's imagine a scenario where I deposit $ 10, then I deposit again $
> 10 and then I withdraw $ 20:
>         inOperations.pipeInput("alice", Operation(10))
>         inOperations.pipeInput("alice", Operation(10))
>         inOperations.pipeInput("alice", Operation(-20))
> 
> Can I assume that when processing the third message (withdraw $ 20) the
> account table already processed the previous two movements (10+10)?
> In other words, can I assume that:
> - if operations are valid I never receive an "insufficient funds" error
> event ?
> - in the above topology, account KTable is always updated before processing
> the next operation from KStream ?
> 
> From my tests it seems to work, but I would like to have some advice if
> this is a safe assumption.
> (see test here:
> https://github.com/davideicardi/es4kafka/blob/master/examples/bank-account/src/test/scala/bank/StreamingPipelineSpec.scala
> )
> 
> thanks
> Davide Icardi

