Ok, I have a solution - I implemented a custom Transformer that accepts a
key/value pair, writes it to a state store, and then returns the input
unchanged. It's a "write-through" transformer: basically like a peek
operation, except that it also saves the record to the backing state store.
And since it's not a terminal operation, the stream can still perform a .to
afterwards.
Basically, the transformer looks like this (in pseudo code, type parameters
omitted):

class WriteThroughTransformer(val storeName: String): Transformer {
    lateinit var context: ProcessorContext
    lateinit var store: KeyValueStore

    // initialize context and store
    override fun init() ...

    // do a simple store.put - return inputs as KeyValue
    override fun transform() ...
}

And then a handy extension function:

fun <K,V> KStream<K,V>.writeThrough(storeName: String): KStream<K?, V?> =
    this.transform( // supplier code )

And then finally, the stream chain looks like this:

actions.filter { _, action -> action.action == Actions.ANSWER }
    .join(callsTable) { _, call -> call }
    .mapValues { call -> doAnswer(call) }
    .writeThrough("calls-store")
    .to("answered-calls")

This approach doesn't actually send a message to the calls topic, but that's
ok. The stream listening on answered-calls will eventually do that anyway.

On Mon, Feb 18, 2019 at 2:02 PM Trey Hutcheson <trey.hutche...@gmail.com>
wrote:

> Ok, I tried your suggestion (along with several variations), and it just
> doesn't work as I'd hoped. The constructed topology emits messages from the
> .mapValues call to both the "calls" and "answered-calls" sinks, with the
> two sinks being siblings of one another, which still causes a race
> condition.
>
> So what are my options? I could spin off an asynchronous/delayed action to
> send a message to the "answered-calls" topic, and just hope that the
> answered call had been successfully persisted to the calls backing state
> store. That seems awfully brittle to me.
>
> Or I could move state outside of the streaming topology altogether, using
> an external state store, so I could explicitly govern transaction
> demarcation and guarantee consistency. But that seems antithetical to
> kafka streams in general.
>
> On Sat, Feb 16, 2019 at 3:24 PM Trey Hutcheson <trey.hutche...@gmail.com>
> wrote:
>
>> Thanks for the response John. I'll see if I can track down that ticket.
>>
>> And thank you for your suggestion; I will try that once I get back to
>> the code. That is an approach I had not considered.
>>
>> On Fri, Feb 15, 2019 at 10:16 PM John Roesler <j...@confluent.io> wrote:
>>
>>> Hi Trey,
>>>
>>> I think there is a ticket open requesting to be able to re-use the
>>> source topic, so I don't think it's an intentional restriction, just a
>>> consequence of the way the code is structured at the moment.
>>>
>>> Is it sufficient to send the update to "calls" and "answered-calls" at
>>> the same time? You could do something like:
>>>
>>> val answeredCalls =
>>>     actions.filter { _, action -> action == Actions.ANSWER }
>>>         .join(callsTable) { id, call -> call } // now a KTable
>>>         .mapValues { call -> doAnswer(call) } // actual answer implementation
>>>
>>> answeredCalls.to("calls");
>>> answeredCalls.to("answered-calls");
>>>
>>> Does that help?
>>>
>>> -John
>>>
>>> On Fri, Feb 15, 2019 at 4:18 PM Trey Hutcheson
>>> <trey.hutche...@gmail.com> wrote:
>>>
>>> > For context, imagine I'm building an IVR simulator. Desired workflow:
>>> >
>>> > The IVR knows about a ringing call. The IVR receives an IPC
>>> > instruction to answer the call. That instruction is realized by
>>> > sending a message {action=ANSWER} to the "actions" topic.
>>> >
>>> > At this point, the system needs to do two things: actually answer the
>>> > call, and then start a recording of the call, in that order. Because
>>> > of implementation peculiarities external to the system, assume that
>>> > these two things cannot be executed together atomically.
>>> >
>>> > So this is what I'd *like* to do (warning, kotlin code, types omitted
>>> > for brevity):
>>> >
>>> > val callsTable = builder.table("calls", ...)
>>> > val actions = builder.stream("actions", ..)
>>> >
>>> > actions.filter { _, action -> action == Actions.ANSWER }
>>> >     .join(callsTable) { id, call -> call } // now a KTable
>>> >     .mapValues { call -> doAnswer(call) } // actual answer implementation
>>> >     .through("calls") // persist in state store
>>> >     .to("answered-calls") // let other actors in the system know the
>>> >         // call was answered, such as starting the recording process
>>> >
>>> > Now in the current version of the streams library (2.1.0), that little
>>> > bit of topology throws an exception when trying to build it, with a
>>> > message that a source has already been defined for the "calls" topic.
>>> > So apparently the call to .through materializes a view and defines a
>>> > source, which was already defined in the call to
>>> > builder.table("calls")?
>>> >
>>> > So how do I do what I want? This sequence needs to happen in order. I
>>> > have tried .branch, but that just ends up in a race condition (the
>>> > thing doing the recording has to join to the calls table and filter
>>> > that the call has been answered).
>>> >
>>> > I could create a custom processor that forwards to both sinks - but
>>> > does that really solve the problem? And if it did, how do I create a
>>> > KafkaStreams instance from a combination of StreamBuilder and
>>> > Topology?
>>> >
>>> > Thanks for the insight
>>> > Trey
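The write-through transformer described at the top of the thread boils down
to a very small core: put the record into the store, then return it
unchanged. Here is a compilable Kotlin sketch of that core logic. To keep it
self-contained, KeyValueStore below is a hand-rolled, map-backed stand-in
rather than Kafka Streams' org.apache.kafka.streams.state.KeyValueStore; a
real implementation would implement the Transformer interface, fetch the
store from the ProcessorContext in init(), and be wired in via
KStream.transform with a TransformerSupplier and the store name:

```kotlin
// Stand-in for Kafka Streams' KeyValueStore, so this compiles without
// the kafka-streams dependency. Only put/get are sketched here.
class KeyValueStore<K, V> {
    private val map = mutableMapOf<K, V>()
    fun put(key: K, value: V) { map[key] = value }
    fun get(key: K): V? = map[key]
}

// The core of the write-through idea: persist the record as a side
// effect, then forward it unchanged so downstream operators (e.g. .to)
// still see it.
class WriteThroughTransformer<K, V>(private val store: KeyValueStore<K, V>) {
    fun transform(key: K, value: V): Pair<K, V> {
        store.put(key, value)   // save to the backing state store
        return key to value     // pass the record through untouched
    }
}

fun main() {
    val store = KeyValueStore<String, String>()
    val transformer = WriteThroughTransformer(store)

    // The record flows through unchanged...
    val out = transformer.transform("call-1", "ANSWERED")
    println(out)                 // (call-1, ANSWERED)

    // ...and has also been persisted.
    println(store.get("call-1")) // ANSWERED
}
```

In the real topology the store lookup happens by name inside init(), which
is why the extension function in the thread takes a storeName parameter and
passes it along to transform as a state store name.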