I ended up writing an adapter of sorts for this. It might be convenient to
have a Stream.statefulEach(Fields keyFields, Fields inputFields,
Combiner/ReducerAggregator agg, Fields outputFields) method for cases like
this one.
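
To make the intended semantics concrete, here is a minimal plain-Java sketch
(Java 16+ for records). It does not use Storm or Trident at all: the class
StatefulEachSketch, the records Event and Enriched, and the in-memory HashMap
standing in for Trident state are all hypothetical names chosen for
illustration, and the statefulEach method is only a model of the proposed
behavior, not an existing API. The idea is that each tuple first updates the
per-key state through the combiner and is then emitted with the updated value
appended, so the "update before query" ordering holds by construction.

```java
import java.time.MonthDay;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class StatefulEachSketch {
    // Hypothetical stand-ins for Trident tuples (not Storm classes).
    record Event(String field1, int userId, MonthDay eventDate) {}
    record Enriched(String field1, int userId, MonthDay eventDate, MonthDay joinDate) {}

    // Models the proposed behavior: update the per-key state with the
    // combiner, then emit the input tuple plus the updated state value.
    static List<Enriched> statefulEach(List<Event> input,
                                       BinaryOperator<MonthDay> combiner) {
        Map<Integer, MonthDay> state = new HashMap<>(); // in-memory state, keyed by user ID
        List<Enriched> out = new ArrayList<>();
        for (Event e : input) {
            // merge() stores the event date if the key is new; otherwise it
            // stores combiner(oldValue, eventDate). It returns the stored value.
            MonthDay joinDate = state.merge(e.userId(), e.eventDate(), combiner);
            out.add(new Enriched(e.field1(), e.userId(), e.eventDate(), joinDate));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> input = List.of(
            new Event("a", 1, MonthDay.of(10, 1)),
            new Event("b", 1, MonthDay.of(10, 2)),
            new Event("c", 2, MonthDay.of(10, 3)));

        // Combiner that keeps the earliest date seen so far for a user.
        BinaryOperator<MonthDay> earliest = (a, b) -> a.isBefore(b) ? a : b;

        for (Enriched row : statefulEach(input, earliest)) {
            System.out.println(row);
        }
    }
}
```

In this model the join date is the earliest event date seen so far for the
user, so an out-of-order earlier event would only affect tuples emitted after
it arrives.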



> On Oct 13, 2014, at 9:35 AM, Yuval Oren <[email protected]> wrote:
> 
> Hi there,
> 
> I’m trying to denormalize a user’s join date in a trident topology. My input 
> looks like this:
> 
> Field1 User ID Event Date
> ====== ======= ==========
> a      1       10/1
> b      1       10/2
> c      2       10/3
> 
> And the output I want is:
> 
> Field1 User ID Event Date Join Date
> ====== ======= ========== =========
> a      1       10/1       10/1
> b      1       10/2       10/1
> c      2       10/3       10/3
> 
> 
> What is the right way to do this? If I use persistentAggregate and 
> stateQuery, how can I be sure that the state is updated before the query 
> happens? For example:
> 
> TridentState joinDates = input.persistentAggregate(…);
> Stream desiredOutput = input.stateQuery(joinDates,…);
> 
> 
> --
> Yuval Oren
> N3TWORK
> 
