Ok, I tried your suggestion (along with several variations) and it just doesn't work as I'd hoped. The constructed topology emits messages from the .mapValues call to both the "calls" and "answered-calls" sinks. The two sinks are siblings of one another, so nothing orders the write to "calls" before the write to "answered-calls", and the race condition is still there.
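For reference, the shape of that topology can be checked by printing its description. What follows is a minimal sketch, not the actual application code. It assumes String keys and values, uses the literal "ANSWER" in place of Actions.ANSWER, and uses a placeholder doAnswer. It needs the kafka-streams library on the classpath but no running broker, since it only builds and describes the topology.

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Joined
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.kstream.ValueJoiner
import org.apache.kafka.streams.kstream.ValueMapper

// Hypothetical stand-in for the real answer logic (assumption: calls are plain strings).
fun doAnswer(call: String): String = "$call:ANSWERED"

fun main() {
    val strings = Serdes.String()
    val builder = StreamsBuilder()

    // Same sources as in the thread, with explicit serdes so the sketch is self-contained.
    val callsTable = builder.table("calls", Consumed.with(strings, strings))
    val actions = builder.stream("actions", Consumed.with(strings, strings))

    val answeredCalls = actions
        .filter { _, action -> action == "ANSWER" }
        // Stream-table join: keep the call looked up from the table.
        .join(
            callsTable,
            ValueJoiner<String, String, String> { _, call -> call },
            Joined.with(strings, strings, strings)
        )
        .mapValues(ValueMapper<String, String> { call -> doAnswer(call) })

    // Two independent sinks hanging off the same upstream node.
    answeredCalls.to("calls", Produced.with(strings, strings))
    answeredCalls.to("answered-calls", Produced.with(strings, strings))

    // Print the processor graph; no KafkaStreams instance is started.
    println(builder.build().describe())
}
```

If the sinks really are siblings, both should appear as successors of the same mapValues processor in the printed description, which is consistent with there being no ordering guarantee between the two output topics.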
So what are my options? I could spin off an asynchronous/delayed action to send a message to the "answered-calls" topic, and just hope that the answered call had been successfully persisted to the calls backing state store. That seems awfully brittle to me. I could just move state outside of the streaming topology altogether, using an external state store, so I could explicitly govern transaction demarcation and guarantee consistency. But that seems antithetical to kafka streams in general.

On Sat, Feb 16, 2019 at 3:24 PM Trey Hutcheson <trey.hutche...@gmail.com> wrote:

> Thanks for the response John. I'll see if I can track down that ticket.
>
> And thank you for your suggestion; I will try that once I get back to the
> code. That is an approach I had not considered.
>
> On Fri, Feb 15, 2019 at 10:16 PM John Roesler <j...@confluent.io> wrote:
>
>> Hi Trey,
>>
>> I think there is a ticket open requesting to be able to re-use the source
>> topic, so I don't think it's an intentional restriction, just a
>> consequence of the way the code is structured at the moment.
>>
>> Is it sufficient to send the update to "calls" and "answered-calls" at the
>> same time? You could do something like:
>>
>> val answeredCalls =
>>   actions.filter { _, action -> action == Actions.ANSWER }
>>     .join(callsTable) { id, call -> call } // now a KTable
>>     .mapValues { call -> doAnswer(call) } // actual answer implementation
>>
>> answeredCalls.to("calls");
>> answeredCalls.to("answered-calls");
>>
>> Does that help?
>>
>> -John
>>
>>
>> On Fri, Feb 15, 2019 at 4:18 PM Trey Hutcheson <trey.hutche...@gmail.com>
>> wrote:
>>
>> > For context, imagine I'm building an IVR simulator. Desired workflow:
>> >
>> > IVR knows about a ringing call. IVR receives an IPC instruction to answer
>> > the call. That instruction is realized by sending a message {action=ANSWER}
>> > to the "actions" topic.
>> >
>> > At this point, the system needs to do two things: actually answer the call,
>> > and then start a recording of the call, in that order. Because of
>> > implementation peculiarities external to the system, assume that these two
>> > things cannot be executed together atomically.
>> >
>> > So this is what I'd *like* to do (warning, kotlin code, types omitted for
>> > brevity):
>> >
>> > val callsTable = builder.table("calls", ...)
>> > val actions = builder.stream("actions", ..)
>> >
>> > actions.filter { _, action -> action == Actions.ANSWER }
>> >   .join(callsTable) { id, call -> call } // now a KTable
>> >   .mapValues { call -> doAnswer(call) } // actual answer implementation
>> >   .through("calls") // persist in state store
>> >   .to("answered-calls") // let other actors in the system know the call was
>> >                         // answered, such as start the recording process
>> >
>> > Now in the current version of the streams library (2.1.0), that little bit
>> > of topology throws an exception when trying to build it, with a message
>> > that a source has already been defined for the "calls" topic. So apparently
>> > the call to .through materializes a view and defines a source, which was
>> > already defined in the call to builder.table("calls")?
>> >
>> > So how do I do what I want? This sequence needs to happen in order. I have
>> > tried .branch, but that just ends up in a race condition (the thing doing
>> > to recording has to join to calls table and filter that the call has been
>> > answered).
>> >
>> > I could create a custom processor that forwards to both sinks - but does
>> > that really solve the problem? And if it did, how do I create a
>> > KafkaStreams instance from a combination of StreamBuilder and Topology?
>> >
>> > Thanks for the insight
>> > Trey
>> >
>> >