Hi,
All this makes perfect sense now and I could not be clearer on how
Kafka and Streams handle time.
So if we use event-time semantics (with or without a custom timestamp
extractor), getting out-of-order records is expected, and one's
stream topology design should take care of it.

Right now log append time works in our case because we don't have more
than one producer writing to a partition of a topic.
But yes, once we need multiple producers, I suppose we need to consider
using event time and take care of out-of-order records.

BTW, do you ever plan to host summits in India (Mumbai)? I would be
happy to attend one.

Thanks again
Sachin


On Sun, Feb 23, 2020 at 5:35 PM Matthias J. Sax <mj...@apache.org> wrote:

> >> This really helped to understand that grace period takes care of
> >> out of order records rather than late arriving records.
>
> Well, the grace period defines if (or when) an out-of-order record is
> considered late. Of course, per the definition of "late", those records
> are not processed but are dropped.
>
> Note that a late record is a special case of an out-of-order record.
> If data is ordered, it cannot be late.
>
> About ordering guarantees: Brokers guarantee offset-order per
> partition. However, for out-of-order data we consider record
> timestamps, but not offsets.
>
> You are correct, though, that if you configure a topic to use
> "AppendTime", out-of-order records are not possible. However, using
> "AppendTime" to "avoid" out-of-order data is kind of a hack, as you
> lose actual event-time semantics (i.e., if you want to join on
> event-time and you change your config to use "AppendTime" instead, you
> modify your data and will get a different result -- note that the
> timestamp is a first-class citizen in this regard and modifying it is
> something you should be careful about).
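>
> For reference, a minimal sketch (assuming Kafka 2.3+ and a hypothetical
> topic name "my-topic") of how the timestamp type can be switched per
> topic via the AdminClient:
>
>   import java.util.Collections;
>   import java.util.Properties;
>   import org.apache.kafka.clients.admin.AdminClient;
>   import org.apache.kafka.clients.admin.AlterConfigOp;
>   import org.apache.kafka.clients.admin.ConfigEntry;
>   import org.apache.kafka.common.config.ConfigResource;
>
>   public class SetLogAppendTime {
>       public static void main(String[] args) throws Exception {
>           Properties props = new Properties();
>           props.put("bootstrap.servers", "localhost:9092");
>           try (AdminClient admin = AdminClient.create(props)) {
>               ConfigResource topic =
>                   new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
>               // "message.timestamp.type" accepts CreateTime or LogAppendTime
>               AlterConfigOp op = new AlterConfigOp(
>                   new ConfigEntry("message.timestamp.type", "LogAppendTime"),
>                   AlterConfigOp.OpType.SET);
>               admin.incrementalAlterConfigs(
>                   Collections.singletonMap(topic, Collections.singletonList(op)))
>                   .all().get();
>           }
>       }
>   }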
>
> If a topic is configured with "CreateTime" (which is the default and in
> general the most useful configuration), then out-of-order records are
> possible:
>
> - a single producer might "reorder" data if it retries sends
> internally (using `max.in.flight.requests.per.connection=1` or
> `enable.idempotence=true` would guarantee order; see the producer
> sketch after this list)
>
> - an application can set an explicit timestamp for each record before
> writing it into a topic; hence, if the upstream producer application
> does send out-of-order data, it would land like this in the topic
>
> - often multiple producers are writing into the same topic partition:
> for this case, writes are interleaved and thus out-of-order records
> are expected in general (note that this pattern applies to Kafka
> Streams in each repartition step, and thus, even if your input topics
> have ordered data, repartitioning introduces out-of-order records
> downstream).
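>
> To make the first two points concrete, here is a minimal producer
> sketch (topic name, serializers, and the literal timestamp are just
> placeholders):
>
>   import java.util.Properties;
>   import org.apache.kafka.clients.producer.KafkaProducer;
>   import org.apache.kafka.clients.producer.ProducerConfig;
>   import org.apache.kafka.clients.producer.ProducerRecord;
>
>   public class OrderedProducer {
>       public static void main(String[] args) {
>           Properties props = new Properties();
>           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>               "org.apache.kafka.common.serialization.StringSerializer");
>           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>               "org.apache.kafka.common.serialization.StringSerializer");
>           // keeps per-partition order even across internal retries
>           props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>
>           try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
>               // explicit event timestamp (3rd argument): whatever the
>               // application sets here is what lands in the topic
>               producer.send(new ProducerRecord<>(
>                   "my-topic", null, 1582400000000L, "k1", "v1"));
>           }
>       }
>   }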
>
> Hence, even with the default timestamp extractor you might see
> out-of-order data. The same holds for a custom timestamp extractor. In
> the end it does not really make a big difference if the timestamp is
> stored in the payload or in the record timestamp field: for both
> cases, it really depends on the upstream application that produces the
> data.
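>
> If it helps, here is a minimal custom extractor sketch (it assumes,
> purely for illustration, a String payload whose first CSV field is the
> event time in epoch millis):
>
>   import org.apache.kafka.clients.consumer.ConsumerRecord;
>   import org.apache.kafka.streams.processor.TimestampExtractor;
>
>   public class PayloadTimestampExtractor implements TimestampExtractor {
>       @Override
>       public long extract(ConsumerRecord<Object, Object> record,
>                           long partitionTime) {
>           if (record.value() instanceof String) {
>               try {
>                   // event time carried inside the payload
>                   return Long.parseLong(((String) record.value()).split(",")[0]);
>               } catch (NumberFormatException e) {
>                   // fall through to the record's own timestamp field
>               }
>           }
>           return record.timestamp();
>       }
>   }
>
> It would be registered via the `default.timestamp.extractor` config
> (StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG).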
>
> Btw: I gave a talk about time semantics at Kafka Summit recently, so
> you might want to check out the recording (there will be a follow-up
> talk at Kafka Summit London in April focusing on time semantics in
> Kafka Streams):
>
> https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why
>
>
> -Matthias
>
> On 2/22/20 7:43 PM, Sachin Mittal wrote:
> > Hi, This really helped me understand that the grace period takes
> > care of out-of-order records rather than late-arriving records.
> >
> > I however have a question: why would a record arrive out of
> > order? Doesn't Kafka guarantee the order? If we use the default
> > timestamp extractor, then it will use the embedded timestamp in the
> > record, which would be:
> > - event time if the message timestamp type is create time, or
> > - ingestion time if the message timestamp type is set as log append
> > time.
> >
> > I guess in these two cases, especially the second case when we use
> > log append time, out-of-order records will never happen. Please let
> > me know if this understanding is correct. So in this case there
> > would be no point in setting a grace period.
> >
> > I suppose a grace period makes sense when we use a custom timestamp
> > extractor where the timestamp is extracted from the record's
> > payload; in this case there are chances that records are processed
> > out of order.
> >
> > Please confirm this.
> >
> > Thanks Sachin
> >
> > On Sat, Feb 22, 2020 at 5:05 PM Matthias J. Sax <mj...@apache.org>
> > wrote:
> >
> >> Sachin,
> >>
> >> "late" data is data that arrives after the grace period and is
> >> not processed but dropped for this reason. What you mean is
> >> "out-of-order data" for which you can use the grace period to
> >> process it -- increasing the window size would be a semantic
> >> change, while increasing the grace period allows you get the same
> >> result for ordered and unordered input.
> >>
> >> Let's look at an example with a join-window of 5 seconds, with
> >> records written as <key,value,timestamp>:
> >>
> >> Stream1: <k1,v1,10> <k2,v2,20> <k1,v3,26>
> >> Stream2: <k1,w1,12> <k2,w2,30> <k1,w3,13>
> >>
> >> With a grace period of zero, the computation and result would be
> >> as follows:
> >>
> >> s1 -> k1 (insert into store)
> >> s2 -> k1 (insert into store + join) -> result <k1,v1+w1,12>
> >> s1 -> k2 (insert into store + remove k1 from store because window
> >>     size is only 5 and grace period is zero)
> >> s1 -> k1 (insert into store + remove k2 from store because window
> >>     size is only 5 and grace period is zero)
> >> s2 -> k2 (insert into store -> no result because k2 from s1 was
> >>     already removed)
> >> s2 -> k1 (out-of-order record that is also late, dropped on the
> >>     floor)
> >>
> >> Note that the last record from s2 should actually be in the
> >> result, and if it had not been out-of-order it would have
> >> joined with the first record from s1.
> >>
> >> If we increase the grace period (note that the default grace period
> >> is 24h) to, for example, 50, we would get the following:
> >>
> >> s1 -> k1 (insert into store)
> >> s2 -> k1 (insert into store + join) -> result <k1,v1+w1,12>
> >> s1 -> k2 (insert into store)
> >> s1 -> k1 (insert into store -- does not join because window is only 5)
> >> s2 -> k2 (insert into store -- does not join because window is only 5)
> >> s2 -> k1 (out-of-order record, outside of the window but
> >>     processed normally because it's within the grace period: insert
> >>     into store + join) -> result <k1,v1+w3,13>
> >>
> >> This result is semantically "the same" as the result above -- it
> >> is different, though, as we allow out-of-order data to be processed.
> >> The missing join result from the last record of s2 and the first
> >> record of s1 is now in the result as desired.
> >>
> >> On the other hand, if we increase the window size to 50, we get
> >> a semantically different result:
> >>
> >> s1 -> k1 (insert into store)
> >> s2 -> k1 (insert into store + join) -> result <k1,v1+w1,12>
> >> s1 -> k2 (insert into store)
> >> s1 -> k1 (insert into store + join) -> result <k1,v3+w1,26>
> >> s2 -> k2 (insert into store + join) -> result <k2,v2+w2,30>
> >> s2 -> k1 (out-of-order record, within the window: insert into store +
> >>     join) -> 2 results <k1,v1+w3,13>, <k1,v3+w3,26>
> >>
> >> Because we changed the window size, we get 5 result records
> >> instead of 2 (or 1) as in the first two examples.
> >>
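> >> Expressed in code, a minimal sketch of the two variants (topic
> >> names and types are just placeholders; serde setup is omitted):
> >>
> >>   import java.time.Duration;
> >>   import org.apache.kafka.streams.StreamsBuilder;
> >>   import org.apache.kafka.streams.kstream.JoinWindows;
> >>   import org.apache.kafka.streams.kstream.KStream;
> >>
> >>   StreamsBuilder builder = new StreamsBuilder();
> >>   KStream<String, String> s1 = builder.stream("topic-1");
> >>   KStream<String, String> s2 = builder.stream("topic-2");
> >>
> >>   // variant 1: keep the 5s window, increase the grace period --
> >>   // out-of-order records are processed, semantics stay the same
> >>   s1.join(s2, (v1, v2) -> v1 + "+" + v2,
> >>       JoinWindows.of(Duration.ofSeconds(5)).grace(Duration.ofSeconds(50)));
> >>
> >>   // variant 2: widen the window itself -- this changes which
> >>   // records join, i.e., the semantics of the result
> >>   s1.join(s2, (v1, v2) -> v1 + "+" + v2,
> >>       JoinWindows.of(Duration.ofSeconds(50)));
> >>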
> >> Does this make sense?
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 2/21/20 7:35 PM, Sachin Mittal wrote:
> >>> Hi, Reading the Kafka docs I see that the grace period is defined
> >>> as: "the time to admit late-arriving events after the end of the
> >>> window".
> >>>
> >>> I however have not understood when to use it.
> >>>
> >>> If I see that some records are arriving after the end of the
> >>> window and hence are not included in the join, should I not simply
> >>> increase the window size to accommodate that?
> >>>
> >>> When do I really need to use grace and not alter the window
> >>> size?
> >>>
> >>> Thanks Sachin
> >>>
> >>
> >>
> >
>
