Aha! Thanks, Renato, that's very clear.

I think there are a couple of ways you can model this, but one thing that
applies to all of them is that you should consider the `max.task.idle.ms`
configuration option. If you set it higher than `max.poll.interval.ms`,
then Streams will be able to ensure that it processes records from
multiple inputs in timestamp order (as long as the upstream systems
have already buffered the data). It's not strictly necessary for the algorithms
I'd propose, but it's probably more intuitive, e.g., if you process the "order"
record before the "item" records for that order.
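
As a rough illustration of the configuration suggestion above, here is a
minimal sketch that builds the properties using only `java.util.Properties`.
The config names `max.task.idle.ms` and `max.poll.interval.ms` are the ones
mentioned above; the class name, application id, bootstrap address, and the
concrete millisecond values are assumptions made up for the example, so
adjust them for your own deployment.

```java
import java.util.Properties;

class StreamsIdleConfigSketch {
    // Builds Streams properties with max.task.idle.ms set above
    // max.poll.interval.ms by the given margin (both in milliseconds).
    static Properties build(int maxPollIntervalMs, int idleMarginMs) {
        Properties props = new Properties();
        // Hypothetical values, for illustration only.
        props.setProperty("application.id", "order-items-app");
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("max.poll.interval.ms",
            Integer.toString(maxPollIntervalMs));
        props.setProperty("max.task.idle.ms",
            Integer.toString(maxPollIntervalMs + idleMarginMs));
        return props;
    }

    public static void main(String[] args) {
        // Assumed numbers: a 5 minute poll interval plus a 1 second margin.
        Properties props = build(300000, 1000);
        System.out.println(props.getProperty("max.task.idle.ms"));
    }
}
```

You would then pass the resulting `Properties` to your `KafkaStreams`
instance along with the topology.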

Offhand, it seems like you could take two high-level approaches here.

One is to collect the items into an "order items" table, keyed by order
id, and _then_ join it with the order table. This is probably more attractive
if you also have other uses for the order table that don't involve the items.
It looks like this:

KStream<ItemID, Item> items = ...
KTable<OrderID, Order> orders = ...

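KTable<OrderID, HashMap<ItemID, Item>> orderItems =
  items
    // KStream#groupBy takes (key, value) and returns only the new key
    .groupBy((itemId, item) -> item.orderID)
    .aggregate(
      () -> new HashMap<ItemID, Item>(),
      // the aggregate parameter must not reuse the name of the "items" stream
      (orderId, item, itemsById) -> { itemsById.put(item.itemID, item); return itemsById; }
    );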

KTable<OrderID, OrderWithItems> result = 
  orders.join(
    orderItems,
    (order, itemsForOrder) -> new OrderWithItems(order, itemsForOrder)
  );

Note that this is just a sketch. In reality, you'll have to specify a serde for
the HashMap, etc.


The other idea I had was to treat it as a pure aggregation, by first merging
orders and items into a single stream and just doing the stream aggregation
on it. Something like:

KStream<ItemID, Item> items = ...
KStream<OrderID, Order> orders = ...

KStream<OrderID, Item> itemsByOrderId =
  items.selectKey( (itemId, item) -> item.orderID );

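// merge() needs both streams to have the same value type, so widen to Object first
KStream<OrderID, Object> merged =
  orders.mapValues(order -> (Object) order)
    .merge(itemsByOrderId.mapValues(item -> (Object) item));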

KTable<OrderID, OrderWithItems> result =
  merged
    .groupByKey()
    .aggregate(
      () -> new OrderWithItems(),
      (orderId, object, orderWithItems) -> {
        if (object instanceof Order) {
          orderWithItems.setOrder((Order) object);
        } else if (object instanceof Item) {
          orderWithItems.addItem((Item) object);
        } else {
          throw new IllegalArgumentException("Unexpected value: " + object);
        }
        // the aggregator must return the updated aggregate
        return orderWithItems;
      }
    );
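
To make the semantics of that aggregation concrete, here is a plain-Java
simulation of it. This is not the Kafka Streams API, just a sketch of the
fold it performs: every record for a key is applied to the previous result
for that key, so the second item sees the first. All of the classes here
(`Order`, `Item`, `OrderWithItems`, `Rec`) are hypothetical stand-ins with
String ids.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MergedAggregationSketch {
    // Hypothetical stand-ins for the real Order and Item value types.
    static class Order {
        final String orderId;
        Order(String orderId) { this.orderId = orderId; }
    }

    static class Item {
        final String itemId;
        final String orderId;
        Item(String itemId, String orderId) {
            this.itemId = itemId;
            this.orderId = orderId;
        }
    }

    static class OrderWithItems {
        Order order;
        final List<Item> items = new ArrayList<>();
    }

    // One record of the merged stream, already keyed by order id.
    static class Rec {
        final String orderId;
        final Object value;
        Rec(String orderId, Object value) {
            this.orderId = orderId;
            this.value = value;
        }
    }

    // Same shape as the aggregator: (key, value, aggregate) -> aggregate.
    static OrderWithItems apply(String orderId, Object value, OrderWithItems agg) {
        if (value instanceof Order) {
            agg.order = (Order) value;
        } else if (value instanceof Item) {
            agg.items.add((Item) value);
        } else {
            throw new IllegalArgumentException("Unexpected value: " + value);
        }
        return agg;
    }

    // Folds the records one at a time; each step starts from the previous
    // result for that key, so no update is lost.
    static Map<String, OrderWithItems> aggregate(List<Rec> merged) {
        Map<String, OrderWithItems> table = new HashMap<>();
        for (Rec rec : merged) {
            OrderWithItems current = table.get(rec.orderId);
            if (current == null) {
                current = new OrderWithItems(); // plays the role of the initializer
            }
            table.put(rec.orderId, apply(rec.orderId, rec.value, current));
        }
        return table;
    }

    public static void main(String[] args) {
        List<Rec> merged = new ArrayList<>();
        merged.add(new Rec("order-1", new Order("order-1")));
        merged.add(new Rec("order-1", new Item("item-1", "order-1")));
        merged.add(new Rec("order-1", new Item("item-2", "order-1")));
        OrderWithItems result = aggregate(merged).get("order-1");
        System.out.println(result.items.size()); // prints 2
    }
}
```

The point is that the result is Order(item1, item2) rather than Order(item2),
because the aggregate never has to round-trip through a separate producer.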

I think the most important consideration between these is just which
one you and your team find more intuitive. They should have about the
same performance characteristics except:
* The first option needs both input KTables to be stored (to compute the join)
* The second option stores just one KTable, consisting of the _result_
The sizes of these two data sets should actually be about the same
*except* that if you _also_ choose to materialize the _result_ of the join
then the first approach would use twice the storage.

But I'd really strongly recommend favoring clear code over efficient code,
unless you know ahead of time that storage efficiency is going to be a
big problem.

I hope this helps!
-John


On Wed, Feb 19, 2020, at 16:19, Renato Melo wrote:
>  Hi John,
> Thank you for your reply.
> 
> Let me clarify.
> 
> I used the word aggregate, but we are not using aggregate functions. Our 
> case is a whole-part relationship between messageA and message1, 2, n. 
> Like order and order items.
> 
> So translating our case, messageA is the order and message1 and 2 are items.
> 
> When I said we aggregate, I was trying to say we add item to the order. 
> 
> So we have an order in the KTable.
> 
> When the first item arrives, Kafka Streams joins the item to order.
> 
> Then we add the item to the order. Do some calculations. And then we 
> have a separate Kafka producer that pushes the order back to the 
> KTable.
> After the first item we expected this:
> Order (item1)
> Then the second item arrives and the Kafka Streams joins the item2 to 
> Order in the streams, but the order is not updated yet. So we add the 
> item2 to order and instead of having:
> Order(item1, item2) 
> 
> we have 
> 
> Order(item2)
> I hope I made our scenario clearer.
> Regards,
> 
> Renato de Melo
> 
>    On Wednesday, February 19, 2020 at 18:12:15 BRT, John Roesler 
> <vvcep...@apache.org> wrote:  
>  
>  Hi Renato,
> 
> Can you describe a little more about the nature of the join+aggregation
> logic? It sounds a little like the KTable represents the result of aggregating
> messages from the KStream?
> 
> If that's the case, the operation you probably wanted was like:
> 
> > KStream.groupBy().aggregate()
> 
> which produces a KTable view of the aggregation result, and also guarantees
> that when processing the second message, you'll see the result of having 
> processed the first.
> 
> Let me know if I've misunderstood.
> 
> Thanks,
> -John
> 
> On Wed, Feb 19, 2020, at 14:03, Renato Melo wrote:
> > Hi Kafka Community,
> > 
> > Please take a look into my use case:
> > 
> > First message1
> > 1. We have a KStream joined to a KTable(Compact Topic).
> > 2. We receive message1 from the KStream and aggregate message1 into 
> > the joined messageA from the KTable. 
> > 3. We push messageA, with message1 aggregated, back into the KTable.
> > 
> > Second message 2
> > 4. Message2 arrives on the KStream and joins to the expected updated 
> > MessageA from the KTable. To our surprise messageA was not yet updated.
> > 5. We aggregate message2 into messageA.
> > 6. We push messageA to the KTable(Compact topic) and the first 
> > aggregated message is overwritten.
> > 
> > Is there a way to speed up the update in the Ktable (Compact Topic)?
> > 
> > Is something wrong with my use case?
> > 
> > I do appreciate any help. Thank you in advance.
> > 
> > Renato de Melo
> > 
> > 
> >
>
