Re: Stream to KTable internals

2021-11-03 Thread Chad Preisler
I'm not sure. When I ran with trace logging turned on I saw a bunch of
messages like the ones below. Do those messages indicate
"enforced-processing"? It gets logged right after the call
to enforcedProcessingSensor.record.

Continuing to process although some partitions are empty on the broker.
There may be out-of-order processing for this task as a result. Partitions
with local data: [status-5]. Partitions we gave up waiting for, with their
corresponding deadlines: {event-5=1635881287722}. Configured
max.task.idle.ms: 2000. Current wall-clock time: 1635881287750.

Continuing to process although some partitions are empty on the broker.
There may be out-of-order processing for this task as a result. Partitions
with local data: [event-5]. Partitions we gave up waiting for, with their
corresponding deadlines: {status-5=1635881272754}. Configured
max.task.idle.ms: 2000. Current wall-clock time: 1635881277998.
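
For reference, a minimal sketch (not part of the original message) of how the task-level enforced-processing metrics can be read from a running KafkaStreams instance; the class name is a placeholder, and it assumes the metric names start with "enforced-processing" as listed in the Streams task monitoring docs:

```java
import java.util.Map;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class EnforcedProcessingCheck {

    // Prints every enforced-processing metric together with its tags (e.g. task id).
    public static void printEnforcedProcessing(final KafkaStreams streams) {
        for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            final MetricName name = entry.getKey();
            if (name.name().startsWith("enforced-processing")) {
                System.out.printf("%s %s = %s%n",
                    name.name(), name.tags(), entry.getValue().metricValue());
            }
        }
    }
}
```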

On Wed, Nov 3, 2021 at 6:11 PM Matthias J. Sax  wrote:

> Can you check if the program ever does "enforced processing", ie,
> `max.task.idle.ms` passed, and we process despite an empty input buffer.
>
> Cf https://kafka.apache.org/documentation/#kafka_streams_task_monitoring
>
> As long as there is input data, we should never do "enforced processing"
> and the metric should stay at zero.
>
>
> -Matthias
>
> On 11/3/21 2:41 PM, Chad Preisler wrote:
> > Just a quick update. Setting max.task.idle.ms to 10000 (10 seconds) had
> > no effect on this issue.
> >
> > On Tue, Nov 2, 2021 at 6:55 PM Chad Preisler 
> > wrote:
> >
> >> No, unfortunately it is not the case. The table record is written about
> >> 20 seconds before the stream record. I’ll crank up the time tomorrow and
> >> see what happens.
> >>
> >> On Tue, Nov 2, 2021 at 6:24 PM Matthias J. Sax 
> wrote:
> >>
> >>> Hard to tell, but as it seems that you can reproduce the issue, it
> >>> might be worth a try to increase the idle time further.
> >>>
> >>> I guess one corner case for stream-table join that is not resolved yet
> >>> is when stream and table record have the same timestamp... For this
> >>> case, the table record might not be processed first.
> >>>
> >>> Could you hit this case?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 11/2/21 3:13 PM, Chad Preisler wrote:
>  Thank you for the information. We are using the Kafka 3.0 client
>  library. We are able to reliably reproduce this issue in our test
>  environment now. I removed my timestamp extractor, and I set the
>  max.task.idle.ms to 2000. I also turned on trace logging for package
>  org.apache.kafka.streams.processor.internals.
> 
>  To create the issue we stopped the application and ran enough data to
>  create a lag of 400 messages. We saw 5 missed joins.
> 
>  From the stream-thread log messages we saw the event message, our
>  stream missed the join, and then several milliseconds later we saw the
>  stream-thread print out the status message. The stream-thread printed
>  out our status message a total of 5 times.
> 
>  Given that only a few milliseconds passed between missing the join and
>  the stream-thread printing the status message, would increasing the
>  max.task.idle.ms help?
> 
>  Thanks,
>  Chad
> 
>  On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax 
> >>> wrote:
> 
> > Timestamp synchronization is not perfect, and as a matter of fact, we
> > fixed a few gaps in the 3.0.0 release. We actually hope that we closed
> > the last gaps in 3.0.0... *fingers-crossed* :)
> >
> >> We are using a timestamp extractor that returns 0.
> >
> > You can do this, and it effectively "disables" timestamp
> > synchronization as records on the KTable side don't have a timeline any
> > longer. As a side effect it also allows you to "bootstrap" the table,
> > as records with timestamp zero will always be processed first (as they
> > are smaller). Of course, you also don't have time synchronization for
> > "future" data and your program becomes non-deterministic if you
> > reprocess old data.
> >
> >> This seemed to be the only
> >> way to bootstrap enough records at startup to avoid the missed join.
> >
> > Using 3.0.0 and enabling timestamp synchronization via the
> > `max.task.idle.ms` config should allow you to get the correct
> > behavior without the zero-extractor (of course, your KTable data must
> > have smaller timestamps than your KStream data).
> >
> >> If I use "timestamp synchronization" do I have to remove the zero
> >> timestamp extractor? If I remove the zero timestamp extractor will
> >> timestamp synchronization take care of the missed join issue on
> >> startup?
> >
> > To be more precise: timestamp synchronization is _always_ on. The
> > question is just how strict it is applied. By default, we do the
> > weakest form, which is only best effort.

Re: Stream to KTable internals

2021-11-03 Thread Matthias J. Sax
Can you check if the program ever does "enforced processing", ie, 
`max.task.idle.ms` passed, and we process despite an empty input buffer.


Cf https://kafka.apache.org/documentation/#kafka_streams_task_monitoring

As long as there is input data, we should never do "enforced processing" 
and the metric should stay at zero.



-Matthias

On 11/3/21 2:41 PM, Chad Preisler wrote:

Just a quick update. Setting max.task.idle.ms to 10000 (10 seconds) had no
effect on this issue.

On Tue, Nov 2, 2021 at 6:55 PM Chad Preisler 
wrote:


No, unfortunately it is not the case. The table record is written about 20
seconds before the stream record. I’ll crank up the time tomorrow and see
what happens.

On Tue, Nov 2, 2021 at 6:24 PM Matthias J. Sax  wrote:


Hard to tell, but as it seems that you can reproduce the issue, it might
be worth a try to increase the idle time further.

I guess one corner case for stream-table join that is not resolved yet
is when stream and table record have the same timestamp... For this
case, the table record might not be processed first.

Could you hit this case?


-Matthias

On 11/2/21 3:13 PM, Chad Preisler wrote:

Thank you for the information. We are using the Kafka 3.0 client
library. We are able to reliably reproduce this issue in our test
environment now. I removed my timestamp extractor, and I set the
max.task.idle.ms to 2000. I also turned on trace logging for package
org.apache.kafka.streams.processor.internals.

To create the issue we stopped the application and ran enough data to
create a lag of 400 messages. We saw 5 missed joins.

From the stream-thread log messages we saw the event message, our
stream missed the join, and then several milliseconds later we saw the
stream-thread print out the status message. The stream-thread printed
out our status message a total of 5 times.

Given that only a few milliseconds passed between missing the join and
the stream-thread printing the status message, would increasing the
max.task.idle.ms help?

Thanks,
Chad

On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax 

wrote:



Timestamp synchronization is not perfect, and as a matter of fact, we
fixed a few gaps in the 3.0.0 release. We actually hope that we closed the
last gaps in 3.0.0... *fingers-crossed* :)


We are using a timestamp extractor that returns 0.


You can do this, and it effectively "disables" timestamp
synchronization as records on the KTable side don't have a timeline any
longer. As a side effect it also allows you to "bootstrap" the table, as
records with timestamp zero will always be processed first (as they are
smaller). Of course, you also don't have time synchronization for
"future" data and your program becomes non-deterministic if you
reprocess old data.
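
For illustration only (this is not Chad's actual code), a zero-returning extractor of the kind discussed here typically looks like the following; it would be registered for the table's source topic, e.g. via `Consumed.with(...).withTimestampExtractor(...)`:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Forces every record on the table side to timestamp 0 so it always sorts
// "before" the stream-side records during timestamp synchronization.
public class ZeroTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
        return 0L;
    }
}
```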


This seemed to be the only
way to bootstrap enough records at startup to avoid the missed join.


Using 3.0.0 and enabling timestamp synchronization via the
`max.task.idle.ms` config should allow you to get the correct
behavior without the zero-extractor (of course, your KTable data must
have smaller timestamps than your KStream data).


If I use "timestamp synchronization" do I have to remove the zero
timestamp extractor? If I remove the zero timestamp extractor will
timestamp synchronization take care of the missed join issue on
startup?


To be more precise: timestamp synchronization is _always_ on. The
question is just how strict it is applied. By default, we do the
weakest form, which is only best effort.


I'm guessing the issue here is that occasionally the poll request is
not returning the matching record for the KTable side of the join
before the task goes off and starts processing records.


Yes, because of the default best-effort approach. That is why you should
increase `max.task.idle.ms` to detect this case and "skip" processing
and let KS do another poll() to get KTable data.

2.8 and earlier:

max.task.idle.ms=0 -> best effort (no poll() retry)
max.task.idle.ms>0 -> try to do another poll() until data is there or
idle time passed

Note: >0 might still "fail" even if there is data, because consumer
fetch behavior is not predictable.


3.0:

max.task.idle.ms=-1 -> best effort (no poll() retry)
max.task.idle.ms=0 -> if there is data broker side, repeat to poll()
until you get the data
max.task.idle.ms>0 -> even if there is no data broker side, wait
until data becomes available or the idle time passed
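
As a minimal illustration of wiring this up (not from the original message; the application id and bootstrap servers below are placeholders):

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class JoinAppConfig {

    public static Properties buildConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-join-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
        // 3.0 semantics: 0 = keep polling as long as the broker still has data;
        // a positive value additionally waits up to that many ms even if the broker side is empty.
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 2000L);
        return props;
    }
}
```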


Hope this helps.


-Matthias

On 11/1/21 4:29 PM, Guozhang Wang wrote:

Hello Chad,

From your earlier comment, you mentioned "In my scenario the records
were written to the KTable topic before the record was written to the
KStream topic." So I think Matthias and others have excluded this
possibility while trying to help investigate.

If only the matching records from KStream are returned via a single
consumer poll call but not the other records from KTable, then you
would miss this matched join result.

Guozhang


On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler <

chad.preis.

Re: Stream to KTable internals

2021-11-03 Thread Chad Preisler
Just a quick update. Setting max.task.idle.ms to 10000 (10 seconds) had no
effect on this issue.

On Tue, Nov 2, 2021 at 6:55 PM Chad Preisler 
wrote:

> No, unfortunately it is not the case. The table record is written about 20
> seconds before the stream record. I’ll crank up the time tomorrow and see
> what happens.
>
> On Tue, Nov 2, 2021 at 6:24 PM Matthias J. Sax  wrote:
>
>> Hard to tell, but as it seems that you can reproduce the issue, it might
>> be worth a try to increase the idle time further.
>>
>> I guess one corner case for stream-table join that is not resolved yet
>> is when stream and table record have the same timestamp... For this
>> case, the table record might not be processed first.
>>
>> Could you hit this case?
>>
>>
>> -Matthias
>>
>> On 11/2/21 3:13 PM, Chad Preisler wrote:
>> > Thank you for the information. We are using the Kafka 3.0 client
>> > library. We are able to reliably reproduce this issue in our test
>> > environment now. I removed my timestamp extractor, and I set the
>> > max.task.idle.ms to 2000. I also turned on trace logging for package
>> > org.apache.kafka.streams.processor.internals.
>> >
>> > To create the issue we stopped the application and ran enough data to
>> > create a lag of 400 messages. We saw 5 missed joins.
>> >
>> > From the stream-thread log messages we saw the event message, our
>> > stream missed the join, and then several milliseconds later we saw the
>> > stream-thread print out the status message. The stream-thread printed
>> > out our status message a total of 5 times.
>> >
>> > Given that only a few milliseconds passed between missing the join and
>> > the stream-thread printing the status message, would increasing the
>> > max.task.idle.ms help?
>> >
>> > Thanks,
>> > Chad
>> >
>> > On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax 
>> wrote:
>> >
>> >> Timestamp synchronization is not perfect, and as a matter of fact, we
>> >> fixed a few gaps in the 3.0.0 release. We actually hope that we closed the
>> >> last gaps in 3.0.0... *fingers-crossed* :)
>> >>
>> >>> We are using a timestamp extractor that returns 0.
>> >>
>> >> You can do this, and it effectively "disables" timestamp
>> >> synchronization as records on the KTable side don't have a timeline any
>> >> longer. As a side effect it also allows you to "bootstrap" the table, as
>> >> records with timestamp zero will always be processed first (as they are
>> >> smaller). Of course, you also don't have time synchronization for
>> >> "future" data and your program becomes non-deterministic if you
>> >> reprocess old data.
>> >>
>> >>> This seemed to be the only
>> >>> way to bootstrap enough records at startup to avoid the missed join.
>> >>
>> >> Using 3.0.0 and enabling timestamp synchronization via the
>> >> `max.task.idle.ms` config should allow you to get the correct
>> >> behavior without the zero-extractor (of course, your KTable data must
>> >> have smaller timestamps than your KStream data).
>> >>
>> >>> If I use "timestamp synchronization" do I have to remove the zero
>> >>> timestamp extractor? If I remove the zero timestamp extractor will
>> >>> timestamp synchronization take care of the missed join issue on
>> >>> startup?
>> >>
>> >> To be more precise: timestamp synchronization is _always_ on. The
>> >> question is just how strict it is applied. By default, we do the
>> >> weakest form, which is only best effort.
>> >>
>> >>> I'm guessing the issue here is that occasionally the poll request is
>> >>> not returning the matching record for the KTable side of the join
>> >>> before the task goes off and starts processing records.
>> >>
>> >> Yes, because of the default best-effort approach. That is why you should
>> >> increase `max.task.idle.ms` to detect this case and "skip" processing
>> >> and let KS do another poll() to get KTable data.
>> >>
>> >> 2.8 and earlier:
>> >>
>> >> max.task.idle.ms=0 -> best effort (no poll() retry)
>> >> max.task.idle.ms>0 -> try to do another poll() until data is there or
>> >> idle time passed
>> >>
>> >> Note: >0 might still "fail" even if there is data, because consumer
>> >> fetch behavior is not predictable.
>> >>
>> >>
>> >> 3.0:
>> >>
>> >> max.task.idle.ms=-1 -> best effort (no poll() retry)
>> >> max.task.idle.ms=0 -> if there is data broker side, repeat to poll()
>> >> until you get the data
>> >> max.task.idle.ms>0 -> even if there is no data broker side, wait
>> >> until data becomes available or the idle time passed
>> >>
>> >>
>> >> Hope this helps.
>> >>
>> >>
>> >> -Matthias
>> >>
>> >> On 11/1/21 4:29 PM, Guozhang Wang wrote:
>> >>> Hello Chad,
>> >>>
>> >>> From your earlier comment, you mentioned "In my scenario the records
>> >>> were written to the KTable topic before the record was written to the
>> >>> KStream topic." So I think Matthias and others have excluded this
>> >>> possibility while trying to help investigate.
>> >>>
>> >>> If only the matching records from KStream are retur

Re: Kafka streams event deduplication keeping last event in window

2021-11-03 Thread Matthias J. Sax
You could do something similar to what the WindowStore does and store a
key-timestamp pair as the actual key. Given the current wall-clock time, you
can compute which windows are already closed and do corresponding lookups
(either per key, or using range scans).


-Matthias
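
As a rough sketch of the approach described above (not code from this thread), a punctuator over a plain key-value store might look like the following; the class name, the String-typed store, and the "businessKey@windowStartMs" key encoding are illustrative assumptions:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Keys are stored as "<businessKey>@<windowStartMs>"; values hold the latest event per key.
public class ClosedWindowPunctuator implements Punctuator {

    private final KeyValueStore<String, String> store;
    private final ProcessorContext context;
    private final long windowSizeMs;

    public ClosedWindowPunctuator(final KeyValueStore<String, String> store,
                                  final ProcessorContext context,
                                  final long windowSizeMs) {
        this.store = store;
        this.context = context;
        this.windowSizeMs = windowSizeMs;
    }

    @Override
    public void punctuate(final long now) {
        try (final KeyValueIterator<String, String> iter = store.all()) {
            while (iter.hasNext()) {
                final KeyValue<String, String> entry = iter.next();
                final int sep = entry.key.lastIndexOf('@');
                final long windowStart = Long.parseLong(entry.key.substring(sep + 1));
                if (windowStart + windowSizeMs <= now) {              // window is closed
                    context.forward(entry.key.substring(0, sep), entry.value);
                    store.delete(entry.key);                          // explicit cleanup
                }
            }
        }
    }
}
```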

On 11/3/21 12:40 AM, Luigi Cerone wrote:

Hello Matthias, thanks for your reply.


Using a plain kv-store, whenever the punctuation runs you can find closed
windows, forward the result and also delete the row explicitly, which gives
you more control.


What is the best way to find closed windows? Have you got any examples?

Thanks! :)

On 2021/11/02 23:34:33 "Matthias J. Sax" wrote:

I did not study your code snippet, but yes, it sounds like a valid
approach from your description.


How can I be sure that the start of the window will
coincide with the Punctuator's scheduled interval?


For punctuations, there is always some jitter, because it's not possible
to run a punctuation at the very exact point in time when it is
scheduled to run. Thus, a punctuation might fire a little delayed
anyway. You can also not control the "anchor point" directly, because it
depends on the point in time when you register the punctuation.

Also note that a WindowedStore is basically still a key-value store, ie
a single key-value pair models one window. The main difference is the
timestamp that is used to expire rows eventually, which just implies that
expired rows are dropped (without any notification).

Thus, the only thing you can do is to run the punctuation frequently
enough to keep latency low enough to detect windows that are logically
closed and forward the corresponding result.

But you cannot really "bind" the punctuation with the state store
expiration, and window-store also does not support deletes... Thus, I am
wondering if using a plain key-value store might be more useful for your
case? Using a plain kv-store, whenever the punctuation runs you can find
closed windows, forward the result and also delete the row explicitly,
which gives you more control.

Hope this helps.

-Matthias

On 11/2/21 10:29 AM, Luigi Cerone wrote:

I'm using Kafka Streams in a deduplication events problem over short
time windows (<= 1 minute).
First I've tried to tackle the problem by using DSL API with the
[`.suppress(Suppressed.untilWindowCloses(...))`](https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html)
operator but, given the fact that wall-clock time is not yet supported
(I've seen the [KIP 424](https://cwiki.apache.org/confluence/display/KAFKA/KIP-424%3A+Allow+suppression+of+intermediate+events+based+on+wall+clock+time)),
this operator is not viable for my use case.

Then, I've followed this [official Confluent example](https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html)
in which low level Processor API is used and it was working fine but has
one major limitation for my use-case. The single event (obtained by
deduplication) is emitted at the **beginning** of the time window;
subsequent duplicated events are "suppressed". In my use case I need the
reverse of that, meaning that a single event should be emitted at the end
of the window.
I'm asking for suggestions on how to implement this use case with the
Processor API.

My idea was to use the Processor API with a custom [Transformer](https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html)
and a [Punctuator](https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/streams/processor/Punctuator.html).
The transformer would store in a [WindowStore](https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/state/WindowStore.html)
the distinct keys received without returning any KeyValue. Simultaneously,
I'd schedule a punctuator running with an interval equal to the size of the
window in the WindowStore. This punctuator will iterate over the elements
in the store and forward them downstream.
The following are some core parts of the logic:

DeduplicationTransformer (slightly modified from the [official Confluent
example](https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html)):
```java
  @Override
  @SuppressWarnings("unchecked")
  public void init(final ProcessorContext context) {
      this.context = context;
      eventIdStore = (WindowStore) context.getStateStore(this.storeName);

      // Schedule punctuator for this transformer.
      context.schedule(Duration.ofMillis(this.windowSizeMs),
          PunctuationType.WALL_CLOCK_TIME,
          new DeduplicationPunctuator(eventIdStore, context, this.windowSizeMs));
  }

  @Override
  public KeyValue transform(final K key, final V value) {
      final E eventId = idExtractor.apply(key, value);
      if (eventId == null) {
          return KeyValue.pair(key, value);
      } else {
          if (!isDuplicate(eventId)) {
              rememberNewEvent(eventId, value, context.time

Re: Globalktable usage in a multi microservices application

2021-11-03 Thread Vincent Maurin

Hello

The idea behind a GlobalKTable is to materialize data (a Kafka topic)
close to where it is used. Actually, each task/worker will materialize
the full GlobalKTable in order to use it. So in your scenario, what
should be shared between your services is ideally the Kafka topic used
to build the GlobalKTable.


Then, if you want your app2 to be independent from Kafka, and only
dependent on an RPC endpoint, you must develop your own endpoint
"serving" the GlobalKTable.


Best,

Vincent
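
As a rough sketch of what app2 could do if it materializes the shared topic itself (not code from this thread); the topic name "config-topic", the store name "config-store", and the String types are placeholders, and default String serdes are assumed in the Streams config:

```java
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class ConfigLookup {

    // Materializes the shared configuration topic as a locally queryable GlobalKTable store.
    public static GlobalKTable<String, String> buildConfigTable(final StreamsBuilder builder) {
        return builder.globalTable(
            "config-topic", // placeholder for the shared topic
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("config-store"));
    }

    // Looks up a configuration entry by id from the locally materialized store.
    public static String lookupById(final KafkaStreams streams, final String id) {
        final ReadOnlyKeyValueStore<String, String> store = streams.store(
            StoreQueryParameters.fromNameAndType("config-store",
                QueryableStoreTypes.<String, String>keyValueStore()));
        return store.get(id);
    }
}
```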

On 02/11/2021 18:54, Sigalit Eliazov wrote:

we have 2 microservices:

app1: responsible for consuming messages and updating a GlobalKTable with
configuration information (key=id, value=myObject)

app2: once it is activated with some id as an input it should look into the
configuration GlobalKTable and retrieve data by that id

how is it possible to access that GlobalKTable from app2? Does an RPC
layer need to be added to retrieve the info, or does Kafka Streams have some
other capability?

Thanks



RE: Re: Kafka streams event deduplication keeping last event in window

2021-11-03 Thread Luigi Cerone
Hello Matthias, thanks for your reply.

> Using a plain kv-store, whenever the punctuation runs you can find closed
> windows, forward the result and also delete the row explicitly, which gives
> you more control.


What is the best way to find closed windows? Have you got any examples?

Thanks! :)

On 2021/11/02 23:34:33 "Matthias J. Sax" wrote:
> I did not study your code snippet, but yes, it sounds like a valid
> approach from your description.
>
> > How can I be sure that the start of the window will
> > coincide with the Punctuator's scheduled interval?
>
> For punctuations, there is always some jitter, because it's not possible
> to run a punctuation at the very exact point in time when it is
> scheduled to run. Thus, a punctuation might fire a little delayed
> anyway. You can also not control the "anchor point" directly, because it
> depends on the point in time when you register the punctuation.
>
> Also note that a WindowedStore is basically still a key-value store, ie
> a single key-value pair models one window. The main difference is the
> timestamp that is used to expire rows eventually, which just implies that
> expired rows are dropped (without any notification).
>
> Thus, the only thing you can do is to run the punctuation frequently
> enough to keep latency low enough to detect windows that are logically
> closed and forward the corresponding result.
>
> But you cannot really "bind" the punctuation with the state store
> expiration, and window-store also does not support deletes... Thus, I am
> wondering if using a plain key-value store might be more useful for your
> case? Using a plain kv-store, whenever the punctuation runs you can find
> closed windows, forward the result and also delete the row explicitly,
> which gives you more control.
>
> Hope this helps.
>
> -Matthias
>
> On 11/2/21 10:29 AM, Luigi Cerone wrote:
> > I'm using Kafka Streams in a deduplication events problem over short time
> > windows (<= 1 minute).
> > First I've tried to tackle the problem by using DSL API with
> > [`.suppress(Suppressed.untilWindowCloses(...))`](https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html)
> > operator but, given the fact that wall-clock time is not yet supported
> > (I've seen the [KIP 424](https://cwiki.apache.org/confluence/display/KAFKA/KIP-424%3A+Allow+suppression+of+intermediate+events+based+on+wall+clock+time)),
> > this operator is not viable for my use case.
> >
> > Then, I've followed this [official Confluent example](https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html)
> > in which low level Processor API is used and it was working fine but has
> > one major limitation for my use-case. The single event (obtained by
> > deduplication) is emitted at the **beginning** of the time window,
> > subsequent duplicated events are "suppressed". In my use case I need the
> > reverse of that, meaning that a single event should be emitted at the end
> > of the window.
> >
> > I'm asking for suggestions on how to implement this use case with
> > Processor API.
> >
> > My idea was to use the Processor API with a custom [Transformer](https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html)
> > and a [Punctuator](https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/streams/processor/Punctuator.html).
> > The transformer would store in a [WindowStore](https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/state/WindowStore.html)
> > the distinct keys received without returning any KeyValue. Simultaneously,
> > I'd schedule a punctuator running with an interval equal to the size of the
> > window in the WindowStore. This punctuator will iterate over the elements
> > in the store and forward them downstream.
> > The following are some core parts of the logic:
> >
> > DeduplicationTransformer (slightly modified from [official Confluent
> > example](https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html)):
> > ```java
> >  @Override
> >  @SuppressWarnings("unchecked")
> >  public void init(final ProcessorContext context) {
> >      this.context = context;
> >      eventIdStore = (WindowStore) context.getStateStore(this.storeName);
> >
> >      // Schedule punctuator for this transformer.
> >      context.schedule(Duration.ofMillis(this.windowSizeMs),
> >          PunctuationType.WALL_CLOCK_TIME,
> >          new DeduplicationPunctuator(eventIdStore, context, this.windowSizeMs));
> >  }
> >
> >  @Override
> >  public KeyValue transform(final K key, final V value) {
> >      final E eventId = idExtractor.apply(key, value);
> >      if (eventId == null) {
> >          return KeyValue.pair(key, value);
> >      } else {
> >          if (!isDuplicate(eventId)) {
> >              rememberNewEvent(eventId, value, context.timestamp());
> >          }
> >          return null;
> 


Re: [kafka-clients] [VOTE] 2.7.2 RC0

2021-11-03 Thread Manikumar
Hi,

+1 (binding)

- verified the signatures
- verified the quickstart with binary

Thanks for running the release!

Thanks,
Manikumar

On Tue, Nov 2, 2021 at 11:16 PM Mickael Maison  wrote:

> Bumping the thread.
>
> Contributors, committers and PMC, please take some time to test this
> release candidate and vote.
>
> Thanks,
> Mickael
>
> On Tue, Oct 26, 2021 at 7:38 PM Israel Ekpo  wrote:
> >
> > Thanks Bill. That is greatly appreciated :)
> >
> > We need more PMC members with binding votes to participate.
> >
> > You can do it!
> >
> > On Tue, Oct 26, 2021 at 1:25 PM Bill Bejeck  wrote:
> >>
> >> Hi Mickael,
> >>
> >> Thanks for running the release.
> >>
> >> Steps taken
> >>
> >> Validated checksums
> >> Validated signatures
> >> Built from source
> >> Ran all the unit tests
> >> Spot checked various JavaDocs
> >>
> >>
> >> +1(binding)
> >>
> >> On Tue, Oct 26, 2021 at 4:43 AM Luke Chen  wrote:
> >>>
> >>> Hi Mickael,
> >>>
> >>> Thanks for the release. I did:
> >>> 1. Verified checksums and signatures
> >>> 2. Run quick start steps
> >>> 3. Verified the CVE-2021-38153 is indeed fixed in kafka-2.7.2-src.tgz.
> >>>
> >>> +1 (non-binding)
> >>>
> >>> Thank you.
> >>> Luke
> >>>
> >>> On Tue, Oct 26, 2021 at 3:41 PM Tom Bentley 
> wrote:
> >>>
> >>> > Hi Mickael,
> >>> >
> >>> > As with 2.6.3 RC0, I have:
> >>> >
> >>> > * Verified checksums and signatures
> >>> > * Built jars and docs from the source jar
> >>> > * Run the unit and integration tests
> >>> >
> >>> > +1 non-binding
> >>> >
> >>> > Kind regards,
> >>> >
> >>> > Tom
> >>> >
> >>> > On Sun, Oct 24, 2021 at 3:05 PM Israel Ekpo 
> wrote:
> >>> >
> >>> > > Mickael,
> >>> > >
> >>> > > Do we need to do another RC? Were there issues with this release?
> >>> > >
> >>> > > What happens next?
> >>> > >
> >>> > >
> >>> > > On Sat, Oct 16, 2021 at 8:11 PM Israel Ekpo 
> >>> > wrote:
> >>> > >
> >>> > > >
> >>> > > > I have performed the following checks
> >>> > > >
> >>> > > > Validation of Release Artifacts Cryptographic Hashes (ASC MD5
> >>> > > > SHA1 SHA512)
> >>> > > > PGP Signatures used to sign the release artifacts
> >>> > > > Javadocs check
> >>> > > > Site docs check was not necessary
> >>> > > > Jenkins build was successful.
> >>> > > >
> >>> > > > I used the steps here for the first two checks
> >>> > > > https://github.com/izzyacademy/apache-kafka-release-party
> >>> > > >
> >>> > > > I vote +1 on this RC
> >>> > > >
> >>> > > >
> >>> > > > On Fri, Oct 15, 2021 at 12:11 PM Israel Ekpo <
> israele...@gmail.com>
> >>> > > wrote:
> >>> > > >
> >>> > > >> Hi Mickael,
> >>> > > >>
> >>> > > >> I am pretty surprised that there are no votes so far on the RCs
> >>> > > >> and the deadline has already passed.
> >>> > > >>
> >>> > > >> I am running my checks right now using the process outlined here
> >>> > > >>
> >>> > > >>
> >>> > > >>
> >>> > >
> >>> >
> https://github.com/izzyacademy/apache-kafka-release-party#how-to-validate-apache-kafka-release-candidates
> >>> > > >>
> >>> > > >> I will post my results and vote as soon as they are completed.
> >>> > > >>
> >>> > > >> On Fri, Oct 15, 2021 at 9:52 AM Mickael Maison <
> mimai...@apache.org>
> >>> > > >> wrote:
> >>> > > >>
> >>> > > >>> Successful Jenkins build:
> >>> > > >>> https://ci-builds.apache.org/job/Kafka/job/kafka-2.7-jdk8/181/
> >>> > > >>>
> >>> > > >>> On Wed, Oct 13, 2021 at 6:47 PM Mickael Maison <
> mimai...@apache.org>
> >>> > > >>> wrote:
> >>> > > >>> >
> >>> > > >>> > Hi Israel,
> >>> > > >>> >
> >>> > > >>> > Our tooling generates the same template for all types of
> >>> > > >>> > releases.
> >>> > > >>> >
> >>> > > >>> > For bugfix releases, the site docs and javadocs don't
> >>> > > >>> > typically require extensive validation.
> >>> > > >>> > It's still a good idea to open them up and check a few pages
> >>> > > >>> > to validate they look right.
> >>> > > >>> >
> >>> > > >>> > For this release, as you've mentioned, site docs have not
> >>> > > >>> > changed.
> >>> > > >>> >
> >>> > > >>> > Thanks
> >>> > > >>> >
> >>> > > >>> > On Wed, Oct 13, 2021 at 1:59 AM Israel Ekpo <
> israele...@gmail.com>
> >>> > > >>> wrote:
> >>> > > >>> > >
> >>> > > >>> > > Mickael,
> >>> > > >>> > >
> >>> > > >>> > > For patch or bug fix releases like this one, should we
> >>> > > >>> > > exclude the Javadocs and site docs if they have not changed?
> >>> > > >>> > >
> >>> > > >>> > > https://github.com/apache/kafka-site
> >>> > > >>> > >
> >>> > > >>> > > The site docs were last changed about 6 months ago and it
> >>> > > >>> > > appears it may not have changed or needs validation
> >>> > > >>> > >
> >>> > > >>> > >
> >>> > > >>> > >
> >>> > > >>> > > On Tue, Oct 12, 2021 at 2:17 PM Mickael Maison <
> >>> > > mimai...@apache.org>
> >>> > > >>> wrote:
> >>> > > >>> > >>
> >>> > > >>> > >> Hello Kafka users, developers and client-developers,
> >>> > > >>> > >>
> >>> > > >>> > >> This is the first candidate for rel