Hi Nicolas,

There is a lot here, so let's try to split the concerns around some themes:

1. The Processor API is flexible and can definitely do what you want, but as 
you mentioned, at the cost of you having to manually craft the code.
2. Why are the versions used? I sense there is concern about records arriving 
out of order so the versions give each record with the same ID an order. Is 
that correct?
3. If you didn't have the version and the count requirements, I'd say using a 
KTable to interpret the stream and then aggregating on that would be 
sufficient. There might be a way to do that with a mixture of the DSL and the 
Processor API.
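
To make the "diff" idea concrete, here is a minimal sketch in plain Java of the bookkeeping such a processor would need. It is not Kafka Streams code: a HashMap stands in for the state store, the class and method names are hypothetical, and ignoring records whose version is not higher than the stored one is my assumption about how out-of-order records should be treated.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a plain HashMap stands in for a Kafka Streams state store.
class DiffAggregator {
    // Last accepted {version, metric} per record id.
    private final Map<String, long[]> lastSeen = new HashMap<>();
    private long sum = 0;
    private long count = 0;

    /** Applies one record; returns false if it was ignored as stale. */
    boolean apply(String id, long version, long metric) {
        long[] previous = lastSeen.get(id);
        // Assumption: an equal or lower version is out of order or a duplicate.
        if (previous != null && version <= previous[0]) {
            return false;
        }
        if (previous == null) {
            count++;            // first time this id is seen, whatever its version
        } else {
            sum -= previous[1]; // retract the previous metric value
        }
        sum += metric;          // add the new metric value
        lastSeen.put(id, new long[] {version, metric});
        return true;
    }

    long sum() { return sum; }
    long count() { return count; }
}
```

Note that `count` only moves when an id has no entry in the store, which is different from checking for "version number = 1".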

Another alternative might be to use the Interactive Query APIs to first get 
all your data in KTables and then query it periodically (you can decide on 
the frequency manually).


> On 12 Jan 2017, at 22:19, Nicolas Fouché <nfou...@onfocus.io> wrote:
> Hi,
> long long technical story, sorry for that.
> I'm dealing with a special case. My input topic receives records containing
> an id in the key (and another field for partitioning), and a version number
> in the value, amongst other metrics. Records with the same id are sent
> every 5 seconds, and the version number increments.
> These metrics in the record value are used in aggregations to compute
> `sums` and `counts` (then stored in a DB to compute averages), and to
> compute a few other data structures like cumulative time buckets. If the
> aggregation receives the same record with updated metrics, I have to
> decrement `sum` by the metric value of the previous record, and increment
> `sum` by the new metric value. Also, the `count` would be incremented by 1
> only if the record is seen for the first time (which is not the same as
> "version number = 1").
> To implement this, we would write a processor which would compute the diff
> of metrics by storing the last version of each record in its state. This
> diff is sent to the aggregation, this diff also tells if the record was the
> first (so `count` is incremented). I think this can only be written with the
> low level API.
> That could work well, except we have a dozen types of records, with a few
> metrics each, and quite a few fields to compute in aggregations. Each time
> we deal with this type of "duplicate" records, we would have to write all
> the code to compute the diffs again, and the aggregation algorithm becomes
> way less trivial (we deal with cumulative time buckets, if one knows what I
> mean).
> So we got another idea, which does not feel right in a *streaming*
> environment, and seems quite inefficient:
> ====
> The goal is to "buffer" records until we're quite sure no new version will
> be received. And if a new version is actually received, it's ignored.
> A generic low level processor would be used in topologies which receive the
> same records with updated metrics and an incremented version.
> One state store: contains the records, used to know if a record was already
> received and when, and if the record was already transferred.
> Algorithm:
> On each new record:
> - GET the previous record in the store by Key
> - ignore the new record if:
> -- the record version is lower than the one in the store
> -- the record timestamp is at least 5 minutes newer than the one in store
> - PUT (and thus replace) the record in the store
> Every 1 minute:
> - for each record in the store
> -- if the record has the field "forwarded == true"
> --- DELETE it from the store if the record is one hour old
> -- else
> --- if the timestamp is more than 5 minutes old
> ---- PUT the record in the store with the field "forwarded" set to true
> ---- forward the record
> ===
> Caveats:
> - low-level processors don't have access to the record's ingestion
> timestamp. So we would have to add it to the record value before producing
> the record.
> - no secondary indexes, so we do complete iterations on each `punctuate`
> - it feels so wrong
> Any suggestions? It feels like a KStream of KTable records...
> Thanks.
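
For reference, the buffering algorithm quoted above can be sketched in plain Java as follows. This is only an illustration under stated assumptions, not Kafka Streams code: a HashMap stands in for the state store, the caller passes the current time to `punctuate`, and all names (`VersionBuffer`, `Buffered`, `QUIET_MS`, `RETAIN_MS`) are hypothetical. One extra rule is added that the quoted steps only imply: a key that was already forwarded ignores later versions, following the stated goal that a new version received after forwarding is ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a plain HashMap stands in for a Kafka Streams state store.
class VersionBuffer {
    static final long QUIET_MS = 5 * 60_000L;   // forward after 5 minutes
    static final long RETAIN_MS = 60 * 60_000L; // delete forwarded records after 1 hour

    static final class Buffered {
        final long version;
        final long timestampMs;
        final String value;
        boolean forwarded;

        Buffered(long version, long timestampMs, String value) {
            this.version = version;
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    private final Map<String, Buffered> store = new HashMap<>();

    /** Called for each incoming record. */
    void process(String key, long version, long timestampMs, String value) {
        Buffered prev = store.get(key);                  // GET by key
        if (prev != null) {
            boolean stale = version < prev.version;
            boolean tooLate = timestampMs - prev.timestampMs >= QUIET_MS;
            // Assumption: once forwarded, later versions of the key are ignored.
            if (stale || tooLate || prev.forwarded) {
                return;
            }
        }
        store.put(key, new Buffered(version, timestampMs, value)); // PUT (replace)
    }

    /** Called every minute; returns the values forwarded by this pass. */
    List<String> punctuate(long nowMs) {
        List<String> out = new ArrayList<>();
        // Full scan: there is no secondary index on the timestamp.
        Iterator<Map.Entry<String, Buffered>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Buffered b = it.next().getValue();
            long age = nowMs - b.timestampMs;
            if (b.forwarded) {
                if (age >= RETAIN_MS) {
                    it.remove();                         // DELETE after one hour
                }
            } else if (age > QUIET_MS) {
                b.forwarded = true;                      // mark, then forward
                out.add(b.value);
            }
        }
        return out;
    }

    int size() { return store.size(); }
}
```

The full scan on every `punctuate` call is the cost mentioned in the caveats; the sketch does nothing to avoid it.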
