Ok, I have a solution: I implemented a custom Transformer that simply
accepts a key/value, writes it to a state store, and then returns the input
unchanged. It's a "write-through" transformer. It's basically like a peek
operation that also saves the record to the backing state store. But since
it's not a terminal operation, the stream can still call .to later.

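Basically, the transformer looks like this (Kotlin, against the Kafka
Streams 2.1 Transformer API):
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

class WriteThroughTransformer<K, V>(private val storeName: String) :
    Transformer<K, V, KeyValue<K, V>> {
  private lateinit var store: KeyValueStore<K, V>
  // look up the store that was connected to this transformer by name
  @Suppress("UNCHECKED_CAST")
  override fun init(context: ProcessorContext) {
    store = context.getStateStore(storeName) as KeyValueStore<K, V>
  }
  // put the record into the store, then pass it downstream unchanged
  override fun transform(key: K, value: V): KeyValue<K, V> {
    store.put(key, value)
    return KeyValue.pair(key, value)
  }
  override fun close() {}
}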

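And then a handy extension function, which also passes the store name to
transform so that the store gets connected to the transformer:
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.TransformerSupplier
fun <K, V> KStream<K, V>.writeThrough(storeName: String): KStream<K, V> =
    transform(TransformerSupplier { WriteThroughTransformer<K, V>(storeName) }, storeName)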

And then finally, the stream chain looks like this:
actions.filter { _, action -> action.action == Actions.ANSWER }
    .join(callsTable) { _, call -> call }
    .mapValues { call -> doAnswer(call) }
    .writeThrough("calls-store")
    .to("answered-calls")

This approach doesn't actually send a message to the calls topic, but
that's ok. The stream listening on answered-calls will eventually do that
anyway.
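To see why this ordering holds, here is a stdlib-only Kotlin model with no Kafka involved. Plain collections stand in for the topics and the state store, and Status, Call, doAnswer, runPipeline and the string action "ANSWER" are hypothetical stand-ins, not the poster's actual types. Because the store write happens inline, before the record reaches the sink, the recorder already finds the call ANSWERED when the record arrives. This only models single-threaded, in-order processing within one task; it says nothing about commits, restarts or rebalances.

```kotlin
// Stdlib-only model: collections stand in for Kafka topics and the state store.
enum class Status { RINGING, ANSWERED }

data class Call(val id: String, val status: Status)

// Hypothetical stand-in for the poster's doAnswer(call).
fun doAnswer(call: Call): Call = call.copy(status = Status.ANSWERED)

// Like peek, but writes each record into the store before passing it downstream.
fun <K, V> Sequence<Pair<K, V>>.writeThrough(store: MutableMap<K, V>): Sequence<Pair<K, V>> =
    onEach { (key, value) -> store[key] = value }

// Models actions.filter(ANSWER).join(callsTable).mapValues(doAnswer).writeThrough(store).to(sink)
// and returns the status the recorder observes as each record reaches the sink.
fun runPipeline(actions: List<Pair<String, String>>, callsStore: MutableMap<String, Call>): List<Status> {
    val recorderSaw = mutableListOf<Status>()
    actions.asSequence()
        .filter { (_, action) -> action == "ANSWER" }
        // inner stream-table join: drop actions whose call is not in the table
        .mapNotNull { (id, _) -> callsStore[id]?.let { call -> id to call } }
        .map { (id, call) -> id to doAnswer(call) }
        .writeThrough(callsStore)
        // "answered-calls" sink: as each record arrives, the recorder checks the store
        .forEach { (id, _) -> recorderSaw += callsStore.getValue(id).status }
    return recorderSaw
}
```

Sequences are lazy, so each record flows through every step (including the store write) before the next record is read, which is the property the write-through step relies on.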

On Mon, Feb 18, 2019 at 2:02 PM Trey Hutcheson <trey.hutche...@gmail.com>
wrote:

> Ok, I tried your suggestion (along with several variations) and it just
> doesn't work as I'd hoped. The constructed topology emits messages from
> the .mapValues call to both the "calls" and "answered-calls" sinks, with
> the two sinks being siblings of one another, which still causes a race
> condition.
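The race with two sibling sinks can be reproduced in miniature. This is a hypothetical, stdlib-only model: the calls table is a map that is only updated when a record from "calls" is processed, and the recorder reads that map when it processes a record from "answered-calls". The function name recorderView and the call id "c1" are illustrative. Nothing orders the two topics relative to each other, so the outcome depends on which record happens to be processed first.

```kotlin
enum class Status { RINGING, ANSWERED }

// Each event is (topic, callId); the list order is the order in which the
// downstream processing happens to occur.
fun recorderView(interleaving: List<Pair<String, String>>): List<Status> {
    val callsTable = mutableMapOf("c1" to Status.RINGING) // materialized from "calls"
    val seen = mutableListOf<Status>()
    for ((topic, id) in interleaving) {
        when (topic) {
            // the answered call arrives on "calls" and updates the table
            "calls" -> { callsTable[id] = Status.ANSWERED }
            // the recorder joins its "answered-calls" record to the table
            "answered-calls" -> { seen += callsTable.getValue(id) }
        }
    }
    return seen
}
```

With the table update first the recorder sees ANSWERED; with the opposite interleaving it sees RINGING, which is the failure described above.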
>
> So what are my options? I could spin off an asynchronous/delayed action to
> send a message to the "answered-calls" topic, and just hope that the
> answered call had been successfully persisted to the calls backing state
> store. That seems awfully brittle to me.
>
> I could just move state outside of the streaming topology altogether,
> using an external state store, so I could explicitly govern transaction
> demarcation and guarantee consistency. But that seems antithetical to kafka
> streams in general.
>
> On Sat, Feb 16, 2019 at 3:24 PM Trey Hutcheson <trey.hutche...@gmail.com>
> wrote:
>
>> Thanks for the response John. I'll see if I can track down that ticket.
>>
>> And thank you for your suggestion; I will try that once I get back to the
>> code. That is an approach I had not considered.
>>
>> On Fri, Feb 15, 2019 at 10:16 PM John Roesler <j...@confluent.io> wrote:
>>
>>> Hi Trey,
>>>
>>> I think there is a ticket open requesting to be able to re-use the source
>>> topic, so I don't think it's an intentional restriction, just a
>>> consequence of the way the code is structured at the moment.
>>>
>>> Is it sufficient to send the update to "calls" and "answered-calls" at the
>>> same time? You could do something like:
>>>
>>> val answeredCalls =
>>>   actions.filter { _, action -> action == Actions.ANSWER }
>>>     .join(callsTable) { _, call -> call }  // still a KStream (stream-table join)
>>>     .mapValues { call -> doAnswer(call) } // actual answer implementation
>>>
>>> answeredCalls.to("calls")
>>> answeredCalls.to("answered-calls")
>>>
>>> Does that help?
>>>
>>> -John
>>>
>>>
>>> On Fri, Feb 15, 2019 at 4:18 PM Trey Hutcheson <trey.hutche...@gmail.com>
>>> wrote:
>>>
>>> > For context, imagine I'm building an IVR simulator. Desired workflow:
>>> >
>>> > IVR knows about a ringing call. IVR receives an IPC instruction to answer
>>> > the call. That instruction is realized by sending a message
>>> > {action=ANSWER} to the "actions" topic.
>>> >
>>> > At this point, the system needs to do two things: actually answer the
>>> > call, and then start a recording of the call, in that order. Because of
>>> > implementation peculiarities external to the system, assume that these
>>> > two things cannot be executed together atomically.
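The ordering requirement can be stated as a tiny state machine. This is a sketch only: CallState, answer and startRecording are hypothetical names, not part of the poster's code. Recording a call that is still RINGING is rejected, which is exactly what goes wrong when the recorder runs before the answered state is visible.

```kotlin
enum class CallState { RINGING, ANSWERED, RECORDING }

// Hypothetical transition: only a ringing call can be answered.
fun answer(state: CallState): CallState {
    require(state == CallState.RINGING) { "can only answer a ringing call, was $state" }
    return CallState.ANSWERED
}

// Hypothetical transition: recording may only start after the call was answered.
fun startRecording(state: CallState): CallState {
    require(state == CallState.ANSWERED) { "can only record an answered call, was $state" }
    return CallState.RECORDING
}
```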
>>> >
>>> > So this is what I'd *like* to do (warning, Kotlin code, types omitted
>>> > for brevity):
>>> >
>>> > val callsTable = builder.table("calls", ...)
>>> > val actions = builder.stream("actions", ...)
>>> >
>>> > actions.filter { _, action -> action == Actions.ANSWER }
>>> >   .join(callsTable) { _, call -> call }  // result is a KStream
>>> >   .mapValues { call -> doAnswer(call) } // actual answer implementation
>>> >   .through("calls") // persist in state store
>>> >   .to("answered-calls") // let other actors in the system know the call
>>> >                         // was answered, such as start the recording process
>>> >
>>> > Now in the current version of the streams library (2.1.0), that little
>>> > bit of topology throws an exception when trying to build it, with a
>>> > message that a source has already been defined for the "calls" topic.
>>> > So apparently the call to .through materializes a view and defines a
>>> > source, which was already defined in the call to builder.table("calls")?
>>> >
>>> > So how do I do what I want? This sequence needs to happen in order. I
>>> > have tried .branch, but that just ends up in a race condition (the thing
>>> > doing the recording has to join to the calls table and filter on the
>>> > call having been answered).
>>> >
>>> > I could create a custom processor that forwards to both sinks, but does
>>> > that really solve the problem? And if it did, how do I create a
>>> > KafkaStreams instance from a combination of StreamsBuilder and Topology?
>>> >
>>> > Thanks for the insight
>>> > Trey
>>> >
>>>
>>
