Hi,

I am observing somewhat unexpected (from my point of view) behaviour
during re-key / re-partitioning operations that prepare a
KTable-KTable join.

Assume two (simplified) source data structures from two respective topics:

class User {
  Long id; // PK
  String name;
}

class Attribute {
  Long id; // PK
  Integer number;
  Long user_id; // FK
}

Now in order to build an aggregate user containing all of its
attributes (0-n), the 'attributes' topic needs to be re-keyed to its
FK ('native' FK join is not possible as there's no right join
operation) using a collection object.

class GroupedAttributes {
  List<Integer> numbers = new ArrayList<>();
  public GroupedAttributes add(Integer v) {
    numbers.add(v);
    return this;
  }
  public GroupedAttributes remove(Integer v) {
    numbers.remove(v);
    return this;
  }
}
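
One detail worth stating, since it is easy to misread: because v is a
boxed Integer, numbers.remove(v) resolves to List.remove(Object) and
removes the first element equal to v, not the element at index v. A
minimal stand-alone sketch of the two overloads (the class name
RemoveOverloadDemo and its helper methods are made up for illustration
and are not part of my application):

```java
import java.util.ArrayList;
import java.util.List;

public class RemoveOverloadDemo {
    // Removes by value: a boxed Integer argument selects List.remove(Object).
    static List<Integer> removeByValue(List<Integer> source, Integer value) {
        List<Integer> copy = new ArrayList<>(source);
        copy.remove(value);
        return copy;
    }

    // Removes by position: a primitive int argument selects List.remove(int).
    static List<Integer> removeByIndex(List<Integer> source, int index) {
        List<Integer> copy = new ArrayList<>(source);
        copy.remove(index);
        return copy;
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 5, 7);
        System.out.println(removeByValue(numbers, 1)); // [5, 7]
        System.out.println(removeByIndex(numbers, 1)); // [1, 7]
    }
}
```

So the subtractor in the re-key operation removes one occurrence of the
given number per call.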

Re-Key operation:

// 'attributes' is a KTable<Long, Attribute>
KTable<Long, GroupedAttributes> groupedAttributes = attributes
    .groupBy(
        (k, v) -> KeyValue.pair(v.userId(), v.number()),
        Grouped.with(
            "attributes-grouped",
            Serdes.Long(),
            Serdes.Integer()))
    .aggregate(
        GroupedAttributes::new,
        (k, v, a) -> a.add(v),
        (k, v, a) -> a.remove(v),
        Named.as("attributes-grouped-aggregated"),
        Materialized.with(Serdes.Long(), groupedAttributesSerde));

This internally creates a state store and associated topic
'attributes-grouped-aggregated-changelog' containing the aggregated
'number' attributes re-keyed to their FK (user_id).

Now for a User associated with exactly one Attribute, I would have
expected the topic to contain exactly one record with the user's key
and a GroupedAttributes object with one item. In fact, the topic
contains thousands of records for that particular user, with an
ever-growing list that repeats the same attribute 'number'. Only at
the end does the list shrink back to the (expected) final object with
a single attribute 'number'.

E.g.:

offset: 1, key: 100, { "numbers": [1] }
offset: 3, key: 100, { "numbers": [1, 1] }
offset: 6, key: 100, { "numbers": [1, 1, 1] }
offset: 9, key: 100, { "numbers": [1, 1, 1, 1] }
...
offset: 262211, key: 100, { "numbers": [1, 1] }
offset: 262213, key: 100, { "numbers": [1] }
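
To make my expectation concrete: as I read the KGroupedTable
documentation, an update to an upstream record should reach the
aggregation as the subtractor applied to the old value plus the adder
applied to the new value. Below is a plain-Java sketch of that
assumption, with no Kafka dependencies. The names PairedUpdateSketch,
Agg, applyUpdate and sizesAfterRepeatedUpdates are hypothetical, and
the sketch only models my understanding; it does not reproduce what
Kafka Streams does internally.

```java
import java.util.ArrayList;
import java.util.List;

public class PairedUpdateSketch {
    // Same shape as GroupedAttributes, reduced to the list itself.
    static final class Agg {
        final List<Integer> numbers = new ArrayList<>();
        Agg add(Integer v) { numbers.add(v); return this; }
        Agg remove(Integer v) { numbers.remove(v); return this; }
    }

    // Hypothetical helper: one upstream update, applied as
    // "subtract the old value, then add the new value".
    static Agg applyUpdate(Agg agg, Integer oldValue, Integer newValue) {
        if (oldValue != null) agg.remove(oldValue);
        if (newValue != null) agg.add(newValue);
        return agg;
    }

    // Replays `updates` writes of the same value for a single attribute
    // and records the size of the aggregated list after each write.
    static List<Integer> sizesAfterRepeatedUpdates(int updates, Integer value) {
        Agg agg = new Agg();
        List<Integer> sizes = new ArrayList<>();
        Integer previous = null; // the first write has no old value
        for (int i = 0; i < updates; i++) {
            applyUpdate(agg, previous, value);
            previous = value;
            sizes.add(agg.numbers.size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(sizesAfterRepeatedUpdates(4, 1)); // [1, 1, 1, 1]
    }
}
```

Under that assumption the list never grows beyond one element, however
often the same Attribute is rewritten upstream, which is why the
changelog contents above surprise me.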

Can anyone please shed some light on the internal workings and explain
if this is expected behaviour?

Best wishes,
Karsten
