OK, this is the Kotlin implementation of the writeThrough functionality.
It's not very sophisticated and has no guards or exception handling,
but it gets the point across. I apologize if this does not format correctly:

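import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.kstream.TransformerSupplier
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

// Extension function: writes each record to the named store, then passes it on.
fun <K, V> KStream<K, V>.writeThrough(storeName: String): KStream<K?, V?> {
    return this.transform(
        TransformerSupplier { WriteThroughTransformer<K, V>(storeName) },
        storeName
    )
}

class WriteThroughTransformer<K, V>(val storeName: String) :
    Transformer<K, V, KeyValue<K?, V?>> {
    lateinit var context: ProcessorContext
    lateinit var store: KeyValueStore<K, V>

    // Store the record, then return it unchanged so it flows downstream.
    override fun transform(key: K?, value: V?): KeyValue<K?, V?> {
        store.put(key, value)
        return KeyValue(key, value)
    }

    // Look up the (already registered) store by name when the task starts.
    @Suppress("UNCHECKED_CAST")
    override fun init(context: ProcessorContext) {
        this.context = context
        this.store = context.getStateStore(storeName) as KeyValueStore<K, V>
    }

    override fun close() {
    }
}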
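
Since the transformer above has no guards, here is a minimal sketch of what
one could look like. It is an illustration under stated assumptions, not a
tested part of the implementation: `guardedPut` is a hypothetical helper (not
a Kafka Streams API), the `put` lambda stands in for `KeyValueStore.put`, and
it assumes that records with a null key should simply be skipped (the store
interface documents a NullPointerException for null keys) and that a failed
write should be reported to the caller rather than swallowed silently.

```kotlin
// Hypothetical helper, not part of Kafka Streams.
// `put` stands in for KeyValueStore.put; returns true only if the write happened.
fun <K : Any, V> guardedPut(key: K?, value: V?, put: (K, V?) -> Unit): Boolean {
    // Assumption: a record without a key cannot be stored, so skip it.
    if (key == null) return false
    return try {
        put(key, value)
        true
    } catch (e: RuntimeException) {
        // Assumption: report the failure and let the caller decide what to do;
        // rethrow here instead if a failed write must stop processing.
        System.err.println("write-through failed for key $key: ${e.message}")
        false
    }
}
```

Inside `transform` this could be called as
`guardedPut(key, value) { k, v -> store.put(k, v) }`. If it returns false, the
transformer could return null (which requires making the return type
nullable) so that nothing is forwarded downstream for that record.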

And usage:
actions.filter { _, action -> action.action == Actions.ANSWER }
    .join(callsTable) { _, call -> call }
    .mapValues { call -> doAnswer(call) }
    .writeThrough("calls-store")  // extension method; types inferred
    .to("answered-calls", Produced.with(..))
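
The related idea raised further down this thread, peeking at the previous
value in the store before writing the new one and letting a lambda compare
them, could be sketched roughly as follows. This is only an illustration:
`putIfChanged` and its parameter names are hypothetical, and the `get` and
`put` lambdas stand in for `KeyValueStore.get` and `KeyValueStore.put`. The
read-then-write is not atomic by itself; it relies on the point made in the
quoted reply below that a task is accessed by a single stream thread at a
time.

```kotlin
// Hypothetical helper, not part of Kafka Streams.
// Reads the previous value, asks `isChange` whether the new value should
// replace it, and writes only if so. Returns true if a write happened.
fun <K : Any, V : Any> putIfChanged(
    key: K,
    newValue: V,
    get: (K) -> V?,
    put: (K, V) -> Unit,
    isChange: (previous: V?, current: V) -> Boolean = { previous, current -> previous != current }
): Boolean {
    val previous = get(key)   // null when the key has never been stored
    if (!isChange(previous, newValue)) return false
    put(key, newValue)
    return true
}
```

Within a transformer, `get` and `put` would be thin lambdas around the store,
for example `{ k -> store.get(k) }` and `{ k, v -> store.put(k, v) }`, and the
Boolean result could decide whether the record is forwarded.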


On Wed, Feb 20, 2019 at 12:33 PM Guozhang Wang <wangg...@gmail.com> wrote:

> There should be no concurrency issues since all of the processor nodes
> above should be within the same sub-topology and hence the same set of
> tasks, and a single task should be accessed by a single stream thread at
> any given time.
>
>
> Guozhang
>
> On Wed, Feb 20, 2019 at 10:30 AM Trey Hutcheson <trey.hutche...@gmail.com>
> wrote:
>
> > Yes, that is exactly correct. I *assume* I won't run into any
> > concurrency issues here - that another thread will not be writing to
> > the same store for the same key while this is being read.
> >
> > If there are no concurrency issues here (again, it's the same key so I
> > doubt it), then a similar approach would allow a transformer to peek
> > into the backing store and retrieve the previous value before writing
> > the new value, effectively allowing a lambda to do a comparison if
> > necessary. That (and my original use case) would be useful additions
> > to the DSL api.
> >
> > On Wed, Feb 20, 2019 at 12:07 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Hello Trey,
> > >
> > > Just to clarify on your solution, the "calls-store" is the same one
> > > materialized from calls table as "val callsTable =
> > > builder.table("calls", ... Materialized.as(.. "call-store"))". So
> > > basically you are using the transformer to update the original
> > > materialized store for the callsTable here, is that right?
> > >
> > >
> > > Guozhang
> > >
> > > On Tue, Feb 19, 2019 at 8:58 AM Trey Hutcheson <trey.hutche...@gmail.com>
> > > wrote:
> > >
> > > > Ok, I have a solution - I implemented a custom Transformer
> > > > implementation that simply accepts a key/value and writes it to a
> > > > state store, then returns the input values. It's a "write-through"
> > > > transformer. It's basically like a peek operation, but saves it to
> > > > the backing state store. But since it's not a terminal operation,
> > > > the stream can still perform a .to later.
> > > >
> > > > Basically, the transformer looks like this (in pseudo code, type
> > > > parameters omitted)
> > > > class WriteThroughTransformer(val storeName: String): Transformer {
> > > >   lateinit var context: ProcessorContext
> > > >   lateinit var store: KeyValueStore
> > > >
> > > >   // initialize context and store
> > > >   override fun init() ...
> > > >
> > > >   // do a simple store.put - return inputs as KeyValue
> > > >   override fun transform() ...
> > > > }
> > > >
> > > > And then a handy extension function:
> > > > fun <K,V> KStream<K,V>.writeThrough(storeName: String): KStream<K?, V?> =
> > > >     this.transform( // supplier code)
> > > >
> > > > And then finally, the stream chain looks like this:
> > > > actions.filter { _, action -> action.action == Actions.ANSWER }
> > > >     .join(callsTable) { _, call -> call }
> > > >     .mapValues { call -> doAnswer(call) }
> > > >     .writeThrough("calls-store")
> > > >     .to("answered-calls")
> > > >
> > > > This approach doesn't actually send a message to the calls topic, but
> > > > that's ok. The stream listening on answered-calls will eventually do
> > > > that anyway.
> > > >
> > > > On Mon, Feb 18, 2019 at 2:02 PM Trey Hutcheson <trey.hutche...@gmail.com>
> > > > wrote:
> > > >
> > > > > Ok, I tried your suggestion (along with several variations) and it
> > > > > just doesn't work as I'd hoped. The constructed topology emits
> > > > > messages from the .mapValues call to both the "calls" and
> > > > > "answered-calls" sinks, with the two sinks being siblings of one
> > > > > another, which still causes a race condition.
> > > > >
> > > > > So what are my options? I could spin off an asynchronous/delayed
> > > > > action to send a message to the "answered-calls" topic, and just
> > > > > hope that the answered call had been successfully persisted to the
> > > > > calls backing state store. That seems awfully brittle to me.
> > > > >
> > > > > I could just move state outside of the streaming topology
> > > > > altogether, using an external state store, so I could explicitly
> > > > > govern transaction demarcation and guarantee consistency. But that
> > > > > seems antithetical to Kafka Streams in general.
> > > > >
> > > > > On Sat, Feb 16, 2019 at 3:24 PM Trey Hutcheson <trey.hutche...@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Thanks for the response John. I'll see if I can track down that
> > > > >> ticket.
> > > > >>
> > > > >> And thank you for your suggestion; I will try that once I get back
> > > > >> to the code. That is an approach I had not considered.
> > > > >>
> > > > >> On Fri, Feb 15, 2019 at 10:16 PM John Roesler <j...@confluent.io>
> > > > >> wrote:
> > > > >>
> > > > >>> Hi Trey,
> > > > >>>
> > > > >>> I think there is a ticket open requesting to be able to re-use the
> > > > >>> source topic, so I don't think it's an intentional restriction, just
> > > > >>> a consequence of the way the code is structured at the moment.
> > > > >>>
> > > > >>> Is it sufficient to send the update to "calls" and "answered-calls"
> > > > >>> at the same time? You could do something like:
> > > > >>>
> > > > >>> val answeredCalls =
> > > > >>>  actions.filter { _, action -> action == Actions.ANSWER }
> > > > >>>   .join(callsTable) { id, call -> call }  // still a KStream
> > > > >>>   .mapValues { call -> doAnswer(call) } // actual answer implementation
> > > > >>>
> > > > >>> answeredCalls.to("calls");
> > > > >>> answeredCalls.to("answered-calls");
> > > > >>>
> > > > >>> Does that help?
> > > > >>>
> > > > >>> -John
> > > > >>>
> > > > >>>
> > > > >>> On Fri, Feb 15, 2019 at 4:18 PM Trey Hutcheson <trey.hutche...@gmail.com>
> > > > >>> wrote:
> > > > >>>
> > > > >>> > For context, imagine I'm building an IVR simulator. Desired
> > > > >>> > workflow:
> > > > >>> >
> > > > >>> > IVR knows about a ringing call. IVR receives an IPC instruction to
> > > > >>> > answer the call. That instruction is realized by sending a message
> > > > >>> > {action=ANSWER} to the "actions" topic.
> > > > >>> >
> > > > >>> > At this point, the system needs to do two things: actually answer
> > > > >>> > the call, and then start a recording of the call, in that order.
> > > > >>> > Because of implementation peculiarities external to the system,
> > > > >>> > assume that these two things cannot be executed together
> > > > >>> > atomically.
> > > > >>> >
> > > > >>> > So this is what I'd *like* to do (warning, kotlin code, types
> > > > >>> > omitted for brevity):
> > > > >>> >
> > > > >>> > val callsTable = builder.table("calls", ...)
> > > > >>> > val actions = builder.stream("actions", ..)
> > > > >>> >
> > > > >>> > actions.filter { _, action -> action == Actions.ANSWER }
> > > > >>> >   .join(callsTable) { id, call -> call }  // still a KStream
> > > > >>> >   .mapValues { call -> doAnswer(call) } // actual answer implementation
> > > > >>> >   .through("calls") // persist in state store
> > > > >>> >   // let other actors in the system know the call was answered,
> > > > >>> >   // such as start the recording process
> > > > >>> >   .to("answered-calls")
> > > > >>> >
> > > > >>> > Now in the current version of the streams library (2.1.0), that
> > > > >>> > little bit of topology throws an exception when trying to build
> > > > >>> > it, with a message that a source has already been defined for the
> > > > >>> > "calls" topic. So apparently the call to .through materializes a
> > > > >>> > view and defines a source, which was already defined in the call
> > > > >>> > to builder.table("calls")?
> > > > >>> >
> > > > >>> > So how do I do what I want? This sequence needs to happen in
> > > > >>> > order. I have tried .branch, but that just ends up in a race
> > > > >>> > condition (the thing doing the recording has to join to the calls
> > > > >>> > table and filter that the call has been answered).
> > > > >>> >
> > > > >>> > I could create a custom processor that forwards to both sinks -
> > > > >>> > but does that really solve the problem? And if it did, how do I
> > > > >>> > create a KafkaStreams instance from a combination of
> > > > >>> > StreamsBuilder and Topology?
> > > > >>> >
> > > > >>> > Thanks for the insight
> > > > >>> > Trey
> > > > >>> >
> > > > >>>
> > > > >>
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>
