I ended up writing an adapter of sorts to handle this. It might be convenient to have a Stream.statefulEach(Fields keyFields, Fields inputFields, Combiner/ReducerAggregator agg, Fields outputFields) method for cases like this.
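
To make the idea concrete, here is a minimal sketch of the semantics such a method might have: fold each tuple into per-key state first, then emit the tuple together with the updated value, so the "update" always happens before the "query". This is plain in-memory Java, not Trident code and not the adapter mentioned above. The class name StatefulEachSketch and its apply method are hypothetical, the year 2014 is assumed for the dates in the example, and nothing here addresses batching, fault tolerance, or partitioning by key.

```java
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/**
 * Hypothetical illustration only: keeps one combined value per key and
 * returns the updated value for every input. Not part of the Trident API.
 */
final class StatefulEachSketch<K, V> {
    // Per-key state; a real topology would need a durable, partitioned store.
    private final Map<K, V> state = new HashMap<>();
    // Plays the role of the combiner: merges the stored value with a new one.
    private final BinaryOperator<V> combiner;

    StatefulEachSketch(BinaryOperator<V> combiner) {
        this.combiner = combiner;
    }

    /** Updates the state for this key first, then returns the updated value. */
    V apply(K key, V value) {
        // Map.merge stores the value if the key is absent, otherwise
        // stores combiner(old, value); it returns whatever was stored.
        return state.merge(key, value, combiner);
    }

    public static void main(String[] args) {
        // Join date = earliest event date seen so far for the user.
        StatefulEachSketch<Integer, LocalDate> joinDates =
                new StatefulEachSketch<>((a, b) -> a.isBefore(b) ? a : b);

        Object[][] events = {
            {"a", 1, LocalDate.of(2014, 10, 1)},
            {"b", 1, LocalDate.of(2014, 10, 2)},
            {"c", 2, LocalDate.of(2014, 10, 3)},
        };
        for (Object[] e : events) {
            LocalDate joinDate = joinDates.apply((Integer) e[1], (LocalDate) e[2]);
            // Emit the original fields plus the denormalized join date.
            System.out.println(e[0] + " " + e[1] + " " + e[2] + " " + joinDate);
        }
    }
}
```

Because the earliest date is taken with a minimum, the result for a user does not depend on the order in which that user's events arrive, but an event processed before an earlier one would still be emitted with the later join date.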
> On Oct 13, 2014, at 9:35 AM, Yuval Oren <[email protected]> wrote:
>
> Hi there,
>
> I’m trying to denormalize a user’s join date in a Trident topology. My input
> looks like this:
>
> Field1  User ID  Event Date
> ======  =======  ==========
> a       1        10/1
> b       1        10/2
> c       2        10/3
>
> And the output I want is:
>
> Field1  User ID  Event Date  Join Date
> ======  =======  ==========  =========
> a       1        10/1        10/1
> b       1        10/2        10/1
> c       2        10/3        10/3
>
> What is the right way to do this? If I use persistentAggregate and
> stateQuery, how can I be sure that the state is updated before the query
> happens? I.e.:
>
> TridentState joinDates = input.persistentAggregate(…);
> Stream desiredOutput = input.stateQuery(joinDates,…);
>
> --
> Yuval Oren
> N3TWORK
