Re: Dealing with noisy timestamps in kafka streams + 10.1

2017-07-17 Thread Guozhang Wang
Hello Greg,

Sorry for the long delay in responding, but I'm very glad to see you have gone
this far in resolving the problem. All of your solutions make sense to me.

I think the first problem you raised is a general one, not only for Streams
sending changelog records but for any client that needs to send data to
Kafka: if the timestamp embedded in the record is wrong, it will cause log
rolling / retention to malfunction.

The second problem you raised is a good question for Streams specifically.
To make it more general, the question is whether we should use the
processing timestamp or the event timestamp when generating changelog
records; and as you already observed, using the event timestamp makes
reprocessing a headache. We have been working towards improving our time
semantics to cope with both, rather than forcing users to pick one of the
two, and as a first step we have extended the punctuate function to allow
both event-time-based and processing-time-based punctuation (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics).
Will keep you posted as we make further progress on this.
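
For reference, a minimal sketch of the KIP-138 punctuation API as it later
shipped in the Streams Processor API (the schedule/PunctuationType names below
are from the released API, not from the 0.10.x versions discussed in this
thread, and the two helper methods are hypothetical):

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class DualTimePunctuationProcessor extends AbstractProcessor<String, String> {

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);

        // Punctuation driven by stream time, i.e. the event timestamps of arriving records.
        context.schedule(60_000L, PunctuationType.STREAM_TIME, this::flushWindow);

        // Punctuation driven by wall-clock time, independent of record timestamps.
        context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME, this::emitHeartbeat);
    }

    @Override
    public void process(final String key, final String value) {
        // Regular per-record processing goes here.
    }

    private void flushWindow(final long streamTime) { /* hypothetical helper */ }

    private void emitHeartbeat(final long wallClockTime) { /* hypothetical helper */ }
}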


Guozhang


On Fri, Jul 7, 2017 at 11:21 AM, Greg Fodor  wrote:

> After more digging, a good solution seems to be to set the topic timestamp
> policy to LogAppendTime and also update our timestamp extractor to respect
> this (by just returning record.timestamp() if the timestamp type is set to
> LogAppendTime). This gets us the semantics we want for the changelog topic
> while allowing CreateTime timestamps to land on the other topics.
>
> On Thu, Jul 6, 2017 at 9:32 PM, Greg Fodor  wrote:
>
> > I managed to answer some of my own questions :)
> >
> > For future google'ers:
> >
> > To deal with retention.ms, see
> > https://issues.apache.org/jira/browse/KAFKA-4340
> > To deal with early rejection of bad timestamps, the
> > message.timestamp.difference.max.ms config is relevant; discussion here:
> > https://issues.apache.org/jira/browse/KAFKA-5344
> >
> > In our case, we can live with setting the retention.ms during backfills.
> > Still would like to know if there are any better practices for dealing with
> > mis-stamped records during backfills w/ state store topics.
> >
> >
> > On Thu, Jul 6, 2017 at 12:32 PM, Greg Fodor  wrote:
> >
> >> Hey all, we are currently working on migrating our system to Kafka 0.10.2
> >> from 0.10.0, and one thing we have hit that I wanted some advice on is
> >> dealing with the new log retention/rolling semantics that are based on
> >> timestamps.
> >>
> >> We send telemetry data from installed clients into Kafka via the Kafka REST
> >> proxy, and the messages land with "create time" timestamps assigned on the
> >> sender side. We try to adjust for clock skew, but this is not perfect, and
> >> in practice a small subset of the data lands in this topic with very
> >> erroneous timestamps (for example, some arrive with timestamps many years
> >> in the future).
> >>
> >> The first problem we are hitting is that these corrupt timestamps now
> >> influence log segment rolling. For example, when reprocessing the entire
> >> log, we end up with a bunch of segment files generated for the Kafka
> >> Streams state store changelogs that store these events: as corrupted
> >> timestamps come in, a single record timestamped far in the future can
> >> trigger a segment roll under the new heuristics. The result is that we end
> >> up with hundreds of small segment files (which, in our current
> >> configuration, actually ends up causing Kafka to run out of memory, but
> >> that's another story :))
> >>
> >> The second problem we are hitting is when reprocessing the full log: since
> >> these timestamps are in the past as we run from the beginning, if we have a
> >> time-based retention policy set on the state store changelog topic (say, a
> >> week), Kafka ends up deleting segments immediately because the timestamps
> >> are far in the past and the segments are considered expired. Previously
> >> this worked fine during reprocessing, since the state store changelogs were
> >> only going to get deleted a week after the reprocess job ran, the retention
> >> policy being based upon the segment file timestamp.
> >>
> >> Both of these problems could potentially be compensated for by writing a
> >> clever timestamp extractor that tried to a) normalize timestamps that
> >> appear very skewed and b) for changelog entries, extract a "logged at"
> >> instead of "created at" timestamp when landing into the state store
> >> changelog. The second problem could also be addressed by temporarily
> >> changing the topic configuration during a reprocess to prevent "old" log
> >> segments from being deleted. Neither of these seems ideal.
> >>
> >> I wanted to know if there are any recommendations on how to deal with
> >> this -- it 

Re: Dealing with noisy timestamps in kafka streams + 10.1

2017-07-07 Thread Greg Fodor
After more digging, a good solution seems to be to set the topic timestamp
policy to LogAppendTime and also update our timestamp extractor to respect
this (by just returning record.timestamp() if the timestamp type is set to
LogAppendTime). This gets us the semantics we want for the changelog topic
while allowing CreateTime timestamps to land on the other topics.
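
A minimal sketch of such an extractor, assuming the two-argument
TimestampExtractor interface of later Streams releases (earlier versions
expose only the record):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class LogAppendAwareTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        // For topics configured with message.timestamp.type=LogAppendTime,
        // trust the broker-assigned timestamp as-is.
        if (record.timestampType() == TimestampType.LOG_APPEND_TIME) {
            return record.timestamp();
        }
        // For CreateTime topics, use the sender-assigned timestamp, falling back
        // to the previous record's timestamp if none is embedded.
        final long ts = record.timestamp();
        return ts >= 0 ? ts : previousTimestamp;
    }
}

The topic-side switch itself is the per-topic message.timestamp.type=LogAppendTime
config on the changelog topic.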

On Thu, Jul 6, 2017 at 9:32 PM, Greg Fodor  wrote:

> I managed to answer some of my own questions :)
>
> For future google'ers:
>
> To deal with retention.ms, see
> https://issues.apache.org/jira/browse/KAFKA-4340
> To deal with early rejection of bad timestamps, the
> message.timestamp.difference.max.ms config is relevant; discussion here:
> https://issues.apache.org/jira/browse/KAFKA-5344
>
> In our case, we can live with setting the retention.ms during backfills.
> Still would like to know if there are any better practices for dealing with
> mis-stamped records during backfills w/ state store topics.
>
>
> On Thu, Jul 6, 2017 at 12:32 PM, Greg Fodor  wrote:
>
>> Hey all, we are currently working on migrating our system to Kafka 0.10.2
>> from 0.10.0, and one thing we have hit that I wanted some advice on is
>> dealing with the new log retention/rolling semantics that are based on
>> timestamps.
>>
>> We send telemetry data from installed clients into Kafka via the Kafka REST
>> proxy, and the messages land with "create time" timestamps assigned on the
>> sender side. We try to adjust for clock skew, but this is not perfect, and in
>> practice a small subset of the data lands in this topic with very erroneous
>> timestamps (for example, some arrive with timestamps many years in the
>> future).
>>
>> The first problem we are hitting is that these corrupt timestamps now
>> influence log segment rolling. For example, when reprocessing the entire log,
>> we end up with a bunch of segment files generated for the Kafka Streams state
>> store changelogs that store these events: as corrupted timestamps come in, a
>> single record timestamped far in the future can trigger a segment roll under
>> the new heuristics. The result is that we end up with hundreds of small
>> segment files (which, in our current configuration, actually ends up causing
>> Kafka to run out of memory, but that's another story :))
>>
>> The second problem we are hitting is when reprocessing the full log: since
>> these timestamps are in the past as we run from the beginning, if we have a
>> time-based retention policy set on the state store changelog topic (say, a
>> week), Kafka ends up deleting segments immediately because the timestamps are
>> far in the past and the segments are considered expired. Previously this
>> worked fine during reprocessing, since the state store changelogs were only
>> going to get deleted a week after the reprocess job ran, the retention policy
>> being based upon the segment file timestamp.
>>
>> Both of these problems could potentially be compensated for by writing a
>> clever timestamp extractor that tried to a) normalize timestamps that
>> appear very skewed and b) for changelog entries, extract a "logged at"
>> instead of "created at" timestamp when landing into the state store
>> changelog. The second problem could also be addressed by temporarily
>> changing the topic configuration during a reprocess to prevent "old" log
>> segments from being deleted. Neither of these seems ideal.
>>
>> I wanted to know if there are any recommendations on how to deal with
>> this -- it seems like there is a conflict between having segment file
>> policies be based on message timestamps and also having message timestamps
>> be based on application creation time, since origin create time can often
>> be subject to noise/skew/errors. One potential path forward would be
>> topic-specific settings for log rolling (including the ability
>> to use the legacy behavior for retention that relies upon filesystem
>> timestamps) but I am sure there are problems with this proposal.
>>
>> In general, I don't really feel like I have a good sense of what a
>> correct solution would be, other than messages always having two timestamps
>> and being able to have control over which timestamp is authoritative for
>> log segment management policies, but that obviously seems like something
>> that was considered and rejected for KIP-32 already.
>>
>
>


Re: Dealing with noisy timestamps in kafka streams + 10.1

2017-07-06 Thread Greg Fodor
I managed to answer some of my own questions :)

For future google'ers:

To deal with retention.ms, see
https://issues.apache.org/jira/browse/KAFKA-4340
To deal with early rejection of bad timestamps, the
message.timestamp.difference.max.ms config is relevant; discussion here:
https://issues.apache.org/jira/browse/KAFKA-5344
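
Both knobs are per-topic configs. Here is a sketch of setting them with the
Java AdminClient, which only arrived in 0.11.0 (on the 0.10.x brokers discussed
here, the kafka-configs.sh tool plays the same role); the topic name and the
values are purely illustrative:

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class BackfillTopicConfig {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            final ConfigResource changelog =
                    new ConfigResource(ConfigResource.Type.TOPIC, "my-app-my-store-changelog");

            final Config config = new Config(Arrays.asList(
                    // Reject records whose create-time is wildly off from broker time
                    // (see the KAFKA-5344 discussion); 1 day here is illustrative.
                    new ConfigEntry("message.timestamp.difference.max.ms", "86400000"),
                    // Temporarily raise retention so old-timestamped segments are not
                    // deleted mid-backfill; ~1 year here is illustrative.
                    new ConfigEntry("retention.ms", "31536000000")));

            admin.alterConfigs(Collections.singletonMap(changelog, config)).all().get();
        }
    }
}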

In our case, we can live with setting the retention.ms during backfills.
Still would like to know if there are any better practices for dealing with
mis-stamped records during backfills w/ state store topics.


On Thu, Jul 6, 2017 at 12:32 PM, Greg Fodor  wrote:

> Hey all, we are currently working on migrating our system to Kafka 0.10.2
> from 0.10.0, and one thing we have hit that I wanted some advice on is
> dealing with the new log retention/rolling semantics that are based on
> timestamps.
>
> We send telemetry data from installed clients into Kafka via the Kafka REST
> proxy, and the messages land with "create time" timestamps assigned on the
> sender side. We try to adjust for clock skew, but this is not perfect, and in
> practice a small subset of the data lands in this topic with very erroneous
> timestamps (for example, some arrive with timestamps many years in the
> future).
>
> The first problem we are hitting is that these corrupt timestamps now
> influence log segment rolling. For example, when reprocessing the entire log,
> we end up with a bunch of segment files generated for the Kafka Streams state
> store changelogs that store these events: as corrupted timestamps come in, a
> single record timestamped far in the future can trigger a segment roll under
> the new heuristics. The result is that we end up with hundreds of small
> segment files (which, in our current configuration, actually ends up causing
> Kafka to run out of memory, but that's another story :))
>
> The second problem we are hitting is when reprocessing the full log: since
> these timestamps are in the past as we run from the beginning, if we have a
> time-based retention policy set on the state store changelog topic (say, a
> week), Kafka ends up deleting segments immediately because the timestamps are
> far in the past and the segments are considered expired. Previously this
> worked fine during reprocessing, since the state store changelogs were only
> going to get deleted a week after the reprocess job ran, the retention policy
> being based upon the segment file timestamp.
>
> Both of these problems could potentially be compensated for by writing a
> clever timestamp extractor that tried to a) normalize timestamps that
> appear very skewed and b) for changelog entries, extract a "logged at"
> instead of "created at" timestamp when landing into the state store
> changelog. The second problem could also be addressed by temporarily
> changing the topic configuration during a reprocess to prevent "old" log
> segments from being deleted. Neither of these seems ideal.
>
> I wanted to know if there are any recommendations on how to deal with this
> -- it seems like there is a conflict between having segment file policies
> be based on message timestamps and also having message timestamps be based
> on application creation time, since origin create time can often be subject
> to noise/skew/errors. One potential path forward would be
> topic-specific settings for log rolling (including the ability to use
> the legacy behavior for retention that relies upon filesystem timestamps)
> but I am sure there are problems with this proposal.
>
> In general, I don't really feel like I have a good sense of what a correct
> solution would be, other than messages always having two timestamps and
> being able to have control over which timestamp is authoritative for log
> segment management policies, but that obviously seems like something that
> was considered and rejected for KIP-32 already.
>
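
A rough sketch of the "clever timestamp extractor" idea (a) above, i.e.
normalizing create-time stamps that are implausibly far in the future. The
skew threshold, the decision to clamp to wall-clock time, and the two-argument
extract signature of later Streams releases are all assumptions:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class SkewClampingTimestampExtractor implements TimestampExtractor {

    // Illustrative threshold: anything more than an hour ahead of the
    // processor's clock is treated as a corrupt sender-side timestamp.
    private static final long MAX_FUTURE_SKEW_MS = 60 * 60 * 1000L;

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final long now = System.currentTimeMillis();
        final long ts = record.timestamp();

        if (ts < 0) {
            // No embedded timestamp at all: fall back to the previous valid one.
            return previousTimestamp >= 0 ? previousTimestamp : now;
        }
        if (ts > now + MAX_FUTURE_SKEW_MS) {
            // Clamp timestamps that are implausibly far in the future.
            return now;
        }
        return ts;
    }
}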