There should be no concurrency issues since all of the processor nodes above should be within the same sub-topology and hence the same set of tasks, and a single task should be accessed by a single stream thread at any given time.
Guozhang

On Wed, Feb 20, 2019 at 10:30 AM Trey Hutcheson <trey.hutche...@gmail.com> wrote:

> Yes, that is exactly correct. I *assume* I won't run into any concurrency
> issues here - that another thread will not be writing to the same store for
> the same key while this is being read.
>
> If there are no concurrency issues here (again, it's the same key so I
> doubt it), then a similar approach would allow a transformer to peek into
> the backing store and retrieve the previous value before writing the new
> value, effectively allowing a lambda to do a comparison if necessary. That
> (and my original use case) would be useful additions to the DSL api.
>
> On Wed, Feb 20, 2019 at 12:07 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Trey,
> >
> > Just to clarify on your solution, the "calls-store" is the same one
> > materialized from the calls table as "val callsTable = builder.table("calls",
> > ... Materialized.as(.. "call-store"))". So basically you are using the
> > transformer to update the original materialized store for the callsTable
> > here, is that right?
> >
> > Guozhang
> >
> > On Tue, Feb 19, 2019 at 8:58 AM Trey Hutcheson <trey.hutche...@gmail.com> wrote:
> >
> > > Ok, I have a solution - I implemented a custom Transformer implementation
> > > that simply accepts a key/value and writes it to a state store, then
> > > returns the input values. It's a "write-through" transformer. It's
> > > basically like a peek operation, but saves it to the backing state store.
> > > But since it's not a terminal operation, the stream can still perform a
> > > .to later.
> > >
> > > Basically, the transformer looks like this (in pseudo code, type
> > > parameters omitted):
> > >
> > > class WriteThroughTransformer(val storeName: String): Transformer {
> > >     lateinit var context: ProcessorContext
> > >     lateinit var store: KeyValueStore
> > >
> > >     // initialize context and store
> > >     override fun init() ...
> > >
> > >     // do a simple store.put - return inputs as KeyValue
> > >     override fun transform() ...
> > > }
> > >
> > > And then a handy extension function:
> > >
> > > fun <K,V> KStream<K,V>.writeThrough(storeName: String): KStream<K?, V?> =
> > >     this.transform( /* supplier code */ )
> > >
> > > And then finally, the stream chain looks like this:
> > >
> > > actions.filter { _, action -> action.action == Actions.ANSWER }
> > >     .join(callsTable) { _, call -> call }
> > >     .mapValues { call -> doAnswer(call) }
> > >     .writeThrough("calls-store")
> > >     .to("answered-calls")
> > >
> > > This approach doesn't actually send a message to the calls topic, but
> > > that's ok. The stream listening on answered-calls will eventually do
> > > that anyway.
> > >
> > > On Mon, Feb 18, 2019 at 2:02 PM Trey Hutcheson <trey.hutche...@gmail.com> wrote:
> > >
> > > > Ok, I tried your suggestion (along with several variations) and it just
> > > > doesn't work as I'd hoped. The constructed topology emits messages from
> > > > the .mapValues call to both the "calls" and "answered-calls" sinks,
> > > > with the two sinks being siblings of one another, which still causes a
> > > > race condition.
> > > >
> > > > So what are my options? I could spin off an asynchronous/delayed action
> > > > to send a message to the "answered-calls" topic, and just hope that the
> > > > answered call had been successfully persisted to the calls backing
> > > > state store. That seems awfully brittle to me.
> > > >
> > > > I could just move state outside of the streaming topology altogether,
> > > > using an external state store, so I could explicitly govern transaction
> > > > demarcation and guarantee consistency. But that seems antithetical to
> > > > kafka streams in general.
> > > >
> > > > On Sat, Feb 16, 2019 at 3:24 PM Trey Hutcheson <trey.hutche...@gmail.com> wrote:
> > > >
> > > > > Thanks for the response John. I'll see if I can track down that
> > > > > ticket.
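A fuller version of the write-through transformer sketched above might look like the following. This is a sketch, not the code from the thread: the type parameters, store wiring, and supplier are filled in here as assumptions, targeting the Kafka Streams 2.1 `Transformer` API.

```kotlin
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.kstream.TransformerSupplier
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

// Write-through transformer: persists each record to a named state store,
// then forwards it unchanged so the stream can still call .to() afterwards.
class WriteThroughTransformer<K, V>(private val storeName: String) :
    Transformer<K, V, KeyValue<K, V>> {

    private lateinit var context: ProcessorContext
    private lateinit var store: KeyValueStore<K, V>

    @Suppress("UNCHECKED_CAST")
    override fun init(context: ProcessorContext) {
        this.context = context
        this.store = context.getStateStore(storeName) as KeyValueStore<K, V>
    }

    override fun transform(key: K, value: V): KeyValue<K, V> {
        store.put(key, value)            // write through to the backing store
        return KeyValue.pair(key, value) // forward the record unchanged
    }

    override fun close() {}
}

// Extension function matching the one sketched in the thread; the store must
// already be attached to the topology for transform() to find it by name.
fun <K, V> KStream<K, V>.writeThrough(storeName: String): KStream<K, V> =
    this.transform(
        TransformerSupplier { WriteThroughTransformer<K, V>(storeName) },
        storeName
    )
```
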
> > > > >
> > > > > And thank you for your suggestion; I will try that once I get back
> > > > > to the code. That is an approach I had not considered.
> > > > >
> > > > > On Fri, Feb 15, 2019 at 10:16 PM John Roesler <j...@confluent.io> wrote:
> > > > >
> > > > > > Hi Trey,
> > > > > >
> > > > > > I think there is a ticket open requesting to be able to re-use the
> > > > > > source topic, so I don't think it's an intentional restriction,
> > > > > > just a consequence of the way the code is structured at the moment.
> > > > > >
> > > > > > Is it sufficient to send the update to "calls" and "answered-calls"
> > > > > > at the same time? You could do something like:
> > > > > >
> > > > > > val answeredCalls =
> > > > > >     actions.filter { _, action -> action == Actions.ANSWER }
> > > > > >         .join(callsTable) { id, call -> call } // now a KTable
> > > > > >         .mapValues { call -> doAnswer(call) } // actual answer implementation
> > > > > >
> > > > > > answeredCalls.to("calls");
> > > > > > answeredCalls.to("answered-calls");
> > > > > >
> > > > > > Does that help?
> > > > > >
> > > > > > -John
> > > > > >
> > > > > > On Fri, Feb 15, 2019 at 4:18 PM Trey Hutcheson <trey.hutche...@gmail.com> wrote:
> > > > > >
> > > > > > > For context, imagine I'm building an IVR simulator. Desired
> > > > > > > workflow:
> > > > > > >
> > > > > > > IVR knows about a ringing call. IVR receives an IPC instruction
> > > > > > > to answer the call. That instruction is realized by sending a
> > > > > > > message {action=ANSWER} to the "actions" topic.
> > > > > > >
> > > > > > > At this point, the system needs to do two things: actually answer
> > > > > > > the call, and then start a recording of the call, in that order.
> > > > > > > Because of implementation peculiarities external to the system,
> > > > > > > assume that these two things cannot be executed together
> > > > > > > atomically.
> > > > > > >
> > > > > > > So this is what I'd *like* to do (warning, kotlin code, types
> > > > > > > omitted for brevity):
> > > > > > >
> > > > > > > val callsTable = builder.table("calls", ...)
> > > > > > > val actions = builder.stream("actions", ..)
> > > > > > >
> > > > > > > actions.filter { _, action -> action == Actions.ANSWER }
> > > > > > >     .join(callsTable) { id, call -> call } // now a KTable
> > > > > > >     .mapValues { call -> doAnswer(call) } // actual answer implementation
> > > > > > >     .through("calls") // persist in state store
> > > > > > >     .to("answered-calls") // let other actors in the system know the
> > > > > > >                           // call was answered, such as start the
> > > > > > >                           // recording process
> > > > > > >
> > > > > > > Now in the current version of the streams library (2.1.0), that
> > > > > > > little bit of topology throws an exception when trying to build
> > > > > > > it, with a message that a source has already been defined for the
> > > > > > > "calls" topic. So apparently the call to .through materializes a
> > > > > > > view and defines a source, which was already defined in the call
> > > > > > > to builder.table("calls")?
> > > > > > >
> > > > > > > So how do I do what I want? This sequence needs to happen in
> > > > > > > order. I have tried .branch, but that just ends up in a race
> > > > > > > condition (the thing doing the recording has to join to the calls
> > > > > > > table and filter that the call has been answered).
> > > > > > >
> > > > > > > I could create a custom processor that forwards to both sinks -
> > > > > > > but does that really solve the problem? And if it did, how do I
> > > > > > > create a KafkaStreams instance from a combination of
> > > > > > > StreamsBuilder and Topology?
> > > > > > >
> > > > > > > Thanks for the insight
> > > > > > > Trey
> >
> > --
> > -- Guozhang


--
-- Guozhang
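The "peek at the previous value before writing" variant Trey floats near the top of the thread could be sketched the same way. The class name and the `merge` lambda below are hypothetical, and the sketch again assumes the Kafka Streams 2.1 `Transformer` API:

```kotlin
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

// Hypothetical compare-and-write variant of the write-through transformer:
// reads the previous value for the key, lets a caller-supplied lambda decide
// what to store given (previous, incoming), then stores and forwards it.
class CompareAndWriteTransformer<K, V>(
    private val storeName: String,
    private val merge: (previous: V?, incoming: V) -> V
) : Transformer<K, V, KeyValue<K, V>> {

    private lateinit var store: KeyValueStore<K, V>

    @Suppress("UNCHECKED_CAST")
    override fun init(context: ProcessorContext) {
        store = context.getStateStore(storeName) as KeyValueStore<K, V>
    }

    override fun transform(key: K, value: V): KeyValue<K, V> {
        val previous: V? = store.get(key)     // peek at the prior value for this key
        val resolved = merge(previous, value) // e.g. compare, or keep a max/merge
        store.put(key, resolved)
        return KeyValue.pair(key, resolved)
    }

    override fun close() {}
}
```

Per Guozhang's point at the top of the thread, the get-then-put pair needs no extra locking here: the store and the transformer live in the same sub-topology, so a given task (and its keys) is only ever touched by one stream thread at a time.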