Ok, I tried your suggestion (along with several variations) and it just
doesn't work as I'd hoped. The constructed topology emits messages from
.mapValues call to both "calls" and "answered-calls" sinks, with the two
sinks being sibling to one another, which still causes a race condition.

So what are my options? I could spin off an asynchronous/delayed action to
send a message to the "answered-calls" topic, and just hope that the
answered call had been successfully persisted to the calls backing state
store. That seems awfully brittle to me.

I could just move state outside of the streaming topology altogether, using
an external state store, so I could explicitly govern transaction
demarcation and guarantee consistency. But that seems antithetical to kafka
streams in general.

On Sat, Feb 16, 2019 at 3:24 PM Trey Hutcheson <trey.hutche...@gmail.com>
wrote:

> Thanks for the response John. I'll see if I can track down that ticket.
>
> And thank you for your suggestion; I will try that once I get back to the
> code. That is an approach I had not considered.
>
> On Fri, Feb 15, 2019 at 10:16 PM John Roesler <j...@confluent.io> wrote:
>
>> Hi Trey,
>>
>> I think there is a ticket open requesting to be able to re-use the source
>> topic, so I don't think it's an intentional restriction, just a
>> consequence
>> of the way the code is structured at the moment.
>>
>> Is it sufficient to send the update to "calls" and "answered-calls" at the
>> same time? You could do something like:
>>
>> val answeredCalls =
>>  actions.filter { _, action -> action == Actions.ANSWER }
>>   .join(callsTable) { id, call -> call }  // now a KTable
>>   .mapValues { call -> doAnswer(call) } // actual answer implementation
>>
>> answeredCalls.to("calls");
>> answeredCalls.to("answered-calls");
>>
>> Does that help?
>>
>> -John
>>
>>
>> On Fri, Feb 15, 2019 at 4:18 PM Trey Hutcheson <trey.hutche...@gmail.com>
>> wrote:
>>
>> > For context, imagine I'm building an IVR simulator. Desired workflow:
>> >
>> > IVR knows about a ringing call. IVR receives an IPC instruction to
>> answer
>> > the call. That instruction is realized by sending a message
>> {action=ANSWER}
>> > to the "actions" topic.
>> >
>> > At this point, the system needs to do two things: actually answer the
>> call,
>> > and then start a recording of the call, in that order. Because of
>> > implementation peculiarities external to the system, assume that these
>> two
>> > things cannot be executed together atomically.
>> >
>> > So this is what I'd *like* to do (warning, kotlin code, types omitted
>> for
>> > brevity):
>> >
>> > val callsTable = builder.table("calls", ...)
>> > val actions = builder.stream("actions", ..)
>> >
>> > actions.filter { _, action -> action == Actions.ANSWER }
>> >   .join(callsTable) { id, call -> call }  // now a KTable
>> >   .mapValues { call -> doAnswer(call) } // actual answer implementation
>> >   .through("calls") // persist in state store
>> >   .to("answered-calls") // let other actors in the system know the call
>> was
>> > answered, such as start the recording process
>> >
>> > Now in the current version of the streams library (2.1.0), that little
>> bit
>> > of topology throws an exception when trying to build it, with a message
>> > that a source has already been defined for the "calls" topic. So
>> apparently
>> > the call to .through materializes a view and defines a source, which was
>> > already defined in the call to builder.table("calls")?
>> >
>> > So how do I do what I want? This sequence needs to happen in order. I
>> have
>> > tried .branch, but that just ends up in a race condition (the thing
>> doing
>> > to recording has to join to calls table and filter that the call has
>> been
>> > answered).
>> >
>> > I could create a custom processor that forwards to both sinks - but does
>> > that really solve the problem? And if it did, how do I create a
>> > KafkaStreams instance from a combination of StreamBuilder and Topology?
>> >
>> > Thanks for the insight
>> > Trey
>> >
>>
>

Reply via email to