Yes, that is exactly correct. I *assume* I won't run into any concurrency issues here - that another thread will not be writing to the same store for the same key while this is being read.
If there are no concurrency issues here (again, it's the same key, so I doubt it), then a similar approach would allow a transformer to peek into the backing store and retrieve the previous value before writing the new value, effectively allowing a lambda to do a comparison if necessary. That (and my original use case) would be useful additions to the DSL API.

On Wed, Feb 20, 2019 at 12:07 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Trey,
>
> Just to clarify your solution: the "calls-store" is the same one
> materialized from the calls table as "val callsTable = builder.table("calls",
> ... Materialized.as(.. "call-store"))". So basically you are using the
> transformer to update the original materialized store for the callsTable
> here, is that right?
>
> Guozhang
>
> On Tue, Feb 19, 2019 at 8:58 AM Trey Hutcheson <trey.hutche...@gmail.com>
> wrote:
>
> > Ok, I have a solution - I implemented a custom Transformer that simply
> > accepts a key/value pair and writes it to a state store, then returns
> > the input values. It's a "write-through" transformer - basically like a
> > peek operation, except that it saves the record to the backing state
> > store. And since it's not a terminal operation, the stream can still
> > perform a .to later.
> >
> > Basically, the transformer looks like this (in pseudocode, type
> > parameters omitted):
> >
> > class WriteThroughTransformer(val storeName: String) : Transformer {
> >     lateinit var context: ProcessorContext
> >     lateinit var store: KeyValueStore
> >
> >     // initialize context and store
> >     override fun init() ...
> >
> >     // do a simple store.put - return inputs as KeyValue
> >     override fun transform() ...
> > }
> >
> > And then a handy extension function:
> >
> > fun <K, V> KStream<K, V>.writeThrough(storeName: String): KStream<K?, V?> =
> >     this.transform( /* supplier code */ )
> >
> > And then finally, the stream chain looks like this:
> >
> > actions.filter { _, action -> action.action == Actions.ANSWER }
> >     .join(callsTable) { _, call -> call }
> >     .mapValues { call -> doAnswer(call) }
> >     .writeThrough("calls-store")
> >     .to("answered-calls")
> >
> > This approach doesn't actually send a message to the calls topic, but
> > that's OK. The stream listening on answered-calls will eventually do
> > that anyway.
> >
> > On Mon, Feb 18, 2019 at 2:02 PM Trey Hutcheson <trey.hutche...@gmail.com>
> > wrote:
> >
> > > Ok, I tried your suggestion (along with several variations), and it
> > > just doesn't work as I'd hoped. The constructed topology emits
> > > messages from the .mapValues call to both the "calls" and
> > > "answered-calls" sinks, with the two sinks being siblings of one
> > > another, which still causes a race condition.
> > >
> > > So what are my options? I could spin off an asynchronous/delayed
> > > action to send a message to the "answered-calls" topic and just hope
> > > that the answered call had been successfully persisted to the calls
> > > backing state store. That seems awfully brittle to me.
> > >
> > > I could move state outside of the streaming topology altogether,
> > > using an external state store, so I could explicitly govern
> > > transaction demarcation and guarantee consistency. But that seems
> > > antithetical to Kafka Streams in general.
> > >
> > > On Sat, Feb 16, 2019 at 3:24 PM Trey Hutcheson <trey.hutche...@gmail.com>
> > > wrote:
> > >
> > >> Thanks for the response, John. I'll see if I can track down that
> > >> ticket.
> > >>
> > >> And thank you for your suggestion; I will try it once I get back to
> > >> the code. That is an approach I had not considered.
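[Editor's note: to make the quoted "write-through" pseudocode concrete, here is a minimal, dependency-free Kotlin sketch. It omits the real Kafka Streams Transformer/ProcessorContext machinery entirely and uses a plain MutableMap as a stand-in for the KeyValueStore, so the class name, the simplified transform signature, and the example keys are all illustrative, not the actual API:]

```kotlin
// Dependency-free sketch of the "write-through" idea: write the record to
// the backing store, then return it unchanged so downstream operators
// (e.g. a later .to("answered-calls")) still see every record - like a
// peek, but with a side effect on the store.
// NOTE: MutableMap stands in for the Kafka Streams KeyValueStore here.
class WriteThroughTransformer<K, V>(private val store: MutableMap<K, V>) {
    fun transform(key: K, value: V): Pair<K, V> {
        store[key] = value      // persist to the (stand-in) state store
        return key to value     // pass the input through untouched
    }
}

fun writeThroughDemo(): Pair<String, String> {
    val store = mutableMapOf<String, String>()
    val transformer = WriteThroughTransformer(store)
    // The returned pair is identical to the input; the store now holds it too.
    return transformer.transform("call-1", "ANSWERED")
}
```

The key property is that `transform` returns its input unchanged, which is why the stream can still terminate in a `.to(...)` afterwards.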
> > >>
> > >> On Fri, Feb 15, 2019 at 10:16 PM John Roesler <j...@confluent.io>
> > >> wrote:
> > >>
> > >>> Hi Trey,
> > >>>
> > >>> I think there is a ticket open requesting the ability to re-use the
> > >>> source topic, so I don't think it's an intentional restriction, just
> > >>> a consequence of the way the code is structured at the moment.
> > >>>
> > >>> Is it sufficient to send the update to "calls" and "answered-calls"
> > >>> at the same time? You could do something like:
> > >>>
> > >>> val answeredCalls =
> > >>>     actions.filter { _, action -> action == Actions.ANSWER }
> > >>>         .join(callsTable) { id, call -> call } // now a KTable
> > >>>         .mapValues { call -> doAnswer(call) } // actual answer implementation
> > >>>
> > >>> answeredCalls.to("calls")
> > >>> answeredCalls.to("answered-calls")
> > >>>
> > >>> Does that help?
> > >>>
> > >>> -John
> > >>>
> > >>> On Fri, Feb 15, 2019 at 4:18 PM Trey Hutcheson <trey.hutche...@gmail.com>
> > >>> wrote:
> > >>>
> > >>> > For context, imagine I'm building an IVR simulator. Desired
> > >>> > workflow:
> > >>> >
> > >>> > The IVR knows about a ringing call. The IVR receives an IPC
> > >>> > instruction to answer the call. That instruction is realized by
> > >>> > sending a message {action=ANSWER} to the "actions" topic.
> > >>> >
> > >>> > At this point, the system needs to do two things: actually answer
> > >>> > the call, and then start a recording of the call, in that order.
> > >>> > Because of implementation peculiarities external to the system,
> > >>> > assume that these two things cannot be executed together
> > >>> > atomically.
> > >>> >
> > >>> > So this is what I'd *like* to do (warning: Kotlin code, types
> > >>> > omitted for brevity):
> > >>> >
> > >>> > val callsTable = builder.table("calls", ...)
> > >>> > val actions = builder.stream("actions", ...)
> > >>> >
> > >>> > actions.filter { _, action -> action == Actions.ANSWER }
> > >>> >     .join(callsTable) { id, call -> call } // now a KTable
> > >>> >     .mapValues { call -> doAnswer(call) } // actual answer implementation
> > >>> >     .through("calls") // persist in the state store
> > >>> >     .to("answered-calls") // let other actors in the system know
> > >>> >         // the call was answered, e.g. start the recording process
> > >>> >
> > >>> > Now, in the current version of the streams library (2.1.0), that
> > >>> > little bit of topology throws an exception when trying to build
> > >>> > it, with a message that a source has already been defined for the
> > >>> > "calls" topic. So apparently the call to .through materializes a
> > >>> > view and defines a source, which was already defined in the call
> > >>> > to builder.table("calls")?
> > >>> >
> > >>> > So how do I do what I want? This sequence needs to happen in
> > >>> > order. I have tried .branch, but that just ends up in a race
> > >>> > condition (the thing doing the recording has to join to the calls
> > >>> > table and filter that the call has been answered).
> > >>> >
> > >>> > I could create a custom processor that forwards to both sinks -
> > >>> > but does that really solve the problem? And if it did, how do I
> > >>> > create a KafkaStreams instance from a combination of
> > >>> > StreamsBuilder and Topology?
> > >>> >
> > >>> > Thanks for the insight,
> > >>> > Trey
> > >>> >
> > >>>
> > >>
> > >
> >
> --
> -- Guozhang
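[Editor's note: the variant proposed at the top of the thread - peeking at the previous value in the backing store before writing, so a caller-supplied lambda can compare old and new values - could be sketched in the same dependency-free style. Again, a MutableMap stands in for the Kafka Streams KeyValueStore, and every name below is hypothetical:]

```kotlin
// Sketch of a write-through transformer that first reads the previous
// value for the key, hands (previous, next) to a caller-supplied lambda
// for comparison, and only then writes the new value through.
// NOTE: MutableMap is a stand-in for the real state store; there is no
// such class in the Kafka Streams DSL - this is the proposed addition.
class CompareAndWriteTransformer<K, V>(
    private val store: MutableMap<K, V>,
    private val onUpdate: (previous: V?, next: V) -> Unit
) {
    fun transform(key: K, value: V): Pair<K, V> {
        val previous = store[key]   // peek into the backing store first
        onUpdate(previous, value)   // let the lambda compare old vs. new
        store[key] = value          // then write through as before
        return key to value         // still a pass-through operation
    }
}
```

A caller could use the lambda, for example, to detect a RINGING-to-ANSWERED transition before the store is updated. Because the single-writer assumption discussed above holds (one stream task owns the key's partition), the read-then-write pair needs no extra locking.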