Thanks for the response, John. I'll see if I can track down that ticket.

And thank you for your suggestion; I will try that once I get back to the
code. That is an approach I had not considered.

On Fri, Feb 15, 2019 at 10:16 PM John Roesler <j...@confluent.io> wrote:

> Hi Trey,
>
> I think there is a ticket open requesting to be able to re-use the source
> topic, so I don't think it's an intentional restriction, just a consequence
> of the way the code is structured at the moment.
>
> Is it sufficient to send the update to "calls" and "answered-calls" at the
> same time? You could do something like:
>
> val answeredCalls =
>  actions.filter { _, action -> action == Actions.ANSWER }
>   .join(callsTable) { id, call -> call }  // stream-table join; result is a KStream
>   .mapValues { call -> doAnswer(call) } // actual answer implementation
>
> answeredCalls.to("calls");
> answeredCalls.to("answered-calls");
>
> Does that help?
>
> -John
>
>
> On Fri, Feb 15, 2019 at 4:18 PM Trey Hutcheson <trey.hutche...@gmail.com>
> wrote:
>
> > For context, imagine I'm building an IVR simulator. Desired workflow:
> >
> > IVR knows about a ringing call. IVR receives an IPC instruction to answer
> > the call. That instruction is realized by sending a message
> > {action=ANSWER} to the "actions" topic.
> >
> > At this point, the system needs to do two things: actually answer the
> > call, and then start a recording of the call, in that order. Because of
> > implementation peculiarities external to the system, assume that these
> > two things cannot be executed together atomically.
> >
> > So this is what I'd *like* to do (warning, kotlin code, types omitted for
> > brevity):
> >
> > val callsTable = builder.table("calls", ...)
> > val actions = builder.stream("actions", ..)
> >
> > actions.filter { _, action -> action == Actions.ANSWER }
> >   .join(callsTable) { id, call -> call }  // stream-table join; result is a KStream
> >   .mapValues { call -> doAnswer(call) } // actual answer implementation
> >   .through("calls") // persist in state store
> >   .to("answered-calls") // let other actors in the system know the call
> >                         // was answered, such as start the recording process
> >
> > Now in the current version of the streams library (2.1.0), that little
> > bit of topology throws an exception when trying to build it, with a
> > message that a source has already been defined for the "calls" topic. So
> > apparently the call to .through materializes a view and defines a source,
> > which was already defined in the call to builder.table("calls")?
> >
> > So how do I do what I want? This sequence needs to happen in order. I
> > have tried .branch, but that just ends up in a race condition (the thing
> > doing the recording has to join to the calls table and filter on whether
> > the call has been answered).
> >
> > I could create a custom processor that forwards to both sinks - but does
> > that really solve the problem? And if it did, how do I create a
> > KafkaStreams instance from a combination of StreamsBuilder and Topology?
> >
> > Thanks for the insight
> > Trey
> >
>
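
For anyone following the thread later, John's suggestion might look roughly
like the sketch below once the omitted types are filled in. Everything
concrete here is an assumption made for illustration: String keys and values
stand in for the real types, the literal "ANSWER" stands in for
Actions.ANSWER, and doAnswer is a hypothetical placeholder for the actual
answer implementation. Writing to "calls" with .to() only adds a sink, so it
should not collide with the source that builder.table("calls") registers.

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Joined
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.kstream.ValueJoiner

// Hypothetical placeholder for the real answer implementation.
fun doAnswer(call: String): String = "$call:ANSWERED"

fun buildTopology(): Topology {
    val builder = StreamsBuilder()
    val strings = Serdes.String() // assumption: String keys and values throughout

    // "calls" is registered as a source exactly once, by the table.
    val callsTable = builder.table("calls", Consumed.with(strings, strings))
    val actions = builder.stream("actions", Consumed.with(strings, strings))

    val answeredCalls = actions
        .filter { _, action -> action == "ANSWER" }
        // Stream-table join: the joiner receives (action, call) and the
        // result is still a KStream keyed by the action's key.
        .join(
            callsTable,
            ValueJoiner<String, String, String> { _, call -> call },
            Joined.with(strings, strings, strings)
        )
        .mapValues { call -> doAnswer(call) }

    // Two sinks fed from the same stream; neither defines a new source.
    answeredCalls.to("calls", Produced.with(strings, strings))
    answeredCalls.to("answered-calls", Produced.with(strings, strings))

    return builder.build()
}
```

On the last question in the original post: StreamsBuilder.build() returns a
Topology, so a custom processor could be added to that returned object before
it is passed to the KafkaStreams(topology, props) constructor. Whether either
approach fully addresses the ordering concern is not settled in this thread.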
