I encountered a similar problem while using a stream-to-table join.
However, if a record arrives late on the table side, it will not join,
as per stream-to-table join semantics.

Unfortunately, windowing is not supported in stream-to-table joins.
I suggest using a stream-to-stream join with an appropriate time window
to solve this problem.

So you may want to convert the aggregated table into a stream, or join
first and do the aggregation at the end.
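To illustrate the window semantics behind that suggestion, here is a minimal pure-Java sketch of the matching condition used by a symmetric stream-stream join window (this is not Kafka Streams internals; `withinJoinWindow` is a hypothetical helper):

```java
import java.time.Duration;

public class JoinWindowSketch {
    // Hypothetical helper: a symmetric stream-stream join window matches a
    // left and a right record only when their timestamps differ by at most
    // the window size (the condition behind JoinWindows-style joins).
    static boolean withinJoinWindow(long leftTs, long rightTs, Duration window) {
        return Math.abs(leftTs - rightTs) <= window.toMillis();
    }

    public static void main(String[] args) {
        Duration tenSeconds = Duration.ofSeconds(10);
        System.out.println(withinJoinWindow(1_000L, 9_000L, tenSeconds));  // true: 8s apart
        System.out.println(withinJoinWindow(1_000L, 12_000L, tenSeconds)); // false: 11s apart
    }
}
```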

Hope this helps.

Thanks
Sachin



On Mon, Jan 13, 2020 at 10:51 PM Samek, Jiří <sa...@avast.com.invalid>
wrote:

> Hello,
>
> According to the docs, Kafka Streams tasks pick from the partition with the
> smallest timestamp to process the next record.
> (
>
> https://kafka.apache.org/documentation/streams/core-concepts#streams_out_of_ordering
> )
> One can also configure max.task.idle.ms so that Kafka Streams tasks wait
> for all partitions to contain some buffered data before picking next
> record.
>
> I wonder, is it possible to make it so that consumption from a selected
> subset of topics (and their partitions) is kept "behind" by a defined
> amount of time (several seconds or several minutes)?
>
> Example: let's have topics A and B, both with one partition. I want to
> consume from A if timestamp(A) < timestamp(B) + 10s and from B if
> timestamp(B) < timestamp(A) - 10s.
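In pure-Java terms, that preference rule could be sketched like this (hypothetical; Kafka Streams does not expose such a knob, which is the point of the question):

```java
public class LaggedPick {
    static final long LAG_MS = 10_000L; // keep B 10 seconds "behind" A

    // Hypothetical sketch of the preference rule from the example:
    // process the head record of A unless A has run more than LAG_MS
    // ahead of B's head record; otherwise process from B.
    static String pickNext(long tsA, long tsB) {
        return (tsA < tsB + LAG_MS) ? "A" : "B";
    }

    public static void main(String[] args) {
        System.out.println(pickNext(5_000L, 0L));  // A: A is only 5s ahead of B
        System.out.println(pickNext(15_000L, 0L)); // B: A is 15s ahead of B
    }
}
```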
>
> I am asking because I have the following scenario:
> There are 1 + X input topics. Let's say the first input topic is Alpha and
> the other topics are Beta.
> Alpha records are aggregated into KTable.
> Beta records are joined with aggregated Alpha data (stream-table join by
> key) and sent to another output topic.
> For the business problem, it is important to process Alpha records before
> Beta records (based on timestamps), at least in the great majority of cases. In
> other words, a Beta event should be enriched with aggregated data from all
> Alpha events that happened before the Beta event.
> Events can be delayed on their way to Kafka and might be delayed also
> during processing in Kafka Streams - in the actual setup I need to map the
> key and re-partition topics.
>
> So far, I am using just the Kafka Streams DSL. I might be able to use
> .transform() and a TransformerSupplier from the Processor API and construct new
> timestamps for Beta events - shifting the timestamp to future by a given
> amount of time. I guess, I could do it after mapping the key and before
> join, so the re-partitioned records have the modified timestamp.
> Then, if Kafka Streams' "pick from the partition with the smallest
> timestamp" is respected also for internal topics, it should work fine.
> Would you consider it a good approach?
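For what it's worth, the ordering effect of shifting Beta timestamps forward can be illustrated in plain Java (a sketch of the ordering effect only; the topic names and the 10-second shift come from the scenario above, and the helper class is hypothetical):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ShiftSketch {
    static final long SHIFT_MS = 10_000L; // shift Beta 10s into the future

    static final class Rec {
        final String topic;
        final long ts;
        Rec(String topic, long ts) { this.topic = topic; this.ts = ts; }
    }

    // Beta timestamps are shifted forward, so under "pick the smallest
    // timestamp" a Beta event at time t is processed only after every
    // buffered Alpha event with timestamp < t + SHIFT_MS.
    static long effectiveTs(Rec r) {
        return r.topic.equals("Beta") ? r.ts + SHIFT_MS : r.ts;
    }

    static List<Rec> processingOrder(List<Rec> buffered) {
        List<Rec> order = new ArrayList<>(buffered);
        order.sort(Comparator.comparingLong(ShiftSketch::effectiveTs));
        return order;
    }

    public static void main(String[] args) {
        // An Alpha record that arrives "late" relative to a Beta record is
        // still processed first, because Beta's effective timestamp is shifted.
        List<Rec> order = processingOrder(List.of(
                new Rec("Beta", 1_000L), new Rec("Alpha", 5_000L)));
        System.out.println(order.get(0).topic); // Alpha
    }
}
```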
>
> The "delay" would incur latency to Beta events processing. It wouldn't be
> trouble if delay is small. Otherwise, I guess ,I would need to use some
> kind of windowing. Although, windowing isn't supported by stream-table
> join, so that might be a challenge. Do you know if stream-table join
> windowing might be supported in a future version of Kafka Streams?
>
> Best Regards,
> Jiri Samek
>
