Ok, this is the Kotlin implementation of the writeThrough functionality. It's not very sophisticated and performs no guards or exception handling, but it gets the point across. I apologize if this does not format correctly:
fun <K, V> KStream<K, V>.writeThrough(storeName: String): KStream<K?, V?> {
    return this.transform(
        TransformerSupplier { WriteThroughTransformer<K, V>(storeName) },
        storeName)
}

class WriteThroughTransformer<K, V>(val storeName: String) :
    Transformer<K, V, KeyValue<K?, V?>> {
    lateinit var context: ProcessorContext
    lateinit var store: KeyValueStore<K, V>

    override fun init(context: ProcessorContext) {
        this.context = context
        this.store = context.getStateStore(storeName) as KeyValueStore<K, V>
    }

    override fun transform(key: K?, value: V?): KeyValue<K?, V?> {
        store.put(key, value)
        return KeyValue(key, value)
    }

    override fun close() { }
}

And usage:

actions.filter { _, action -> action.action == Actions.ANSWER }
    .join(callsTable) { _, call -> call }
    .mapValues { call -> doAnswer(call) }
    .writeThrough("calls-store") // extension method; types inferred
    .to("answered-calls", Produced.with(..))

On Wed, Feb 20, 2019 at 12:33 PM Guozhang Wang <wangg...@gmail.com> wrote:

> There should be no concurrency issues since all of the processor nodes
> above should be within the same sub-topology and hence the same set of
> tasks, and a single task should be accessed by a single stream thread at
> any given time.
>
> Guozhang
>
> On Wed, Feb 20, 2019 at 10:30 AM Trey Hutcheson <trey.hutche...@gmail.com>
> wrote:
>
> > Yes, that is exactly correct. I *assume* I won't run into any
> > concurrency issues here - that another thread will not be writing to the
> > same store for the same key while this is being read.
> >
> > If there are no concurrency issues here (again, it's the same key so I
> > doubt it), then a similar approach would allow a transformer to peek into
> > the backing store and retrieve the previous value before writing the new
> > value, effectively allowing a lambda to do a comparison if necessary.
> > That (and my original use case) would be useful additions to the DSL api.
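The read-before-write variant described just above could be sketched as follows. This is only an illustration: CompareAndWriteTransformer and its onChange comparison hook are hypothetical names, not part of the Kafka Streams API, and like the code above it performs no guards or exception handling.

```kotlin
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

// Hypothetical sketch: peek at the previous value for the key before
// writing the new one, so a caller-supplied lambda can compare them.
class CompareAndWriteTransformer<K, V>(
    private val storeName: String,
    private val onChange: (previous: V?, next: V?) -> Unit // hypothetical hook
) : Transformer<K, V, KeyValue<K?, V?>> {
    lateinit var store: KeyValueStore<K, V>

    override fun init(context: ProcessorContext) {
        store = context.getStateStore(storeName) as KeyValueStore<K, V>
    }

    override fun transform(key: K?, value: V?): KeyValue<K?, V?> {
        val previous = store.get(key) // read the prior value for this key
        onChange(previous, value)     // let the caller compare old vs. new
        store.put(key, value)         // then write through as before
        return KeyValue(key, value)
    }

    override fun close() { }
}
```

Per Guozhang's note below, the read and the write land in the same task on the same stream thread, so the get/put pair for a given key is not racing another thread.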
> >
> > On Wed, Feb 20, 2019 at 12:07 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Hello Trey,
> > >
> > > Just to clarify on your solution, the "calls-store" is the same one
> > > materialized from calls table as "val callsTable = builder.table("calls",
> > > ... Materialized.as(.. "call-store"))". So basically you are using the
> > > transformer to update the original materialized store for the callsTable
> > > here, is that right?
> > >
> > > Guozhang
> > >
> > > On Tue, Feb 19, 2019 at 8:58 AM Trey Hutcheson <trey.hutche...@gmail.com>
> > > wrote:
> > >
> > > > Ok, I have a solution - I implemented a custom Transformer
> > > > implementation that simply accepts a key/value and writes it to a
> > > > state store, then returns the input values. It's a "write-through"
> > > > transformer. It's basically like a peek operation, but saves it to
> > > > the backing state store. But since it's not a terminal operation,
> > > > the stream can still perform a .to later.
> > > >
> > > > Basically, the transformer looks like this (in pseudo code, type
> > > > parameters omitted):
> > > >
> > > > class WriteThroughTransformer(val storeName: String): Transformer {
> > > >     lateinit var context: ProcessorContext
> > > >     lateinit var store: KeyValueStore
> > > >
> > > >     // initialize context and store
> > > >     override fun init() ...
> > > >
> > > >     // do a simple store.put - return inputs as KeyValue
> > > >     override fun transform() ...
> > > > }
> > > >
> > > > And then a handy extension function:
> > > >
> > > > fun <K,V> KStream<K,V>.writeThrough(storeName: String): KStream<K?, V?> =
> > > >     this.transform( // supplier code )
> > > >
> > > > And then finally, the stream chain looks like this:
> > > >
> > > > actions.filter { _, action -> action.action == Actions.ANSWER }
> > > >     .join(callsTable) { _, call -> call }
> > > >     .mapValues { call -> doAnswer(call) }
> > > >     .writeThrough("calls-store")
> > > >     .to("answered-calls")
> > > >
> > > > This approach doesn't actually send a message to the calls topic,
> > > > but that's ok. The stream listening on answered-calls will
> > > > eventually do that anyway.
> > > >
> > > > On Mon, Feb 18, 2019 at 2:02 PM Trey Hutcheson <trey.hutche...@gmail.com>
> > > > wrote:
> > > >
> > > > > Ok, I tried your suggestion (along with several variations) and it
> > > > > just doesn't work as I'd hoped. The constructed topology emits
> > > > > messages from the .mapValues call to both the "calls" and
> > > > > "answered-calls" sinks, with the two sinks being siblings of one
> > > > > another, which still causes a race condition.
> > > > >
> > > > > So what are my options? I could spin off an asynchronous/delayed
> > > > > action to send a message to the "answered-calls" topic, and just
> > > > > hope that the answered call had been successfully persisted to the
> > > > > calls backing state store. That seems awfully brittle to me.
> > > > >
> > > > > I could just move state outside of the streaming topology
> > > > > altogether, using an external state store, so I could explicitly
> > > > > govern transaction demarcation and guarantee consistency. But that
> > > > > seems antithetical to Kafka Streams in general.
> > > > >
> > > > > On Sat, Feb 16, 2019 at 3:24 PM Trey Hutcheson <trey.hutche...@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Thanks for the response John.
> > > >> I'll see if I can track down that ticket.
> > > >>
> > > >> And thank you for your suggestion; I will try that once I get back
> > > >> to the code. That is an approach I had not considered.
> > > >>
> > > >> On Fri, Feb 15, 2019 at 10:16 PM John Roesler <j...@confluent.io>
> > > >> wrote:
> > > >>
> > > >>> Hi Trey,
> > > >>>
> > > >>> I think there is a ticket open requesting to be able to re-use the
> > > >>> source topic, so I don't think it's an intentional restriction,
> > > >>> just a consequence of the way the code is structured at the moment.
> > > >>>
> > > >>> Is it sufficient to send the update to "calls" and "answered-calls"
> > > >>> at the same time? You could do something like:
> > > >>>
> > > >>> val answeredCalls =
> > > >>>     actions.filter { _, action -> action == Actions.ANSWER }
> > > >>>         .join(callsTable) { id, call -> call } // now a KTable
> > > >>>         .mapValues { call -> doAnswer(call) } // actual answer implementation
> > > >>>
> > > >>> answeredCalls.to("calls");
> > > >>> answeredCalls.to("answered-calls");
> > > >>>
> > > >>> Does that help?
> > > >>>
> > > >>> -John
> > > >>>
> > > >>> On Fri, Feb 15, 2019 at 4:18 PM Trey Hutcheson <trey.hutche...@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>> > For context, imagine I'm building an IVR simulator. Desired
> > > >>> > workflow:
> > > >>> >
> > > >>> > IVR knows about a ringing call. IVR receives an IPC instruction
> > > >>> > to answer the call. That instruction is realized by sending a
> > > >>> > message {action=ANSWER} to the "actions" topic.
> > > >>> >
> > > >>> > At this point, the system needs to do two things: actually
> > > >>> > answer the call, and then start a recording of the call, in that
> > > >>> > order.
> > > >>> > Because of implementation peculiarities external to the system,
> > > >>> > assume that these two things cannot be executed together
> > > >>> > atomically.
> > > >>> >
> > > >>> > So this is what I'd *like* to do (warning, kotlin code, types
> > > >>> > omitted for brevity):
> > > >>> >
> > > >>> > val callsTable = builder.table("calls", ...)
> > > >>> > val actions = builder.stream("actions", ..)
> > > >>> >
> > > >>> > actions.filter { _, action -> action == Actions.ANSWER }
> > > >>> >     .join(callsTable) { id, call -> call } // now a KTable
> > > >>> >     .mapValues { call -> doAnswer(call) } // actual answer implementation
> > > >>> >     .through("calls") // persist in state store
> > > >>> >     .to("answered-calls") // let other actors in the system know
> > > >>> >                           // the call was answered, such as
> > > >>> >                           // start the recording process
> > > >>> >
> > > >>> > Now in the current version of the streams library (2.1.0), that
> > > >>> > little bit of topology throws an exception when trying to build
> > > >>> > it, with a message that a source has already been defined for
> > > >>> > the "calls" topic. So apparently the call to .through
> > > >>> > materializes a view and defines a source, which was already
> > > >>> > defined in the call to builder.table("calls")?
> > > >>> >
> > > >>> > So how do I do what I want? This sequence needs to happen in
> > > >>> > order. I have tried .branch, but that just ends up in a race
> > > >>> > condition (the thing doing the recording has to join to the
> > > >>> > calls table and filter that the call has been answered).
> > > >>> >
> > > >>> > I could create a custom processor that forwards to both sinks -
> > > >>> > but does that really solve the problem?
> > > >>> > And if it did, how do I create a KafkaStreams instance from a
> > > >>> > combination of StreamsBuilder and Topology?
> > > >>> >
> > > >>> > Thanks for the insight
> > > >>> > Trey

> > > --
> > > -- Guozhang

> --
> -- Guozhang
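On the question quoted above about combining a StreamsBuilder with a Topology: in the 2.x API, StreamsBuilder.build() already returns the Topology that the KafkaStreams constructor accepts, so custom processors registered through the builder end up in the same Topology and no combination step is needed. A minimal sketch (the topic names and config values here are illustrative, not from the thread):

```kotlin
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import java.util.Properties

fun main() {
    val builder = StreamsBuilder()
    builder.stream<String, String>("actions") // DSL nodes registered on the builder
        .to("answered-calls")

    // build() folds every node registered on the builder, DSL or
    // Processor API, into a single Topology
    val topology = builder.build()

    val props = Properties().apply {
        put(StreamsConfig.APPLICATION_ID_CONFIG, "ivr-simulator")   // illustrative
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    }
    KafkaStreams(topology, props).start()
}
```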