If I understand the original question correctly, this is the same issue I
was having writing to RiakTS; the records are considered immutable, so
writing the same "key" (group of pmacct primitives) replaced the previous
value. I solved this by adding the Kafka offset value of the first message
processed in each refresh cycle to the batch of rows written to riak as a
"message batch ID" field. Problem solved. :) we are using a 5s refresh
time, btw, with no quantization of the time stamp - we do all of that later
in the pipeline.

Of course, with the demise of Basho, we were faced with many more
significant issues related to RiakTS. :D  We eventually chose CrateDB to
replace it and it's worked out very well for us.


On Jan 16, 2018 7:13 AM, "Paolo Lucente" <pa...@pmacct.net> wrote:


Hi Georgios,

Thanks for the interesting email. Inline:

On Fri, Jan 12, 2018 at 07:29:32PM +0100, Georgios Kaklamanos wrote:
>
> I'm using nfacctd, with Kafka, and the data are written in InfluxDB, and
> I'd like some help to clarify an issue that I encountered.
>
> As far as I've understood nfacctd's way of operation, when using
> historical accounting, it is using the timestamps of the flow generation
> to do the aggregates.
>
> So, if I have:
>
> kafka_refresh_time[x]: 300
> kafka_history[x]: 5m
> kafka_history_roundoff[x]: m
>
> Then, it will aggregate all flows with timestamp_start say 18:00 to
> 18:05, and then at 18:05, have an "event_type": "purge", with
> {"stamp_inserted" : 18:00}.
>
> However, if an out-of-order record X arrives (say timestamp_arrival:
> 18:07 but timestamp_start: 18:04), it will emit again another message
with:
>
> { "event_type": "purge", "stamp_inserted" : 18:00}.
>
>
> And this would contain only the payload of record X, not X + the
> aggregate of 18:00-18:05, of that entry, right?

That is all correct.

> Since InfluxDB doesn't contain any read-add-replace, and I'm using the
> stamp_inserted as the time value, the old entries are replaced as soon
> as X type of records arrive.
>
>
> To my understanding, I could use the kafka_startup_delay parameter, to
> solve this, but would it be possible to have a value here that is larger
> than the kafka_history / kafka_refresh_time (Assuming the active timeout
> is also large)?
>
> [A potential solution might be to use nfacctd_time_new, however I'd
> prefer to keep the original times.]

I reckon objects in InfluxDB are 'immutable'. And you are correct about
the two alternatives you would have out of the box.

One would essentially be to cache  objects in (pmacct) memory for longer
(and hence using more memory) so to ensure not to have to update that
object again. You are right that if active timeouts are large going down
this path would be an overkill. From a pure implementation standpoint, i
would even just recommend to use a kafka_refresh_time larger than
kafka_history (instead of resorting to kafka_startup_delay).

The other would be to use the arrival time rather than the start time of
the flow which, i agree, while solving the con of the other solution, it
may not be suitable for all use-cases.

Consider you have a third one: to be checked whether it can work against
InfluxDB, have some sort of auto-increment field (like you have in MySQL
for example) so that no records will ever update / be seen as duplicate.
Of course this will generate more records in the DB and you will pay the
price at query time by needing always GROUP BYs. Does it look something
worth exploring?

> Alternatively, would it be possible (if it also makes sense in other
> usecases) for you to add two (optional) parameters for the kafka plugin,
> that do something like:
>
> - kafka_update_event_type: if the event is an update, then the
> event_type, field of the message is set to "update", and not to purge
>
> - kafka_update_event_topic: specify a different topic for the events
> that are updates.

Heh, guess the cache was purged at 10:05 with all objects belonging to
the 10:00 time-bin; then a new time-bin starts, to be purged at 10:10;
during such time-bin guess you receive an object belonging to the 10:00
time-bin - you would make an assumption that is an update, it could very
well be not one.

Paolo

_______________________________________________
pmacct-discussion mailing list
http://www.pmacct.net/#mailinglists
_______________________________________________
pmacct-discussion mailing list
http://www.pmacct.net/#mailinglists

Reply via email to