Hi Aaron,

Very nicely done. Out of pure curiosity: did you do this by passing
through some intermediate step (a file, or an enrichment script consuming
from one topic and writing to the other), did you patch pmacct, or were
you somehow able to do all of it on the fly?

Paolo

On Tue, Jan 16, 2018 at 08:59:16AM -0800, Aaron Finney wrote:
> If I understand the original question correctly, this is the same issue I
> was having writing to RiakTS; the records are considered immutable, so
> writing the same "key" (group of pmacct primitives) replaced the previous
> value. I solved this by adding the Kafka offset value of the first message
> processed in each refresh cycle to the batch of rows written to Riak as a
> "message batch ID" field. Problem solved. :) We are using a 5s refresh
> time, btw, with no quantization of the timestamp; we do all of that later
> in the pipeline.
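
A minimal sketch of the "message batch ID" idea described above, assuming the consumer has already grouped the messages of one refresh cycle into a list of (offset, record) pairs. The function name `tag_batch`, the field name `batch_id`, and the record fields are hypothetical; this is not the code that was actually run against RiakTS.

```python
from typing import Dict, List, Tuple


def tag_batch(messages: List[Tuple[int, Dict]]) -> List[Dict]:
    """Attach the Kafka offset of the first message to every row of a batch.

    `messages` is assumed to hold the (offset, record) pairs consumed during
    one refresh cycle, in consumption order.
    """
    if not messages:
        return []
    batch_id = messages[0][0]  # offset of the first message in this cycle
    # Copy each record so the caller's dictionaries are left untouched.
    return [{**record, "batch_id": batch_id} for _, record in messages]


# Two refresh cycles carrying the same key: the rows no longer collide
# because they differ in batch_id.
first = tag_batch([(100, {"src_host": "10.0.0.1", "bytes": 10}),
                   (101, {"src_host": "10.0.0.2", "bytes": 5})])
second = tag_batch([(250, {"src_host": "10.0.0.1", "bytes": 7})])
```

Because the offset only ever grows within a partition, rows from different cycles get different IDs; with several partitions the ID would presumably need to include the partition number as well.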
> 
> Of course, with the demise of Basho, we were faced with many more
> significant issues related to RiakTS. :D  We eventually chose CrateDB to
> replace it and it's worked out very well for us.
> 
> 
> On Jan 16, 2018 7:13 AM, "Paolo Lucente" <pa...@pmacct.net> wrote:
> 
> 
> Hi Georgios,
> 
> Thanks for the interesting email. Inline:
> 
> On Fri, Jan 12, 2018 at 07:29:32PM +0100, Georgios Kaklamanos wrote:
> >
> > I'm using nfacctd, with Kafka, and the data are written in InfluxDB, and
> > I'd like some help to clarify an issue that I encountered.
> >
> > As far as I've understood nfacctd's way of operation, when using
> > historical accounting, it is using the timestamps of the flow generation
> > to do the aggregates.
> >
> > So, if I have:
> >
> > kafka_refresh_time[x]: 300
> > kafka_history[x]: 5m
> > kafka_history_roundoff[x]: m
> >
> > Then, it will aggregate all flows with timestamp_start say 18:00 to
> > 18:05, and then at 18:05, have an "event_type": "purge", with
> > {"stamp_inserted" : 18:00}.
> >
> > However, if an out-of-order record X arrives (say timestamp_arrival:
> > 18:07 but timestamp_start: 18:04), it will emit again another message
> > with:
> >
> > { "event_type": "purge", "stamp_inserted" : 18:00}.
> >
> >
> > And this would contain only the payload of record X, not X + the
> > aggregate of 18:00-18:05, of that entry, right?
> 
> That is all correct.
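
To illustrate the behaviour confirmed above, here is a small sketch of two purge messages for the same time-bin, the second carrying only the late record X. The "stamp_inserted" and "event_type" fields come from the thread; the "key" and "bytes" fields and all values are made up for illustration. A store that overwrites on (key, stamp_inserted) keeps only X, while one that accumulates keeps the full total.

```python
from collections import defaultdict

# Hypothetical purge messages for one aggregation key.
messages = [
    # Regular purge of the 18:00 bin, emitted at 18:05.
    {"event_type": "purge", "stamp_inserted": "18:00", "key": "host-a", "bytes": 1000},
    # Late record X: arrives at 18:07 but belongs to the 18:00 bin.
    {"event_type": "purge", "stamp_inserted": "18:00", "key": "host-a", "bytes": 40},
]


def write_overwriting(msgs):
    """Last write wins for each (key, stamp_inserted) pair."""
    store = {}
    for m in msgs:
        store[(m["key"], m["stamp_inserted"])] = m["bytes"]
    return store


def write_accumulating(msgs):
    """Counters are summed for each (key, stamp_inserted) pair."""
    store = defaultdict(int)
    for m in msgs:
        store[(m["key"], m["stamp_inserted"])] += m["bytes"]
    return dict(store)


overwritten = write_overwriting(messages)
accumulated = write_accumulating(messages)
```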
> 
> > Since InfluxDB doesn't contain any read-add-replace, and I'm using the
> > stamp_inserted as the time value, the old entries are replaced as soon
> > as X type of records arrive.
> >
> >
> > To my understanding, I could use the kafka_startup_delay parameter, to
> > solve this, but would it be possible to have a value here that is larger
> > than the kafka_history / kafka_refresh_time (Assuming the active timeout
> > is also large)?
> >
> > [A potential solution might be to use nfacctd_time_new, however I'd
> > prefer to keep the original times.]
> 
> I reckon objects in InfluxDB are 'immutable'. And you are correct about
> the two alternatives you would have out of the box.
> 
> One would essentially be to cache objects in (pmacct) memory for longer
> (hence using more memory) so as to ensure you never have to update the
> same object again. You are right that if active timeouts are large, going
> down this path would be overkill. From a pure implementation standpoint, I
> would even just recommend using a kafka_refresh_time larger than
> kafka_history (instead of resorting to kafka_startup_delay).
> 
> The other would be to use the arrival time rather than the start time of
> the flow which, I agree, while avoiding the drawback of the first option,
> may not be suitable for all use-cases.
> 
> Consider that you have a third one, though it remains to be checked
> whether it can work against InfluxDB: have some sort of auto-increment
> field (like you have in MySQL, for example) so that no record will ever
> update, or be seen as a duplicate of, an existing one. Of course this will
> generate more records in the DB and you will pay the price at query time
> by always needing GROUP BYs. Does it look like something worth exploring?
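
The auto-increment idea can be sketched with SQLite from the Python standard library. This is only an analogy: SQLite stands in for a SQL database with an auto-increment column, it says nothing about whether InfluxDB can do the same, and the table name `acct`, the columns, and the values are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE acct (
        id INTEGER PRIMARY KEY AUTOINCREMENT,  -- makes every row unique
        stamp_inserted TEXT NOT NULL,
        src_host TEXT NOT NULL,
        bytes INTEGER NOT NULL
    )
    """
)

# The regular purge for the 18:00 bin, a late record for the same bin,
# and the purge for the following bin.
rows = [
    ("18:00", "10.0.0.1", 1000),
    ("18:00", "10.0.0.1", 40),
    ("18:05", "10.0.0.1", 500),
]
conn.executemany(
    "INSERT INTO acct (stamp_inserted, src_host, bytes) VALUES (?, ?, ?)", rows
)

# Nothing was overwritten, so the total per bin has to be rebuilt with GROUP BY.
totals = conn.execute(
    """
    SELECT stamp_inserted, src_host, SUM(bytes)
    FROM acct
    GROUP BY stamp_inserted, src_host
    ORDER BY stamp_inserted
    """
).fetchall()
row_count = conn.execute("SELECT COUNT(*) FROM acct").fetchone()[0]
```

All three rows are stored, which is the extra storage mentioned above, and the per-bin total is only available through the aggregating query.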
> 
> > Alternatively, would it be possible (if it also makes sense in other
> > usecases) for you to add two (optional) parameters for the kafka plugin,
> > that do something like:
> >
> > - kafka_update_event_type: if the event is an update, then the
> > event_type field of the message is set to "update", and not to "purge"
> >
> > - kafka_update_event_topic: specify a different topic for the events
> > that are updates.
> 
> Heh, suppose the cache was purged at 10:05 with all objects belonging to
> the 10:00 time-bin; then a new time-bin starts, to be purged at 10:10.
> If during that time-bin you receive an object belonging to the 10:00
> time-bin, you would be assuming that it is an update, but it could very
> well not be one.
> 
> Paolo
> 
> _______________________________________________
> pmacct-discussion mailing list
> http://www.pmacct.net/#mailinglists
