Hi Kyle, (cc-ing the user list as well)

This could be an interesting scenario. Two things to help us think through it some more:

1) It seems you attached a figure, but I cannot open it.
2) What about using the low-level Processor API, instead of the DSL, as approach 3? Do you have any thoughts on that?
Thanks,
Eno

> On 13 Apr 2017, at 11:26, Winkelman, Kyle G <kyle.winkel...@optum.com> wrote:
>
> Hello,
>
> I am wondering if there is any way to aggregate many streams together at once to build a larger object.
>
> Example (Healthcare Domain): I have streams of Medical, Pharmacy, and Lab claims. The key is PatientId; the value is a different Avro record for each stream. I was hoping there was a way to supply a single Initializer, () -> new Patient(), and three aggregators, (key, value, patient) -> patient.add******Claim(value).
>
> Currently the only way I see to handle the above use case is to aggregate each individual stream and then join the results. This doesn't scale well with a large number of input streams, because every stream would need its own state store.
>
> I was hoping to get thoughts on a KCogroupedStream API. I have spent a little time conceptualizing it.
>
> Approach 1:
> In KGroupedStream, add a cogroup method that takes the single initializer, a list of other KGroupedStreams, and a list of other aggregators. Everything would then flow through a single processor backed by a single state store. The aggregator a record gets dispatched to is determined by context().topic(), which we should be able to trace back to one of the KGroupedStreams in the list.
>
> The problem I am having with this approach is that, because everything goes through a single processor, Java's generics get in the way: I either have to pass in a list of Type objects for casting each record before handing it to its aggregator, or I have to write aggregators that accept an Object and cast it to the appropriate type themselves.
>
> Approach 2:
> Create one processor per aggregator, all sharing a single state store, followed by a single KStreamPassThrough that simply forwards the new aggregate value. The advantage is that each processor knows which stream its records come from, so the context().topic() trick is not needed.
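For concreteness, the topic-based dispatch in Approach 1, and the generics awkwardness it runs into, can be sketched in plain Java with no Kafka Streams dependency at all. The Patient type, the claim payloads, and the topic names below are hypothetical stand-ins; the topic argument plays the role of context().topic():

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

public class CogroupSketch {
    // Hypothetical aggregate standing in for the Avro-generated Patient record.
    static class Patient {
        int medicalClaims;
        int pharmacyClaims;
        Patient addMedicalClaim(String claim)  { medicalClaims++;  return this; }
        Patient addPharmacyClaim(String claim) { pharmacyClaims++; return this; }
    }

    private final Map<String, Patient> store = new HashMap<>(); // the single backing state store
    private final Supplier<Patient> initializer;
    // Aggregators keyed by source topic, stored with an untyped value parameter:
    // this is the generics problem described above -- each lambda must cast.
    private final Map<String, BiFunction<Object, Patient, Patient>> aggregatorsByTopic;

    CogroupSketch(Supplier<Patient> initializer,
                  Map<String, BiFunction<Object, Patient, Patient>> aggregatorsByTopic) {
        this.initializer = initializer;
        this.aggregatorsByTopic = aggregatorsByTopic;
    }

    // Stand-in for the single processor: dispatch on the record's source topic.
    void process(String topic, String patientId, Object claim) {
        Patient current = store.computeIfAbsent(patientId, k -> initializer.get());
        store.put(patientId, aggregatorsByTopic.get(topic).apply(claim, current));
    }

    Patient get(String patientId) { return store.get(patientId); }

    public static void main(String[] args) {
        Map<String, BiFunction<Object, Patient, Patient>> aggs = new HashMap<>();
        aggs.put("medical-claims",  (v, p) -> p.addMedicalClaim((String) v));  // cast required
        aggs.put("pharmacy-claims", (v, p) -> p.addPharmacyClaim((String) v)); // cast required
        CogroupSketch cogroup = new CogroupSketch(Patient::new, aggs);

        cogroup.process("medical-claims",  "patient-1", "claim-a");
        cogroup.process("pharmacy-claims", "patient-1", "claim-b");
        System.out.println(cogroup.get("patient-1").medicalClaims);  // 1
        System.out.println(cogroup.get("patient-1").pharmacyClaims); // 1
    }
}
```

Because all aggregators share the untyped BiFunction<Object, Patient, Patient> signature, each lambda casts its value internally ((String) v), which is exactly the second of the two unattractive options described above.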
> The problem I am having with this approach is understanding whether there is a race condition. Obviously the source topics would be co-partitioned, but could processing be multithreaded, so that one processor grabs patient 1 at the same time a different processor has grabbed patient 1?
>
> My understanding is that for each partition there is a single complete set of processors, and a new incoming record goes completely through the processor topology, from a source node to a sink node, before the next one is sent through. Is this correct?
>
> If anyone has any additional ideas about this, let me know. I don't know whether I will have the time to actually build this API, so if someone likes the idea and wants to develop it, feel free.
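On the threading question: in Kafka Streams, each task (one group of co-partitioned partitions) is processed by a single thread at a time, and each record traverses the topology depth-first before the next record is picked up, so processors sharing a store within one task cannot interleave on the same key. A minimal plain-Java sketch of that processing model, with hypothetical increment-only processors standing in for the per-stream aggregators:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Sketch of one stream task: a fixed chain of processors sharing one store,
// driven by a single loop on a single thread. Each record runs through the
// whole chain before the next record is touched, so two processors doing a
// read-modify-write on the same key can never interleave.
public class TaskSketch {
    private final Map<String, Integer> sharedStore = new HashMap<>();
    private final List<BiConsumer<String, Map<String, Integer>>> processors = new ArrayList<>();

    void addProcessor(BiConsumer<String, Map<String, Integer>> p) { processors.add(p); }

    // Depth-first: one record traverses every processor before we return.
    void process(String key) {
        for (BiConsumer<String, Map<String, Integer>> p : processors) {
            p.accept(key, sharedStore);
        }
    }

    int valueOf(String key) { return sharedStore.getOrDefault(key, 0); }

    public static void main(String[] args) {
        TaskSketch task = new TaskSketch();
        // Two hypothetical per-stream aggregators, both updating the same key.
        task.addProcessor((k, store) -> store.merge(k, 1, Integer::sum));
        task.addProcessor((k, store) -> store.merge(k, 1, Integer::sum));

        task.process("patient-1"); // record 1 is fully processed...
        task.process("patient-1"); // ...before record 2 starts
        System.out.println(task.valueOf("patient-1")); // 4
    }
}
```

Under this model, Approach 2's shared state store is safe within a task; the thing to verify for a real implementation would be that all the co-grouped source topics land in the same task, which co-partitioning is meant to ensure.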