Thanks for the response John. I'll see if I can track down that ticket. And thank you for your suggestion; I will try that once I get back to the code. That is an approach I had not considered.
On Fri, Feb 15, 2019 at 10:16 PM John Roesler <j...@confluent.io> wrote: > Hi Trey, > > I think there is a ticket open requesting to be able to re-use the source > topic, so I don't think it's an intentional restriction, just a consequence > of the way the code is structured at the moment. > > Is it sufficient to send the update to "calls" and "answered-calls" at the > same time? You could do something like: > > val answeredCalls = > actions.filter { _, action -> action == Actions.ANSWER } > .join(callsTable) { id, call -> call } // now a KTable > .mapValues { call -> doAnswer(call) } // actual answer implementation > > answeredCalls.to("calls"); > answeredCalls.to("answered-calls"); > > Does that help? > > -John > > > On Fri, Feb 15, 2019 at 4:18 PM Trey Hutcheson <trey.hutche...@gmail.com> > wrote: > > > For context, imagine I'm building an IVR simulator. Desired workflow: > > > > IVR knows about a ringing call. IVR receives an IPC instruction to answer > > the call. That instruction is realized by sending a message > {action=ANSWER} > > to the "actions" topic. > > > > At this point, the system needs to do two things: actually answer the > call, > > and then start a recording of the call, in that order. Because of > > implementation peculiarities external to the system, assume that these > two > > things cannot be executed together atomically. > > > > So this is what I'd *like* to do (warning, kotlin code, types omitted for > > brevity): > > > > val callsTable = builder.table("calls", ...) > > val actions = builder.stream("actions", ..) > > > > actions.filter { _, action -> action == Actions.ANSWER } > > .join(callsTable) { id, call -> call } // now a KTable > > .mapValues { call -> doAnswer(call) } // actual answer implementation > > .through("calls") // persist in state store > > .to("answered-calls") // let other actors in the system know the call > was > > answered, such as start the recording process > > > > Now in the current version of the streams library (2.1.0), that little > bit > > of topology throws an exception when trying to build it, with a message > > that a source has already been defined for the "calls" topic. So > apparently > > the call to .through materializes a view and defines a source, which was > > already defined in the call to builder.table("calls")? > > > > So how do I do what I want? This sequence needs to happen in order. I > have > > tried .branch, but that just ends up in a race condition (the thing doing > > to recording has to join to calls table and filter that the call has been > > answered). > > > > I could create a custom processor that forwards to both sinks - but does > > that really solve the problem? And if it did, how do I create a > > KafkaStreams instance from a combination of StreamBuilder and Topology? > > > > Thanks for the insight > > Trey > > >