Hi Kyle, (cc-ing user list as well)

This could be an interesting scenario. Two things to help us think through it 
some more: 1) it seems you attached a figure, but I cannot open it. 2) what 
about using the low-level Processor API instead of the DSL as approach 3? 
Do you have any thoughts on that?
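
To make the discussion a bit more concrete, here is a rough plain-Java sketch 
of the single-store idea from Approach 1 below. It has no Kafka Streams 
dependency: the HashMap only stands in for a state store, and every name in it 
(CogroupSketch, Agg, register, process, the topic strings) is hypothetical and 
not an existing API. It assumes records can be routed by topic name, and it 
shows one possible way around the generics problem: each aggregator is 
registered with its own value type, and the one unchecked cast is confined to 
a single place.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch only: one initializer, one aggregator per input topic,
// and a single map standing in for the single backing state store.
final class CogroupSketch<K, A> {

    // Hypothetical aggregator shape: (key, value, aggregate) -> new aggregate.
    interface Agg<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    private final Supplier<A> initializer;
    private final Map<String, Agg<K, ?, A>> aggregatorsByTopic = new HashMap<>();
    private final Map<K, A> store = new HashMap<>();

    CogroupSketch(Supplier<A> initializer) {
        this.initializer = initializer;
    }

    // Registration is typed per stream, so callers never cast by hand.
    <V> CogroupSketch<K, A> register(String topic, Agg<K, V, A> aggregator) {
        aggregatorsByTopic.put(topic, aggregator);
        return this;
    }

    // Routes a record to the aggregator registered for its topic.
    // The unchecked cast relies on each topic carrying a single value type.
    @SuppressWarnings("unchecked")
    A process(String topic, K key, Object value) {
        Agg<K, Object, A> aggregator = (Agg<K, Object, A>) aggregatorsByTopic.get(topic);
        if (aggregator == null) {
            throw new IllegalArgumentException("Unknown topic: " + topic);
        }
        A current = store.get(key);
        if (current == null) {
            current = initializer.get();
        }
        A updated = aggregator.apply(key, value, current);
        store.put(key, updated);
        return updated;
    }

    A get(K key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        CogroupSketch<String, List<String>> patients =
                new CogroupSketch<String, List<String>>(ArrayList::new);
        patients.<String>register("medical", (k, v, agg) -> { agg.add("M:" + v); return agg; });
        patients.<Integer>register("lab", (k, v, agg) -> { agg.add("L:" + (v + 1)); return agg; });

        patients.process("medical", "p1", "visit");
        patients.process("lab", "p1", 41);
        System.out.println(patients.get("p1")); // prints [M:visit, L:42]
    }
}
```

A value of the wrong type for a topic would still fail at runtime with a 
ClassCastException, so this only moves the cast out of user code; it does not 
make the routing itself type-safe.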

Thanks
Eno

> On 13 Apr 2017, at 11:26, Winkelman, Kyle G <kyle.winkel...@optum.com> wrote:
> 
> Hello,
>  
> I am wondering if there is any way to aggregate together many streams at once 
> to build a larger object. Example (Healthcare Domain):
> I have streams of Medical, Pharmacy, and Lab claims. Key is PatientId, Value 
> is a different Avro Record for each stream.
> I was hoping there was a way to supply a single Initializer, () -> new 
> Patient(), and 3 aggregators, (key, value, patient) -> 
> patient.add******Claim(value).
>  
> Currently the only way that I see to do the above use case is by aggregating 
> each individual stream then joining them. This doesn’t scale well with a 
> large number of input streams because for each stream I would be creating 
> another state store.
>  
> I was hoping to get thoughts on a KCogroupedStream api. I have spent a little 
> time conceptualizing it.
>  
> Approach 1:
> In KGroupedStream add a cogroup method that takes the single initializer, a 
> list of other KGroupedStreams, and a list of other aggregators.
> This would then all flow through a single processor and have a single 
> backing state store.
> The aggregator that each record gets sent to is determined by 
> context().topic(), which we should be able to trace back to one of the 
> KGroupedStreams in the list.
>  
> The problem I am having with this approach is that everything goes through 
> a single processor and Java doesn’t handle generic types well: I have to 
> either pass in a list of Type objects for casting each value before sending 
> it to the aggregator, or create aggregators that accept an Object and cast 
> it to the appropriate type.
>  
> Approach 2:
> Create one processor for each aggregator and have a single state store. Then 
> have a single KStreamPassThrough that just passes on the new aggregate value.
> The positive for this is you know which stream it will be coming from and 
> won’t need to do the context().topic() trick.
>  
> The problem I am having with this approach is understanding if there is a 
> race condition. Obviously the source topics would be copartitioned. But would 
> it be multithreaded and possibly cause one of the processors to grab patient 
> 1 at the same time a different processor has grabbed patient 1?
> My understanding is that for each partition there would be a single complete 
> set of processors and a new incoming record would go completely through the 
> processor topology from a source node to a sink node before the next one is 
> sent through. Is this correct?
>  
> If anyone has any additional ideas about this let me know. I don’t know if I 
> have the time to actually create this api so if someone likes the idea and 
> wants to develop it feel free.
