We do not have a KIP for it yet.

On Wed, Jun 7, 2017 at 3:21 AM, Frank Lyaruu <flya...@gmail.com> wrote:

> I tried to use a TimestampExtractor that uses the timestamps from our
> messages, plus a 'map' operation on the KTable to set the timestamp to the
> current time, so I would have a precise point where I discard our original
> timestamps. That does not work (I verified by writing a separate Java Kafka
> consumer and spitting out the timestamps): the TimestampExtractor only gets
> called once, and it sticks with that time. I did not really have a good
> reason not to simply use the WallclockTimestampExtractor, and that one
> seems to do exactly what I wanted.
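For reference, switching to the built-in extractor is a one-line config change. A minimal sketch (the config key is "default.timestamp.extractor" from Kafka 1.0 on; older 0.10.x releases used "timestamp.extractor", so check your version; the application id and bootstrap address are hypothetical):

```java
import java.util.Properties;

public class StreamsTimestampConfig {
    // Builds a Streams config that stamps records with wall-clock time
    // instead of the timestamp embedded in the source message.
    public static Properties baseConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");    // hypothetical
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical
        // "timestamp.extractor" on 0.10.x; "default.timestamp.extractor" from 1.0
        props.put("default.timestamp.extractor",
                "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(baseConfig().getProperty("default.timestamp.extractor"));
    }
}
```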
>
> So, I'm good! I am interested in the community discussion Guozhang
> mentions. Is there a KIP for that?
>
> regards, Frank
>
>
> On Mon, Jun 5, 2017 at 8:25 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Frank,
> >
> > If you use "now", I assume you are calling System.currentTimeMillis().
> > If yes, you can also use the predefined WallclockTimestampExtractor that
> > ships with Streams (no need to write your own).
> >
> > > I thought that the Timestamp extractor would then also use
> > > that updated timestamp as 'stream time', but I don't really see that
> > > happening, so that assumption was wrong.
> >
> > Yes, this should happen. Not sure why you don't observe this. And thus,
> > the producer should use this timestamp to write the records.
> >
> > How did you verify the timestamps that are set for your output records?
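One low-effort way to inspect the timestamps on output records (instead of writing a dedicated consumer) is the console consumer's print.timestamp switch. A CLI sketch, assuming a broker on localhost:9092 and a hypothetical output topic named `output-topic`:

```shell
# Prefixes each record with CreateTime:<ms> or LogAppendTime:<ms>
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic output-topic --from-beginning \
  --property print.timestamp=true
```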
> >
> >
> > -Matthias
> >
> >
> > On 6/5/17 6:15 AM, Frank Lyaruu wrote:
> > > Thanks Guozhang,
> > >
> > > I figured I could use a custom timestamp extractor, and set that
> > > timestamp to 'now' when reading a source topic, as the original
> > > timestamp is pretty much irrelevant. I thought that the Timestamp
> > > extractor would then also use that updated timestamp as 'stream time',
> > > but I don't really see that happening, so that assumption was wrong.
> > >
> > > If I could configure a timestamp extractor that would also be used by
> > > the producer, I think I'd be in business, but right now I don't see an
> > > elegant way forward, so any ideas for workarounds are welcome.
> > >
> > > regards, Frank
> > >
> > > On Mon, Jun 5, 2017 at 7:01 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > >> Frank, thanks for sharing your findings.
> > >>
> > >> I think this is a general issue to consider in Streams, and the
> > >> community has been thinking about it: we write intermediate topics
> > >> with the stream time that is inherited from the source topic's
> > >> timestamps; however, that timestamp is used for log rolling /
> > >> retention etc. as well, and these two purposes (using timestamps in
> > >> processing for out-of-order and late arrivals, and operations on the
> > >> Kafka topics) could rely on different timestamp semantics. We need to
> > >> revisit how timestamps can be maintained across the topology in
> > >> Streams.
> > >>
> > >> Guozhang
> > >>
> > >> On Sat, Jun 3, 2017 at 10:54 AM, Frank Lyaruu <flya...@gmail.com>
> > wrote:
> > >>
> > >>> Hi Matthias,
> > >>>
> > >>> Ok, that clarifies quite a bit. I never really went into the
> > >>> timestamp aspects, as time does not really play a role in my
> > >>> application (aside from the repartition topics, I have no KStreams or
> > >>> windowed operations, just different kinds of KTable joins).
> > >>>
> > >>> I do think that the failure case I see (with this version, joining
> > >>> two 'old' KTables causes a small percentage of records to vanish) is
> > >>> far from intuitive, and it somehow worked fine until a few weeks ago.
> > >>>
> > >>> I think your option 3 should work. I'll make a custom timestamp
> > >>> extractor (I actually do have a timestamp in my messages), and I'll
> > >>> set the timestamp to the current time as records enter the Streams
> > >>> application.
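The extraction logic itself is tiny. A dependency-free sketch of the idea (the real org.apache.kafka.streams.processor.TimestampExtractor interface takes a ConsumerRecord; this stand-in models only the decision to discard the embedded timestamp):

```java
public class WallClockStamping {
    // Mirrors the spirit of TimestampExtractor.extract(): ignore the
    // record's embedded timestamp and use processing ("wall-clock") time.
    public static long extract(long embeddedTimestamp) {
        return System.currentTimeMillis();
    }

    public static void main(String[] args) {
        long before = System.currentTimeMillis();
        long stamped = extract(0L); // embedded timestamp is ignored
        System.out.println("stamped=" + stamped + " (>= " + before + ")");
    }
}
```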
> > >>>
> > >>> Thanks, that helped, regards, Frank
> > >>>
> > >>> On Fri, Jun 2, 2017 at 9:17 PM, Matthias J. Sax <matth...@confluent.io>
> > >>> wrote:
> > >>>
> > >>>> Hi Frank,
> > >>>>
> > >>>> yes, retention policy is based on the embedded record timestamps
> > >>>> and not on system time. Thus, if you send messages with an old
> > >>>> timestamp, they can trigger log/segment rolling.
> > >>>>
> > >>>>>> I see that the repartition topics have timestamp.type =
> > >>>>>> CreateTime, does that mean it uses the timestamp of the
> > >>>>>> original message?
> > >>>>
> > >>>> Yes. That's the default setting on the broker side. For Streams, we
> > >>>> maintain a so-called "stream time" that is computed based on the
> > >>>> input record timestamps. This "stream time" is used to set the
> > >>>> timestamp for records that are written by Streams (so it's more or
> > >>>> less the timestamp of the input records).
> > >>>>
> > >>>>>> Shouldn't that be LogAppendTime for repartition topics?
> > >>>>
> > >>>> No. Streams needs to preserve the original timestamp to guarantee
> > >>>> correct semantics for downstream window operations. Thus, it should
> > >>>> be CreateTime -- if you switch to LogAppendTime, you might break
> > >>>> your application logic and get wrong results.
> > >>>>
> > >>>>>> Or is there a way to configure that?
> > >>>>
> > >>>> You can configure this on a per-topic basis on the brokers.
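Concretely, that per-topic setting is message.timestamp.type. A sketch of changing it with the topic-config tool (topic name hypothetical; brokers of this era are addressed via ZooKeeper, while newer tool versions take --bootstrap-server instead):

```shell
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-topic \
  --add-config message.timestamp.type=LogAppendTime
```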
> > >>>>
> > >>>>>> If I hack my Kafka Streams code to force it to use LogAppendTime,
> > >>>>>> that seems to solve my problem, but it appears to take a huge
> > >>>>>> toll on the brokers. Throughput plummets, and I don't really
> > >>>>>> know why.
> > >>>>
> > >>>> I am not sure what you mean by this. As it's a topic config, I
> > >>>> don't understand how you can force this within your Streams
> > >>>> application.
> > >>>>
> > >>>>
> > >>>> IMHO, you have multiple options though:
> > >>>>  - increase the retention time for your re-partitioning topics
> > >>>>  - change the retention policy to number of bytes instead of time
> > >>>>    for the re-partitioning topics
> > >>>>  - implement a custom timestamp extractor and adjust the timestamps
> > >>>>    accordingly ("stream time" is based on whatever the timestamp
> > >>>>    extractor returns)
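The first two options are plain topic-config changes as well. A sketch, with a hypothetical repartition-topic name and illustrative values:

```shell
# Option 1: raise time-based retention to 7 days
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-app-some-repartition \
  --add-config retention.ms=604800000

# Option 2: switch to size-based retention (1 GiB per partition)
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-app-some-repartition \
  --add-config retention.bytes=1073741824
```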
> > >>>>
> > >>>> However, if you have records with old timestamps, I am wondering why
> > >>>> they are not truncated in your input topic? Do you not face the same
> > >>>> issue there?
> > >>>>
> > >>> All my topics are compacted, I use no windowed operations at all;
> > >>> the only 'delete' topics are the repartitioning internal topics.
> > >>>
> > >>>>
> > >>>> -Matthias
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>> On 6/2/17 9:33 AM, Frank Lyaruu wrote:
> > >>>>> Hi Kafka people,
> > >>>>>
> > >>>>> I'm running an application that pushes database changes into a
> > >>>>> Kafka topic. I'm also running a Kafka Streams application that
> > >>>>> listens to these topics, groups them using the high-level API, and
> > >>>>> inserts them into another database.
> > >>>>>
> > >>>>> All topics are compacted, with the exception of the 'repartition
> > >>>>> topics', which are configured to be retained for 36 hours.
> > >>>>>
> > >>>>> Note that the changes in the original Kafka topics can be old
> > >>>>> (generally more than 36 hours), as they only change when the data
> > >>>>> changes.
> > >>>>>
> > >>>>> When I start an instance of the Kafka Streams application, I see
> > >>>>> the repartition topics being deleted immediately, sometimes before
> > >>>>> they are processed, and it looks like the repartition messages use
> > >>>>> the same timestamp as the original message.
> > >>>>>
> > >>>>> I see that the repartition topics have timestamp.type =
> > >>>>> CreateTime, does that mean it uses the timestamp of the original
> > >>>>> message? Shouldn't that be LogAppendTime for repartition topics?
> > >>>>> Or is there a way to configure that?
> > >>>>>
> > >>>>> If I hack my Kafka Streams code to force it to use LogAppendTime,
> > >>>>> that seems to solve my problem, but it appears to take a huge toll
> > >>>>> on the brokers. Throughput plummets, and I don't really know why.
> > >>>>>
> > >>>>> Any ideas?
> > >>>>>
> > >>>>> Frank
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> >
> >
>



-- 
-- Guozhang
