Steve,

In a flow-based system like NiFi it can be difficult to tell when
there are "no more events with the same key". However, if you have a
notion of the maximum amount of time within which events for a key
would appear, you might be able to use MergeRecord or MergeContent
with a Correlation Attribute Name of "eventUuid" and a Min and Max
Bin Age of that time interval (and make sure Max Records is set
higher than the number of events you'd ever expect in the system over
that time). When MergeRecord emits a flow file, you are assured that
all the records have the same eventUuid, and then you can use
QueryRecord to execute SQL on the records, which lets you do some
aggregations over the measures.
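Purely as an illustration (a Python sketch, not NiFi code; the class
name and the exact eviction logic are my own assumptions, not what
MergeRecord literally does internally), the correlation-attribute
binning described above behaves roughly like this:

```python
import time


class CorrelationBins:
    """Toy model of MergeRecord-style binning: records are grouped by a
    correlation attribute (eventUuid here), and a bin is only emitted
    once it reaches the configured max bin age."""

    def __init__(self, max_bin_age_seconds):
        self.max_bin_age = max_bin_age_seconds
        self.bins = {}  # eventUuid -> (first_seen_time, [records])

    def add(self, record, now=None):
        now = time.time() if now is None else now
        uuid = record["eventUuid"]
        if uuid not in self.bins:
            self.bins[uuid] = (now, [])
        self.bins[uuid][1].append(record)

    def flush_expired(self, now=None):
        """Emit every bin older than max_bin_age (Max Bin Age expiry)."""
        now = time.time() if now is None else now
        emitted = []
        for uuid in list(self.bins):
            first_seen, records = self.bins[uuid]
            if now - first_seen >= self.max_bin_age:
                emitted.append((uuid, records))
                del self.bins[uuid]
        return emitted
```

With a five-minute bin age, all the events for one eventUuid that
arrive within that window come out in a single flush, which is what
lets a downstream QueryRecord see them together.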

This won't be a perfect system, as MergeRecord/MergeContent only has
the number of bins you specify (one per unique eventUuid, say), so
it's possible that a single flow file does not contain all the events
for that UUID over that time period. You may be able to use an
incremental pipeline like MergeRecord -> PartitionRecord ->
MergeRecord -> QueryRecord. If there's a timestamp field in each
record, you could use something like "MIN(timestamp) as min_timestamp,
MAX(timestamp) as max_timestamp, AVG(myEventMeasure) GROUP BY
eventUuid" to do the aggregates. It might not be an exact window size,
but "max_timestamp - min_timestamp" would give you the width of the
window for the records in each flow file.
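As an illustration of that aggregation (plain Python rather than the
Calcite SQL that QueryRecord would run; the field names timestamp and
myEventMeasure are just the ones from the example query above):

```python
from collections import defaultdict


def aggregate_by_uuid(records):
    """Compute MIN(timestamp), MAX(timestamp), and AVG(myEventMeasure)
    grouped by eventUuid, mirroring the GROUP BY query sketched above."""
    groups = defaultdict(list)
    for r in records:
        groups[r["eventUuid"]].append(r)

    result = {}
    for uuid, rows in groups.items():
        timestamps = [r["timestamp"] for r in rows]
        measures = [r["myEventMeasure"] for r in rows]
        result[uuid] = {
            "min_timestamp": min(timestamps),
            "max_timestamp": max(timestamps),
            "avg_measure": sum(measures) / len(measures),
            # the window width actually observed in this flow file
            "window_width": max(timestamps) - min(timestamps),
        }
    return result
```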

Querying over streams is an active research area; the folks working on
Apache Calcite are doing some really cool things, and Calcite powers
QueryRecord under the hood, so I'm hoping we can continue to enjoy the
fruits of their labor :)

Regards,
Matt

On Fri, Aug 23, 2019 at 4:03 PM Steve Robert
<contact.steverob...@gmail.com> wrote:
>
> Hi Bryan,
>
> Thank you for your reply.
> To be precise, I studied the possibility of opening a window on an attribute
> value and closing it after a user-defined time, once there are no more
> events with the same key.
> On the Flink side, many operators send events with SUCCESS or FAIL status,
> and the operators run in parallel.
> I wanted to be able to do an aggregation to determine whether there was an
> error during a life cycle iteration. Each life cycle is unique and
> associated with a UUID.
>
> On Fri, Aug 23, 2019 at 9:00 PM Bryan Bende <bbe...@gmail.com> wrote:
>>
>> I think AttributeRollingWindow was made to aggregate a numeric
>> attribute. So for example, if all flow files had an attribute called
>> "amount" that was an integer, then you could say Value to Track =
>> ${amount} and it would aggregate those values over the window.
>>
>> In your case the attribute you have is not the value to aggregate, it's
>> a key. The value in your case is a constant of 1, where you want to
>> increment the aggregated value by 1 whenever you see the key.
>>
>> I guess it depends on what you want to do with the aggregated value,
>> but your situation seems closer to a counter, which could be tracked
>> with UpdateCounter by setting the Counter Name to ${eventUuid} and the
>> Delta to 1, but then I don't know exactly how you would use this
>> counter later.
>>
>> On Fri, Aug 23, 2019 at 1:59 PM Steve Robert
>> <contact.steverob...@gmail.com> wrote:
>> >
>> > Hi Guys ,
>> >
>> > I apologize in advance if my question seems trivial, but I am new to NiFi.
>> > I'm studying NiFi for an integration with Flink, which I'm used to.
>> >
>> > From Flink I send events to NiFi using Site-to-Site.
>> > The flow files have the attribute
>> > "eventUuid":"97f82c90-0782-4aab-8850-56ee60b0b73d"
>> >
>> > I would like to do an aggregation on events based on the value of this
>> > attribute,
>> >
>> > so I looked at the documentation of AttributeRollingWindow.
>> >
>> > I understood by looking at the errors that this processor does not
>> > accept strings and therefore cannot track a String value:
>> > Value to Track : ${eventUuid} will throw NumberFormatExceptions.
>> >
>> > Value to Track : ${eventUuid:toNumber()} will not work because a
>> > 128-bit UUID can't be converted to a number.
>> >
>> > Is this a limitation or a bad approach on my part? I cannot key on a
>> > timestamp here because this UUID is generated at the beginning of
>> > the workflow.
>> > Thanks a lot for your help,
>> > Steve
>> >
>> >