Hi Bryan,

Thanks a lot for your advice and your help :)

Steve
On Fri, Aug 23, 2019 at 11:36 PM Matt Burgess <mattyb...@apache.org> wrote:
> Steve,
>
> In a flow-based system like NiFi it can be difficult to tell when
> there are "no more events with the same key". However, if you have a
> notion of the maximum amount of time in which events would appear, you
> might be able to use MergeRecord or MergeContent with a Correlation
> Attribute Name of "eventUuid" and a Min and Max Bin Age of that time
> interval (and make sure Max Records is set higher than the number of
> events you'd ever expect in the system over that time). When
> MergeRecord emits a flow file, you are assured that all the records
> have the same eventUuid, and then you might use QueryRecord to execute
> SQL on the records, which allows you to do some aggregations of the
> measures.
>
> This won't be a perfect system, as MergeRecord/Content only has the
> number of bins (unique eventUuids, e.g.) you specify, so it's possible
> that a single flow file does not contain all events for that UUID over
> that time period. You may be able to use an incremental pipeline like
> MergeRecord -> PartitionRecord -> MergeRecord -> QueryRecord. If
> there's a timestamp field in each record you could have something like
> "MIN(timestamp) AS min_timestamp, MAX(timestamp) AS max_timestamp,
> AVG(myEventMeasure) GROUP BY eventUuid" to do the aggregates. It might
> not be an exact window size, but "max_timestamp - min_timestamp" would
> give you the width of the window for the records in each flow file.
>
> Queries over streams are an active research area; the folks working on
> Apache Calcite are doing some really cool things, and we've got
> Calcite powering QueryRecord under the hood, so I'm hoping we can
> continue to enjoy the fruits of their labor :)
>
> Regards,
> Matt
>
> On Fri, Aug 23, 2019 at 4:03 PM Steve Robert
> <contact.steverob...@gmail.com> wrote:
> >
> > Hi Bryan,
> >
> > Thank you for your reply.
> > To be precise, I studied the possibility of opening a window on an
> > attribute value and closing it after a user-defined time, once there
> > are no more events with the same key.
> >
> > On the Flink side, many operators send events with SUCCESS or FAIL
> > status, and the operators run in parallel. I wanted to be able to do
> > an aggregation to determine whether or not there was an error during a
> > life-cycle iteration. Each life cycle is unique and associated with a
> > UUID.
> >
> > On Fri, Aug 23, 2019 at 9:00 PM Bryan Bende <bbe...@gmail.com> wrote:
> >>
> >> I think AttributeRollingWindow was made to aggregate a numeric
> >> attribute. So for example, if all flow files had an attribute called
> >> "amount" that was an integer, then you could say Value to Track =
> >> ${amount} and it would aggregate those values over the window.
> >>
> >> In your case the attribute you have is not the value to aggregate,
> >> it's a key. The value in your case is a constant of 1, where you want
> >> to increment the aggregated value by 1 whenever you see the key.
> >>
> >> I guess it depends what you want to do with the aggregated value, but
> >> your situation seems closer to a counter, which could be tracked with
> >> UpdateCounter by setting the Counter Name to ${eventUuid} and the
> >> Delta to 1, but then I don't know exactly how you would use this
> >> counter later.
> >>
> >> On Fri, Aug 23, 2019 at 1:59 PM Steve Robert
> >> <contact.steverob...@gmail.com> wrote:
> >> >
> >> > Hi Guys,
> >> >
> >> > I apologize in advance if my question seems trivial, but I am new
> >> > to NiFi. I'm studying NiFi for an integration with Flink, which I'm
> >> > used to.
> >> >
> >> > From Flink I send events to NiFi using Site-to-Site.
> >> > The flow files have the attribute
> >> > "eventUuid":"97f82c90-0782-4aab-8850-56ee60b0b73d"
> >> >
> >> > I would like to do an aggregation on events based on the value of
> >> > this attribute, so I looked at the documentation of
> >> > AttributeRollingWindow.
> >> >
> >> > I understood by looking at the errors that this processor does not
> >> > accept strings and therefore cannot track a String value:
> >> > Value to Track: ${eventUuid} will throw a NumberFormatException.
> >> >
> >> > Value to Track: ${eventUuid:toNumber()} will not work either,
> >> > because a 128-bit UUID can't be converted to a 64-bit number.
> >> >
> >> > Is this a limitation, or a bad approach on my part? I cannot use an
> >> > event timestamp here, because this UUID is generated at the
> >> > beginning of the workflow.
> >> >
> >> > Thanks a lot for your help.
> >> > Steve
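[Editor's note] The aggregation Matt describes (QueryRecord running a GROUP BY eventUuid over a merged bin, plus the min/max-timestamp window width) can be sketched outside NiFi to show the shape of the result. A minimal Python illustration with made-up event records; the field names, data values, and the `failed` flag are assumptions for illustration, not NiFi output:

```python
from collections import defaultdict

# Hypothetical sample of the records a MergeRecord bin might contain:
# each Flink operator emits an event carrying the eventUuid attribute,
# a timestamp, and a SUCCESS/FAIL status.
events = [
    {"eventUuid": "97f82c90-0782-4aab-8850-56ee60b0b73d", "timestamp": 100, "status": "SUCCESS"},
    {"eventUuid": "97f82c90-0782-4aab-8850-56ee60b0b73d", "timestamp": 160, "status": "FAIL"},
    {"eventUuid": "5e3c6a2f-0000-4000-8000-000000000000", "timestamp": 105, "status": "SUCCESS"},
]

def aggregate_by_uuid(events):
    """Mimic 'GROUP BY eventUuid' with MIN/MAX(timestamp) per group."""
    groups = defaultdict(list)
    for event in events:
        groups[event["eventUuid"]].append(event)
    result = {}
    for event_uuid, group in groups.items():
        stamps = [e["timestamp"] for e in group]
        result[event_uuid] = {
            "min_timestamp": min(stamps),
            "max_timestamp": max(stamps),
            # Width of the (approximate) window covered by this bin.
            "window_width": max(stamps) - min(stamps),
            # Steve's goal: did any event in this life cycle fail?
            "failed": any(e["status"] == "FAIL" for e in group),
        }
    return result
```

In the flow itself this would be SQL inside QueryRecord rather than Python; the sketch only shows what each per-UUID aggregate would look like, and why the window width is derived from the bin's own timestamps rather than a fixed interval.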
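[Editor's note] The size mismatch behind the ${eventUuid:toNumber()} failure is easy to verify. A quick standard-library Python check with the example UUID from the thread, under the assumption that NiFi's toNumber() targets a signed 64-bit long:

```python
import uuid

u = uuid.UUID("97f82c90-0782-4aab-8850-56ee60b0b73d")

# A UUID is a 128-bit integer; this particular one uses all 128 bits.
print(u.int.bit_length())  # 128

# A signed 64-bit long tops out at 2**63 - 1, far below this value,
# so the UUID's integer form cannot fit in such a long.
print(u.int > 2**63 - 1)   # True
```

This is why treating the UUID as a number was never going to work: it is a 128-bit identifier, not a measure, which matches Bryan's point that the attribute here is a key rather than a value to aggregate.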