@Matthias J. Sax / All, I have added the line below:

    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))

Here is the output. For uuid *2cbef750-325b-4a2f-ac39-b2c23fa0313f* I was
expecting a single output row, but that is not the case here. Which one of
those two rows for the same uuid is the final output?

    [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-19 11:48:08.128, uuid=fb6bea5f-8fd0-4c03-8df3-aaf392f04a5a)
    [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-19 11:48:10.328, uuid=b4ab837f-b10a-452d-a663-719215d2992f)
    [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-19 11:48:12.527, uuid=8fa1b621-c967-4770-9f85-9fd84999c97c)
    [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:14.726, uuid=1fc21253-7859-45ef-969e-82ed596c4fa0)
    [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:16.925, uuid=*2cbef750-325b-4a2f-ac39-b2c23fa0313f*)
    [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:16.925, uuid=*2cbef750-325b-4a2f-ac39-b2c23fa0313f*)
    [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:19.124, uuid=8f72454c-ec02-42c1-84e1-bcf262db436f)
    [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:19.124, uuid=8f72454c-ec02-42c1-84e1-bcf262db436f)
    [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:21.322, uuid=e4e73232-dbd4-45b4-aae9-f3bd2c6e54b4)
    [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:21.322, uuid=e4e73232-dbd4-45b4-aae9-f3bd2c6e54b4)
    [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:23.522, uuid=fc49906e-54d0-4e80-b394-3ebaf6e74eaa)
    [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:23.522, uuid=fc49906e-54d0-4e80-b394-3ebaf6e74eaa)
    [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:25.721, uuid=fbe62fa4-e7c4-437f-b976-0bb7ae0c4390)

On Wed, May 18, 2022 at 10:21 PM Matthias J. Sax <mj...@apache.org> wrote:

> Emitting intermediate results is by design.
>
> If you don't want to get intermediate results, you can add `suppress()`
> after the aggregation and configure it to only "emit on window close".
>
> -Matthias
>
> On 5/17/22 3:20 AM, Shankar Mane wrote:
> > Hi All,
> >
> > Our use case is to use a sliding window. For example, whenever a user
> > performs an action at time [ t1 ], we would like to see their activity
> > in [ t1 - 24 hours, t1 ] and use it to show the user some
> > recommendations.
> >
> > -- I have the code ready and it runs without any errors.
> > -- The aggregations happen as expected.
> > -- But the output is unexpected. As the window slides, I am getting
> >    mixed output: intermediate aggregated records come along with the
> >    final aggregated outputs.
> >
> > Could someone please help me here? What can I do to get ONLY the final
> > aggregated output?
> >
> > Code snippet:
> > ________________________________________________________________
> >
> >     builder.stream(tempReadingTopic, Consumed.with(stringSerde, inputSerde))
> >         .filter((k, v) -> v != null)
> >         .map((k,v) -> KeyValue.pair(v.getUserId(), v))
> >         //.through("slidingbykey", Produced.with(Serdes.String(), inputSerde))
> >         .groupByKey()
> >         .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), windowDuration))
> >         .aggregate(OutputPojo::new, (k, tr, out) -> {
> >             out.setUserId(tr.getUserId());
> >             out.setCount(out.getCount() +1);
> >             out.setSum(out.getSum() + tr.getInt4());
> >             out.setUuid(tr.getUuid());
> >             out.setStrTime(TimestampLongToString.getTime(tr.getTimestamp()));
> >             waitForMs(200); //added delay just for analysing output
> >             return out;
> >         }, Materialized.with(stringSerde, outputSerde))
> >         .suppress(Suppressed.untilTimeLimit(windowDuration, Suppressed.BufferConfig.unbounded()))
> >         .toStream()
> >         .map((Windowed<String> key, OutputPojo out) -> {
> >             return new KeyValue<>(key.key(),out) ;
> >         })
> >         .print(Printed.toSysOut());
> >         // .to(aveTempOutputTopic, Produced.with(stringSerde, outputSerde))
> >         ;
> >
> > ________________________________________________________________
> >
> > Input data:
> >
> >     for i in {1..10}; do sleep 1s;python3 del.py 1001 10;sleep 1s; done
> >     {'userId': '1001', 'timestamp': 1652781716234, 'int4': 10, 'uuid': '64f019ee-9cf4-427d-b4c9-f2b5f88820e1'}
> >     {'userId': '1001', 'timestamp': 1652781718436, 'int4': 10, 'uuid': 'cf173b3e-c34f-470a-ba15-ef648d0be8b9'}
> >     {'userId': '1001', 'timestamp': 1652781720634, 'int4': 10, 'uuid': '48d2b4ea-052d-42fa-a998-0216d928c034'}
> >     {'userId': '1001', 'timestamp': 1652781722832, 'int4': 10, 'uuid': '55a6c26c-3d2c-46f1-ab3c-04927f660cbe'}
> >     {'userId': '1001', 'timestamp': 1652781725029, 'int4': 10, 'uuid': 'dbfd8cee-565d-496b-b5a8-773ae64bc518'}
> >     {'userId': '1001', 'timestamp': 1652781727227, 'int4': 10, 'uuid': '135dc5cd-50cb-467b-9e63-300fdeedaf75'}
> >     {'userId': '1001', 'timestamp': 1652781729425, 'int4': 10, 'uuid': '66d8e3c7-8f63-43ca-acf1-e39619bf33a0'}
> >     {'userId': '1001', 'timestamp': 1652781731623, 'int4': 10, 'uuid': 'f037712b-42a5-4449-bcc2-cf6eafddf5ad'}
> >     {'userId': '1001', 'timestamp': 1652781733820, 'int4': 10, 'uuid': '7baa4254-b9da-43dc-bbb7-4caede578aeb'}
> >     {'userId': '1001', 'timestamp': 1652781736018, 'int4': 10, 'uuid': '16541989-f3ba-49f6-bd31-bf8a75ba8eac'}
> >
> > ________________________________________________________________
> >
> > Output (*Unexpected*): the output below is captured at each sliding
> > window of 1s duration (but the input data is published at 2s intervals):
> >
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:31:28.263, uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34) ----> seems older UUID
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-17 15:31:28.263, uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34)
> >
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-17 15:31:56.234, uuid=64f019ee-9cf4-427d-b4c9-f2b5f88820e1)
> >
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:31:58.436, uuid=cf173b3e-c34f-470a-ba15-ef648d0be8b9)
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)
> >
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)
> >
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:05.029, uuid=dbfd8cee-565d-496b-b5a8-773ae64bc518)
> >
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:05.029, uuid=dbfd8cee-565d-496b-b5a8-773ae64bc518)
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:07.227, uuid=135dc5cd-50cb-467b-9e63-300fdeedaf75)
> >
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:07.227, uuid=135dc5cd-50cb-467b-9e63-300fdeedaf75)
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:09.425, uuid=66d8e3c7-8f63-43ca-acf1-e39619bf33a0)
> >
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:09.425, uuid=66d8e3c7-8f63-43ca-acf1-e39619bf33a0)
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:11.623, uuid=f037712b-42a5-4449-bcc2-cf6eafddf5ad)
> >
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:11.623, uuid=f037712b-42a5-4449-bcc2-cf6eafddf5ad)
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:13.820, uuid=7baa4254-b9da-43dc-bbb7-4caede578aeb)
> >
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:13.820, uuid=7baa4254-b9da-43dc-bbb7-4caede578aeb)
> >     [KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:16.018, uuid=16541989-f3ba-49f6-bd31-bf8a75ba8eac)
> >
> > Regards,
> > Shankar
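The two rows per uuid are easier to interpret with the window bounds in view. The final `map()` in the snippet keeps only `key.key()`, so the printed rows do not say which window each one belongs to; printing `key.window().start()` and `key.window().end()` as well would show that. The `SlidingWindows` Javadoc describes the windows as one window ending at each record, `[t - timeDifference, t]`, plus one starting 1 ms after a record, `[t + 1, t + 1 + timeDifference]`, created once that record drops out and later records fall into it. Below is a simplified, stdlib-only Java model of that definition, not Kafka Streams code: the class and method names are hypothetical, and grace period, out-of-order data and suppression are ignored. Under those assumptions, one record can be the most recent member of several distinct windows, in which case `untilWindowCloses` would emit one final row per window, each carrying that record's uuid.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified model of the window definition in the SlidingWindows Javadoc.
// Not Kafka Streams code: ignores grace period, out-of-order records, suppression and aggregation.
public class SlidingWindowModel {

    // One window [start, end] (both inclusive, in ms) and the record timestamps it contains.
    static final class Window {
        final long start;
        final long end;
        final List<Long> members;

        Window(long start, long end, List<Long> members) {
            this.start = start;
            this.end = end;
            this.members = members;
        }

        // Most recent record in the window; with in-order input, this is the record
        // whose uuid the aggregator in the thread ends up storing for the window.
        long latest() {
            return members.stream().mapToLong(Long::longValue).max().getAsLong();
        }

        @Override
        public String toString() {
            return "[" + start + ";" + end + "] contains " + members;
        }
    }

    // For every record t: a window ending at t, [t - diff, t], and a window starting
    // 1 ms after t, [t + 1, t + 1 + diff], kept only if some later record falls into it.
    static List<Window> windows(List<Long> timestamps, long timeDifferenceMs) {
        // All windows have the same length, so the start alone identifies a window.
        TreeMap<Long, Window> byStart = new TreeMap<>();
        for (long t : timestamps) {
            addIfNonEmpty(byStart, t - timeDifferenceMs, timeDifferenceMs, timestamps);
            addIfNonEmpty(byStart, t + 1, timeDifferenceMs, timestamps);
        }
        return new ArrayList<>(byStart.values());
    }

    private static void addIfNonEmpty(TreeMap<Long, Window> byStart, long start,
                                      long timeDifferenceMs, List<Long> timestamps) {
        long end = start + timeDifferenceMs;
        List<Long> members = new ArrayList<>();
        for (long t : timestamps) {
            if (t >= start && t <= end) {
                members.add(t);
            }
        }
        if (!members.isEmpty()) {
            byStart.putIfAbsent(start, new Window(start, end, members));
        }
    }

    // Windows whose most recent record has the given timestamp.
    static List<Window> windowsWithLatest(List<Window> windows, long timestamp) {
        List<Window> result = new ArrayList<>();
        for (Window w : windows) {
            if (w.latest() == timestamp) {
                result.add(w);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Example data from the SlidingWindows Javadoc: time difference 5000 ms.
        List<Window> ws = windows(List.of(8000L, 9200L, 12400L), 5000L);
        ws.forEach(System.out::println);
        System.out.println("windows whose latest record is 12400: "
                + windowsWithLatest(ws, 12400L).size());
    }
}
```

On the Javadoc example (records at 8000, 9200 and 12400 ms, time difference 5000 ms) the model yields five windows, and the record at 12400 is the most recent member of three of them. Fed the ten input timestamps from the original post with the snippet's 5000 ms time difference, it gives the record at 1652781720634 (uuid 48d2b4ea-052d-42fa-a998-0216d928c034) two such windows, holding three and two records, which lines up with the count=3 and count=2 rows printed for that uuid. Whether the count=5/count=4 pairs in the later run follow the same pattern depends on the time difference used there, which the thread does not show.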