Hi Brian,

Thanks a lot for your advice and your help :)
Steve

On Fri, Aug 23, 2019 at 11:36 PM, Matt Burgess <mattyb...@apache.org> wrote:

> Steve,
>
> In a flow-based system like NiFi it can be difficult to tell when
> there are "no more events with the same key". However, if you have a
> notion of the max amount of time where events would appear, you might
> be able to use MergeRecord or MergeContent with a Correlation
> Attribute Name of "eventUuid" with a Min and Max Bin Age of that time
> interval (and make sure the Max Records is set higher than you'd ever
> expect for the number of events in the system over that time). When
> MergeRecord emits a flow file, you are assured that all the records
> have the same eventUuid, and then you might use QueryRecord to execute
> SQL on the records which allows you to do some aggregations of the
> measures.
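The binning behavior Matt describes can be sketched outside NiFi. Below is a minimal Python simulation of grouping flow files by a correlation attribute, the way MergeRecord does with Correlation Attribute Name = "eventUuid" (the record fields and function name are illustrative, not NiFi APIs):

```python
from collections import defaultdict

def bin_by_correlation(flowfiles, correlation_attr="eventUuid"):
    """Group flow-file dicts into bins keyed by a correlation attribute,
    mimicking MergeRecord's Correlation Attribute Name behavior."""
    bins = defaultdict(list)
    for ff in flowfiles:
        bins[ff[correlation_attr]].append(ff)
    return dict(bins)

events = [
    {"eventUuid": "uuid-a", "status": "SUCCESS"},
    {"eventUuid": "uuid-b", "status": "SUCCESS"},
    {"eventUuid": "uuid-a", "status": "FAIL"},
]
bins = bin_by_correlation(events)
# Each bin now holds only records sharing the same eventUuid,
# which is the guarantee you get on a merged flow file.
```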
>
> This won't be a perfect system, as MergeRecord/Content only has the
> number of bins (e.g., unique eventUuids) you specify, so it's possible
> that a single flow file does not contain all events for that UUID over
> that time period. You may be able to use an incremental pipeline like
> MergeRecord -> PartitionRecord -> MergeRecord -> QueryRecord. If
> there's a timestamp field in each record you could have something like
> "MIN(timestamp) as min_timestamp, MAX(timestamp) as max_timestamp,
> AVG(myEventMeasure) GROUP BY eventUuid" to do the aggregates. It might
> not be an exact window size but "max_timestamp - min_timestamp" would
> give you the width of the window for the records in each flow file.
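The GROUP BY aggregation sketched above can be tried end to end with plain SQL. The snippet below simulates it in Python with sqlite3 (QueryRecord actually runs Calcite SQL, so the dialect may differ slightly; the column names `ts` and `myEventMeasure` are assumptions for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (eventUuid TEXT, ts INTEGER, myEventMeasure REAL)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [("uuid-a", 100, 1.0), ("uuid-a", 160, 3.0), ("uuid-b", 120, 5.0)],
)
rows = conn.execute(
    """SELECT eventUuid,
              MIN(ts) AS min_timestamp,
              MAX(ts) AS max_timestamp,
              AVG(myEventMeasure) AS avg_measure
       FROM events
       GROUP BY eventUuid"""
).fetchall()
# max_timestamp - min_timestamp gives the effective window width
# for the records of each eventUuid in the flow file.
```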
>
> Querying over streams is an active research area; the folks working on
> Apache Calcite are doing some really cool things, and we've got
> Calcite powering QueryRecord under the hood, so I'm hoping we can
> continue to enjoy the fruits of their labor :)
>
> Regards,
> Matt
>
> On Fri, Aug 23, 2019 at 4:03 PM Steve Robert
> <contact.steverob...@gmail.com> wrote:
> >
> > Hi Brian,
> >
> > thank you for your reply.
> > To be precise, I studied the possibility of opening a window on an
> > attribute value and closing it after a user-defined time, once there are
> > no more events with the same key.
> > On the Flink side, many operators send events with SUCCESS or FAIL
> > status, and the operators run in parallel.
> > I wanted to be able to do an aggregation to determine whether an error
> > occurred during a life-cycle iteration. Each life cycle is unique and
> > associated with a UUID.
> >
> > On Fri, Aug 23, 2019 at 9:00 PM, Bryan Bende <bbe...@gmail.com> wrote:
> >>
> >> I think AttributeRollingWindow was made to aggregate a numeric
> >> attribute. So for example, if all flow files had an attribute called
> >> "amount" that was an integer, then you could say Value to Track =
> >> ${amount} and it would aggregate those values over the window.
> >>
> >> In your case the attribute you have is not the value to aggregate, it's
> >> a key. The value in your case is a constant of 1, where you want to
> >> increment the aggregated value by 1 whenever you see the key.
> >>
> >> I guess it depends what you want to do with the aggregated value, but
> >> your situation seems closer to a counter which could be tracked with
> >> UpdateCounter setting the Counter Name to ${eventUuid} and the Delta
> >> to 1, but then I don't know exactly how you would use this counter
> >> later.
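Bryan's suggestion amounts to keeping one counter per key. A minimal Python sketch of that idea, using `collections.Counter` (this only illustrates the semantics of UpdateCounter with Counter Name = ${eventUuid} and Delta = 1; it is not NiFi code):

```python
from collections import Counter

counters = Counter()

def update_counter(flowfile, delta=1):
    """Increment a counter named after the flow file's eventUuid,
    like UpdateCounter with Counter Name = ${eventUuid}, Delta = 1."""
    counters[flowfile["eventUuid"]] += delta

for ff in [{"eventUuid": "uuid-a"}, {"eventUuid": "uuid-a"}, {"eventUuid": "uuid-b"}]:
    update_counter(ff)
# counters["uuid-a"] == 2, counters["uuid-b"] == 1
```

As Bryan notes, the open question is how to consume these counters downstream, since NiFi counters are primarily a monitoring feature.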
> >>
> >> On Fri, Aug 23, 2019 at 1:59 PM Steve Robert
> >> <contact.steverob...@gmail.com> wrote:
> >> >
> >> > Hi Guys ,
> >> >
> >> > I apologize in advance if my question seems trivial, but I am new
> >> > to NiFi.
> >> > I'm studying NiFi for an integration with Flink, which I'm used to.
> >> >
> >> > From Flink, I send events to NiFi using Site-to-Site.
> >> > The flow files have the attribute
> >> > "eventUuid":"97f82c90-0782-4aab-8850-56ee60b0b73d"
> >> >
> >> > I would like to do an aggregation on events based on the value of
> this attribute,
> >> >
> >> > so I looked at the documentation of AttributeRollingWindow.
> >> >
> >> > I understood by looking at the errors that this processor does not
> >> > accept strings and therefore cannot track a String value.
> >> > Value to Track : ${eventUuid} will throw NumberFormatExceptions.
> >> >
> >> > Value to Track : ${eventUuid:toNumber()} will not work because a
> >> > 128-bit UUID can't be converted to a 64-bit number.
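The size mismatch Steve hits is easy to verify: a UUID occupies 128 bits, which cannot fit in the 64-bit whole number (Java long) that NiFi's Expression Language `toNumber()` targets. A quick Python check, using the exact eventUuid from the thread:

```python
import uuid

# The eventUuid from the thread: a standard 128-bit UUID.
u = uuid.UUID("97f82c90-0782-4aab-8850-56ee60b0b73d")

assert u.int.bit_length() == 128  # this value uses the full 128 bits
assert u.int > 2**63 - 1          # exceeds the signed 64-bit (long) range
```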
> >> >
> >> > Is this a limitation, or a bad approach on my part? I can't use a
> >> > timestamp-based approach here because this UUID is generated at the
> >> > beginning of the workflow.
> >> > Thanks a lot for your help
> >> > Steve
> >> >
> >> >
>
