Hi Nicolas,

For a KStream-KTable join, if a record coming from the KStream does not find
a matching record in the materialized KTable, then the join result is lost:
as you noted, even if the changelog record arrives at the KTable later, it
will not trigger a join, because the KStream is not materialized. Using
earlier timestamps on the KTable has the effect of processing records from
the KTable first and materializing them before processing records from the
KStream, so that the matching key is likely to already exist in the KTable
when the joining record from the KStream arrives and is processed. But
again, this is not guaranteed, since the synchronization is best-effort.
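
To make the ordering effect concrete, here is a minimal plain-Java sketch.
It is a simulation only and does not use the Kafka Streams API: the class
StreamTableJoinSketch, the Rec type, and the tableShiftMs parameter are all
hypothetical names, and the "always pick the smaller timestamp" loop is an
idealized stand-in for the library's best-effort synchronization. It assumes
left-join semantics, so an unmatched click is emitted with a null campaign.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StreamTableJoinSketch {

    /** A keyed record with a timestamp (hypothetical type for this sketch). */
    static final class Rec {
        final String key;
        final String value;
        final long ts;

        Rec(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    /**
     * Simulates a stream-table left join. Both inputs are assumed to be in
     * timestamp order. The head record with the smaller timestamp is processed
     * first, after moving table timestamps earlier by tableShiftMs (ties go to
     * the table). Table updates only materialize state and never emit output.
     */
    static List<String> leftJoin(List<Rec> tableUpdates, List<Rec> streamRecords,
                                 long tableShiftMs) {
        Map<String, String> table = new HashMap<>();
        List<String> out = new ArrayList<>();
        int t = 0;
        int s = 0;
        // Table updates left over once the stream is exhausted cannot change the output.
        while (s < streamRecords.size()) {
            boolean tableFirst = t < tableUpdates.size()
                    && tableUpdates.get(t).ts - tableShiftMs <= streamRecords.get(s).ts;
            if (tableFirst) {
                Rec update = tableUpdates.get(t++);
                table.put(update.key, update.value);
            } else {
                Rec record = streamRecords.get(s++);
                // One lookup against the current table state; a miss is never retried.
                out.add(record.value + "+" + table.get(record.key));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> campaigns = Arrays.asList(new Rec("c1", "campaignA", 200));
        List<Rec> clicks = Arrays.asList(
                new Rec("c1", "click1", 100),
                new Rec("c1", "click2", 300));
        System.out.println(leftJoin(campaigns, clicks, 0));   // [click1+null, click2+campaignA]
        System.out.println(leftJoin(campaigns, clicks, 150)); // [click1+campaignA, click2+campaignA]
    }
}
```

With no shift, click1 is processed before the campaign exists and its join
result stays empty even though the campaign shows up later. Shifting the
table timestamps earlier fixes this case, but only because the simulation
orders records perfectly, which the real library does not promise.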

I think ideally what you want is a windowed KStream-KTable join. The
KTable's key space is bounded and hence can be completely materialized
(across partitions if you run multiple instances of the same application),
whereas the KStream is unbounded, so you need to window it in order to
materialize it. Then, if a late record arrives from the KTable, it may
still find the matching record in the windowed KStream. Is that right?
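
As a rough illustration of that idea, here is a plain-Java sketch of the
semantics I have in mind. It is not an existing Kafka Streams operator: the
class WindowedStreamTableJoinSketch, the Event type, and the windowMs
parameter are hypothetical, and the buffering policy is just one possible
assumption about how such a join could behave.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class WindowedStreamTableJoinSketch {

    /** Either a table update or a stream record (hypothetical type for this sketch). */
    static final class Event {
        final boolean tableUpdate;
        final String key;
        final String value;
        final long ts;

        Event(boolean tableUpdate, String key, String value, long ts) {
            this.tableUpdate = tableUpdate;
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    /**
     * Processes events in list order. Stream records that find no table entry
     * are buffered per key. A later table update joins with the buffered
     * records that are at most windowMs older than the update and drops the
     * rest. Buffered records are only purged when an update for their key
     * arrives; a real implementation would also need time-based expiry.
     */
    static List<String> join(List<Event> events, long windowMs) {
        Map<String, String> table = new HashMap<>();
        Map<String, List<Event>> pending = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            if (e.tableUpdate) {
                table.put(e.key, e.value);
                List<Event> waiting = pending.remove(e.key);
                if (waiting == null) {
                    continue;
                }
                for (Event w : waiting) {
                    if (e.ts - w.ts <= windowMs) {
                        out.add(w.value + "+" + e.value); // late table record still matches
                    }
                }
            } else {
                String match = table.get(e.key);
                if (match != null) {
                    out.add(e.value + "+" + match);       // ordinary table lookup
                } else {
                    pending.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(false, "c1", "click1", 0),
                new Event(false, "c2", "click2", 100),
                new Event(true, "c1", "campaignA", 500),   // within the window of click1
                new Event(true, "c2", "campaignB", 5000),  // too late for click2
                new Event(false, "c1", "click3", 6000));
        System.out.println(join(events, 1000)); // [click1+campaignA, click3+campaignA]
    }
}
```

Only the stream side is windowed here; the table keeps every key, so clicks
that arrive long after the campaign (click3 above) still match.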


Guozhang



On Wed, Jul 20, 2016 at 4:25 AM, Nicolas PHUNG <nsphung.apa...@gmail.com>
wrote:

> @Guozhang OK, I've tried it and it doesn't have the expected behavior. For
> the KStream-KStream join, there's the issue of having to produce the same
> changelog record to be able to join within the window. And for
> KStream-KTable, an update/insert in the changelog stream doesn't trigger
> the join that was missed for the record stream (and you can't specify a
> window for a KStream-KTable join).
>
> On Wed, Jul 20, 2016 at 11:14 AM, Nicolas PHUNG <nsphung.apa...@gmail.com>
> wrote:
>
> > Hi,
> >
> > Thank you for your answer @Matthias. Indeed, I need a kind of symmetric
> > join. However, a KStream-KStream join doesn't match my use case: I will
> > need to generate the events in the changelog (e.g. a marketing campaign
> > with a certain Id/Key) because they live only for the join within a
> > defined window. Let's say I get a click in the record stream and then the
> > campaign entity arrives later on within the window: the join works and
> > the enriched stream is produced. But after the window, new clicks arrive
> > in the record stream and won't be able to find the campaign entity, since
> > the window containing the information has passed. I haven't tried
> > KTable-KTable yet, but I don't think my clicks, for example, really match
> > a changelog stream.
> >
> > @Guozhang OK, so if I fake the timestamp in the record stream (KStream)
> > (let's say add 1 day), I could give late arrivals in the changelog stream
> > (my KTable) a one-day chance for the join to be processed. Let me try.
> > Thanks
> >
> > Regards,
> >
> > On Tue, Jul 19, 2016 at 11:45 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> >> Hello Nicolas,
> >>
> >> If this missing-match issue is mainly due to the order in which these two
> >> streams were processed (e.g., say your corresponding changelog record was
> >> a bit late compared with the record stream's record with the same key),
> >> you can try to "hint" the Kafka Streams library to give the changelog
> >> stream a bit more time ahead by specifying its timestamps, using the
> >> TimestampExtractor, with an earlier value than the record stream's. Kafka
> >> Streams will do a best-effort "stream synchronization" to make sure these
> >> two streams are processed at roughly the same pace based on record
> >> timestamps, which will result in records from the changelog stream being
> >> processed prior to those from the record stream.
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Tue, Jul 19, 2016 at 6:10 AM, Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>
> >> > Hi Nicolas,
> >> >
> >> > you are right, it is currently not possible to get a result from a
> >> > KTable update (and this is by design). The idea is that the KStream is
> >> > enriched with the *current state* of the KTable -- thus, for each
> >> > KStream record a look-up in the KTable is done. (In this sense, a
> >> > KStream-KTable join is asymmetric.)
> >> >
> >> > If you need a symmetric join (i.e., a lookup in both directions), you
> >> > can use either a KTable-KTable or a KStream-KStream join. I am not sure
> >> > whether this would work for your use case.
> >> >
> >> > -Matthias
> >> >
> >> >
> >> > On 07/19/2016 01:36 PM, Nicolas PHUNG wrote:
> >> > > Hi,
> >> > >
> >> > > I'm using Kafka 0.10.0.0 with the Confluent Platform 3.0.0.
> >> > >
> >> > > I managed to join a record stream (KStream / clicks stream) with a
> >> > > changelog stream (KTable / an entity like a campaign related to a
> >> > > click, for example). When the entity in the KTable is inserted first
> >> > > (and for the first time, of course) in Kafka, the record stream is
> >> > > processed as expected, with the join producing a new enriched stream.
> >> > > This is good.
> >> > >
> >> > > My issue is when the record stream generates a record that contains a
> >> > > key/id that hasn't been inserted yet in the changelog stream/KTable.
> >> > > My process generates a record without the joined information in the
> >> > > enriched stream. Would it be possible to re-run this enrichment once
> >> > > the changelog stream for my KTable has received the missing id/key?
> >> > > From my understanding, it's not possible right now to do this with a
> >> > > KStream-KTable join. Is there a way to do something like this?
> >> > >
> >> > > Thanks.
> >> > >
> >> > > Regards,
> >> > > Nicolas PHUNG
> >> > >
> >> >
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>



-- 
-- Guozhang
