Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Sure John, I will document it.

Thanks a lot for your reply.
--
Jonathan

On Tue, Mar 5, 2019 at 7:38 PM John Roesler wrote:

> Hi Jonathan,
>
> Just a quick update: I have not been able to reproduce the duplicates issue
> with the 2.2 RC, even with a topology very similar to the one you included
> in your stackoverflow post.
>
> I think we should treat this as a new bug. Would you mind opening a new
> Jira bug ticket with some steps to reproduce the problem, and also exactly
> the behavior you observe?
>
> Thanks,
> -John
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hi Jonathan,

Just a quick update: I have not been able to reproduce the duplicates issue
with the 2.2 RC, even with a topology very similar to the one you included
in your stackoverflow post.

I think we should treat this as a new bug. Would you mind opening a new
Jira bug ticket with some steps to reproduce the problem, and also exactly
the behavior you observe?

Thanks,
-John

On Mon, Mar 4, 2019 at 10:41 PM John Roesler wrote:

> Hi Jonathan,
>
> Sorry to hear that the feature is causing you trouble as well, and that
> the 2.2 release candidate didn't seem to fix it.
>
> I'll try and do a repro based on the code in your SO post tomorrow.
>
> I don't think it's related to the duplicates, but that shutdown error is
> puzzling. Can you print the topology (with topology.describe() ) ? This
> will tell us what is in task 1 (i.e., *1_*) of your program.
>
> Thanks,
> -John
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hi Jonathan,

Sorry to hear that the feature is causing you trouble as well, and that
the 2.2 release candidate didn't seem to fix it.

I'll try and do a repro based on the code in your SO post tomorrow.

I don't think it's related to the duplicates, but that shutdown error is
puzzling. Can you print the topology (with topology.describe() ) ? This
will tell us what is in task 1 (i.e., *1_*) of your program.

Thanks,
-John

On Fri, Mar 1, 2019 at 11:33 AM Jonathan Santilli <jonathansanti...@gmail.com> wrote:

> BTW, after stopping the app gracefully (Stream#close()), this error shows
> up repeatedly:
>
> 2019-03-01 17:18:07,819 WARN [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] internals.ProcessorStateManager (ProcessorStateManager.java:327) - task [0_0] Failed to write offset checkpoint file to [/tmp/kafka-stream/XXX/0_0/.checkpoint]
> java.io.FileNotFoundException: /tmp/kafka-stream/XXX/0_0/.checkpoint.tmp (No such file or directory)
>     at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_191]
>     at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_191]
>     at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[?:1.8.0_191]
>     at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[?:1.8.0_191]
>     at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79) ~[kafka-streams-2.2.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325) [kafka-streams-2.2.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:599) [kafka-streams-2.2.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:721) [kafka-streams-2.2.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:337) [kafka-streams-2.2.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:267) [kafka-streams-2.2.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1209) [kafka-streams-2.2.0.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:786) [kafka-streams-2.2.0.jar:?]
>
> However, I have checked and the folder created starts with: *1_*
>
> ls -lha /tmp/kafka-stream/XXX/1_1
> total 8
> drwxr-xr-x   5 a  b   160B  1 Mar 17:18 .
> drwxr-xr-x  34 a  b   1.1K  1 Mar 17:15 ..
> -rw-r--r--   1 a  b   2.9K  1 Mar 17:18 .checkpoint
> -rw-r--r--   1 a  b     0B  1 Mar 16:05 .lock
> drwxr-xr-x   3 a  b    96B  1 Mar 16:43 KSTREAM-REDUCE-STATE-STORE-05
>
> Cheers!
> --
> Jonathan
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
BTW, after stopping the app gracefully (Stream#close()), this error shows
up repeatedly:

2019-03-01 17:18:07,819 WARN [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] internals.ProcessorStateManager (ProcessorStateManager.java:327) - task [0_0] Failed to write offset checkpoint file to [/tmp/kafka-stream/XXX/0_0/.checkpoint]
java.io.FileNotFoundException: /tmp/kafka-stream/XXX/0_0/.checkpoint.tmp (No such file or directory)
    at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_191]
    at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_191]
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[?:1.8.0_191]
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[?:1.8.0_191]
    at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79) ~[kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:599) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:721) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:337) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:267) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1209) [kafka-streams-2.2.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:786) [kafka-streams-2.2.0.jar:?]

However, I have checked and the folder created starts with: *1_*

ls -lha /tmp/kafka-stream/XXX/1_1
total 8
drwxr-xr-x   5 a  b   160B  1 Mar 17:18 .
drwxr-xr-x  34 a  b   1.1K  1 Mar 17:15 ..
-rw-r--r--   1 a  b   2.9K  1 Mar 17:18 .checkpoint
-rw-r--r--   1 a  b     0B  1 Mar 16:05 .lock
drwxr-xr-x   3 a  b    96B  1 Mar 16:43 KSTREAM-REDUCE-STATE-STORE-05

Cheers!
--
Jonathan

On Fri, Mar 1, 2019 at 5:11 PM Jonathan Santilli wrote:

> Hello John, hope you are well.
> I have tested the version 2.2 release candidate (although I know it has
> been postponed).
> I have been following this email thread because I think I am experiencing
> the same issue. I have reported it in an email to this list, and all the
> details are also on SO (
> https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
> ).
>
> After the test, the result is the same as before (at least in my case):
> already processed records are passed again to the output topic, causing
> the data duplication:
>
> ...
> 2019-03-01 16:55:23,808 INFO [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] internals.StoreChangelogReader (StoreChangelogReader.java:221) - stream-thread [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] No checkpoint found for task 1_10 state store KTABLE-SUPPRESS-STATE-STORE-11 changelog XXX-KTABLE-SUPPRESS-STATE-STORE-11-changelog-10 with EOS turned on. *Reinitializing the task and restore its state from the beginning.*
> ...
>
> I was hoping for this to be fixed, but that is not the case, at least for
> me.
>
> If you can, please take a look at the question on SO. I was in contact
> with Matthias about it; he also pointed me to the place where the
> potential bug could be.
>
> Please, let me know any thoughts.
>
> Cheers!
> --
> Jonathan
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hello John, hope you are well.
I have tested the version 2.2 release candidate (although I know it has
been postponed).
I have been following this email thread because I think I am experiencing
the same issue. I have reported it in an email to this list, and all the
details are also on SO (
https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
).

After the test, the result is the same as before (at least in my case):
already processed records are passed again to the output topic, causing
the data duplication:

...
2019-03-01 16:55:23,808 INFO [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] internals.StoreChangelogReader (StoreChangelogReader.java:221) - stream-thread [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] No checkpoint found for task 1_10 state store KTABLE-SUPPRESS-STATE-STORE-11 changelog XXX-KTABLE-SUPPRESS-STATE-STORE-11-changelog-10 with EOS turned on. *Reinitializing the task and restore its state from the beginning.*
...

I was hoping for this to be fixed, but that is not the case, at least for
me.

If you can, please take a look at the question on SO. I was in contact
with Matthias about it; he also pointed me to the place where the
potential bug could be.

Please, let me know any thoughts.

Cheers!
--
Jonathan

On Tue, Feb 26, 2019 at 5:23 PM John Roesler wrote:

> Hi again, Peter,
>
> Just to close the loop about the bug in Suppress, we did get the (apparent)
> same report from a few other people:
> https://issues.apache.org/jira/browse/KAFKA-7895
>
> I also managed to reproduce the duplicate-result behavior, which could
> cause it to emit both intermediate results and duplicate final results.
>
> There's a patch for it in the 2.2 release candidate. Perhaps you can try it
> out and see if it resolves the issue for you?
>
> I'm backporting the fix to 2.1 as well, but I unfortunately missed the last
> 2.1 bugfix release.
>
> Thanks,
> -John
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hi again, Peter,

Just to close the loop about the bug in Suppress, we did get the (apparent)
same report from a few other people:
https://issues.apache.org/jira/browse/KAFKA-7895

I also managed to reproduce the duplicate-result behavior, which could
cause it to emit both intermediate results and duplicate final results.

There's a patch for it in the 2.2 release candidate. Perhaps you can try it
out and see if it resolves the issue for you?

I'm backporting the fix to 2.1 as well, but I unfortunately missed the last
2.1 bugfix release.

Thanks,
-John

On Fri, Jan 25, 2019 at 10:23 AM John Roesler wrote:

> Hi Peter,
>
> Thanks for the replies.
>
> Regarding transactions:
> Yes, actually, with EOS enabled, the changelog and the output topics are
> all produced with the same transactional producer, within the same
> transactions. So it should already be atomic.
>
> Regarding restore:
> Streams doesn't put the store into service until the restore is completed,
> so it should be guaranteed not to happen. But there's of course no
> guarantee that I didn't mess something up. I'll take a hard look at it.
>
> Regarding restoration and offsets:
> Your guess is correct: Streams tracks the latest stored offset outside of
> the store implementation itself, specifically by writing a file (called a
> Checkpoint File) in the state directory. If the file is there, it reads
> that offset and restores from that point. If the file is missing, it
> restores from the beginning of the stream. So it should "just work" for
> you. Just for completeness, there have been several edge cases discovered
> where this mechanism isn't completely safe, so in the case of EOS, I
> believe we actually disregard that checkpoint file and the prior state and
> always rebuild from the earliest offset in the changelog.
>
> Personally, I would like to see us provide the ability to store the
> checkpoint inside the state store, so that checkpoint updates are
> linearized correctly w.r.t. data updates, but I actually haven't mentioned
> this thought to anyone until now ;)
>
> Finally, regarding your prior email:
> Yes, I was thinking that the "wrong" output values might be part of
> rolled-back transactions and therefore enabling read-committed mode on the
> consumer might tell a different story than what you've seen to date.
>
> I'm honestly still baffled about those intermediate results that are
> sneaking out. I wonder if it's something specific to your data stream, like
> maybe an edge case when two records have exactly the same timestamp? I'll
> have to stare at the code some more...
>
> Regardless, in order to reap the benefits of running the app with EOS, you
> really have to also set your consumers to read_committed. Otherwise, you'll
> be seeing output data from aborted (aka rolled-back) transactions, and you
> miss the intended "exactly once" guarantee.
>
> Thanks,
> -John
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hi Peter,

Thanks for the replies.

Regarding transactions:
Yes, actually, with EOS enabled, the changelog and the output topics are
all produced with the same transactional producer, within the same
transactions. So it should already be atomic.

Regarding restore:
Streams doesn't put the store into service until the restore is completed,
so it should be guaranteed not to happen. But there's of course no
guarantee that I didn't mess something up. I'll take a hard look at it.

Regarding restoration and offsets:
Your guess is correct: Streams tracks the latest stored offset outside of
the store implementation itself, specifically by writing a file (called a
Checkpoint File) in the state directory. If the file is there, it reads
that offset and restores from that point. If the file is missing, it
restores from the beginning of the stream. So it should "just work" for
you. Just for completeness, there have been several edge cases discovered
where this mechanism isn't completely safe, so in the case of EOS, I
believe we actually disregard that checkpoint file and the prior state and
always rebuild from the earliest offset in the changelog.

Personally, I would like to see us provide the ability to store the
checkpoint inside the state store, so that checkpoint updates are
linearized correctly w.r.t. data updates, but I actually haven't mentioned
this thought to anyone until now ;)

Finally, regarding your prior email:
Yes, I was thinking that the "wrong" output values might be part of
rolled-back transactions and therefore enabling read-committed mode on the
consumer might tell a different story than what you've seen to date.

I'm honestly still baffled about those intermediate results that are
sneaking out. I wonder if it's something specific to your data stream, like
maybe an edge case when two records have exactly the same timestamp? I'll
have to stare at the code some more...
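The restore rule described above (use the checkpointed offset if the file is
present, start from the beginning if it is missing, and ignore the checkpoint
under EOS) can be sketched roughly as follows. This is only an illustration of
the decision as stated in this message, not the Kafka Streams code: the class
RestoreStartSketch, the method restoreStartOffset and its parameters are
hypothetical names, and whether restoration resumes exactly at the checkpointed
offset or just after it is an assumption.

```java
import java.util.OptionalLong;

public class RestoreStartSketch {

    /**
     * Hypothetical helper: picks the changelog offset at which restoration begins.
     * checkpointedOffset is empty when no checkpoint file was found.
     */
    static long restoreStartOffset(OptionalLong checkpointedOffset,
                                   long earliestChangelogOffset,
                                   boolean eosEnabled) {
        // With EOS, the message above says the checkpoint and prior state are
        // (believed to be) disregarded, so the rebuild starts at the earliest offset.
        if (eosEnabled) {
            return earliestChangelogOffset;
        }
        // Without EOS: resume from the checkpoint if present, else from the beginning.
        return checkpointedOffset.orElse(earliestChangelogOffset);
    }

    public static void main(String[] args) {
        System.out.println(restoreStartOffset(OptionalLong.of(42L), 0L, false)); // prints 42
        System.out.println(restoreStartOffset(OptionalLong.empty(), 0L, false)); // prints 0
        System.out.println(restoreStartOffset(OptionalLong.of(42L), 0L, true));  // prints 0
    }
}
```

The third call corresponds to the EOS case: even with a checkpoint on disk, the
sketch starts from the earliest changelog offset.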
Regardless, in order to reap the benefits of running the app with EOS, you really have to also set your consumers to read_committed. Otherwise, you'll be seeing output data from aborted (aka rolled-back) transactions, and you miss the intended "exactly once" guarantee.

Thanks,
-John

On Fri, Jan 25, 2019 at 1:51 AM Peter Levart wrote:
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hi John,

Haven't been able to reinstate the demo yet, but I have been re-reading the following scenario of yours

On 1/24/19 11:48 PM, Peter Levart wrote:
> Hi John,
>
> On 1/24/19 3:18 PM, John Roesler wrote:
>> The reason is that, upon restart, the suppression buffer can only "remember" what got sent & committed to its changelog topic before.
>>
>> The scenario I have in mind is:
>>
>> ...
>> * buffer state X
>> ...
>> * flush state X to buffer changelog
>> ...
>> * commit transaction T0; start new transaction T1
>> ...
>> * emit final result X (in uncommitted transaction T1)
>> ...
>> * crash before flushing to the changelog the fact that state X was emitted. Also, transaction T1 gets aborted, since we crash before committing.
>> ...
>> * restart, restoring state X again from the changelog (because the emit didn't get committed)
>> * start transaction T2
>> * emit final result X again (in uncommitted transaction T2)
>> ...
>> * commit transaction T2
>> ...
>>
>> So, the result gets emitted twice, but the first time is in an aborted transaction. This leads me to another clarifying question:
>>
>> Based on your first message, it seems like the duplicates you observe are in the output topic. When you read the topic, do you configure your consumer with "read committed" mode? If not, you'll see "results" from uncommitted transactions, which could explain the duplicates.

...and I was thinking that perhaps the right solution to the suppression problem would be to use transactional producers for the resulting output topic AND the store change-log. Is this possible? Does the compaction of the log on the brokers work for transactional producers as expected? In that case, the sending of the final result and the marking of that fact in the store change log would together be an atomic operation.

That said, I think there's another problem with suppression, which looks like the suppression processor is already processing the input while the state store has not been fully restored yet, or something related... Is this guaranteed not to happen?
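
To make the quoted scenario concrete, here is a minimal simulation in plain Java with no Kafka dependency. It only models the visibility rule: a consumer that reads uncommitted data sees the result emitted in the aborted transaction T1 as well as the one in T2, while a read-committed consumer sees it once. The `Rec` class, the `read` helper and the transaction names are hypothetical stand-ins for illustration, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of an output topic: every record remembers which
// transaction wrote it. None of these types are Kafka APIs.
final class AbortedTxnDemo {

    static final class Rec {
        final String txn;
        final String value;

        Rec(String txn, String value) {
            this.txn = txn;
            this.value = value;
        }
    }

    // Values a consumer would observe. With readCommitted == false,
    // records written by aborted transactions are visible as well.
    static List<String> read(List<Rec> log, Set<String> committed, boolean readCommitted) {
        List<String> seen = new ArrayList<>();
        for (Rec r : log) {
            if (!readCommitted || committed.contains(r.txn)) {
                seen.add(r.value);
            }
        }
        return seen;
    }

    // The scenario: X is emitted in T1 (aborted by the crash) and emitted
    // again in T2 after the buffer state was restored from the changelog.
    static List<Rec> scenarioLog() {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec("T1", "final result X"));
        log.add(new Rec("T2", "final result X"));
        return log;
    }

    // T1 never committed, so it is absent from this set.
    static Set<String> committedTxns() {
        return new HashSet<>(Arrays.asList("T0", "T2"));
    }

    public static void main(String[] args) {
        System.out.println("read_uncommitted: " + read(scenarioLog(), committedTxns(), false));
        System.out.println("read_committed:   " + read(scenarioLog(), committedTxns(), true));
    }
}
```

Note that this only covers exact duplicates of a final result; it says nothing about whether non-final results can be emitted.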

And now something unrelated I wanted to ask...

I'm trying to create my own custom state store. From the API I can see it is pretty straightforward. One thing that I don't quite understand is how Kafka Streams knows whether to replay the whole change log after the store registers itself or just a part of it, and which part (from which offset per partition). There doesn't seem to be any API point through which the store could communicate this information back to Kafka Streams. Is such bookkeeping performed outside the store? Does Kafka Streams first invoke flush() on the store and then note down the offsets from the change log producer somewhere? So next time the store is brought up, the log is only replayed from the last noted-down offset? So it can happen that the store gets some log entries that have already been incorporated in it (from the point of one flush before) but never misses any...

In any case there has to be an indication somewhere that the store didn't survive and has to be rebuilt from scratch. How does Kafka Streams detect that situation? By placing some marker file into the directory reserved for the store's local storage?

Regards, Peter
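
As a rough illustration of the bookkeeping asked about here, and described in the reply in this thread as a checkpoint file kept in the state directory, the following sketch stores an offset outside the store and falls back to a full replay when the file is missing. The file name, the file format and all method names are made up for this example; they are not what Kafka Streams actually uses.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of offset bookkeeping kept outside the store.
// File name and format are illustrative only.
final class CheckpointSketch {

    static final String FILE_NAME = "checkpoint.sketch"; // made-up name

    // Record the changelog offset up to which the local store is complete.
    static void write(Path stateDir, long offset) {
        try {
            Files.write(stateDir.resolve(FILE_NAME),
                    Long.toString(offset).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Where to start replaying the changelog: the saved offset if the file
    // exists, otherwise 0, which means "rebuild from the beginning".
    static long restoreStartOffset(Path stateDir) {
        Path file = stateDir.resolve(FILE_NAME);
        if (!Files.exists(file)) {
            return 0L;
        }
        try {
            String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            return Long.parseLong(text.trim());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Helper for trying the sketch out in a scratch directory.
    static Path tempDir() {
        try {
            return Files.createTempDirectory("state-dir-sketch");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path dir = tempDir();
        System.out.println("before checkpoint: " + restoreStartOffset(dir));
        write(dir, 42L);
        System.out.println("after checkpoint:  " + restoreStartOffset(dir));
    }
}
```

Replaying from a saved offset may re-apply a few entries the store already contains, so a scheme like this assumes that applying a changelog entry twice is harmless.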
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hi John,

On 1/24/19 3:18 PM, John Roesler wrote:
> Hi Peter,
>
> Thanks for the clarification.
>
> When you hit the "stop" button, AFAIK it does send a SIGTERM, but I don't think that Streams automatically registers a shutdown hook. In our examples and demos, we register a shutdown hook "outside" of streams (right next to the code that calls start() ).
> Unless I missed something, a SIGTERM would still cause Streams to exit abruptly, skipping flush and commit. This can cause apparent duplicates *if you're not using EOS or if you're reading uncommitted transactions*.

The fact is that Spring, which I use to instantiate the KafkaStreams object, does that:

    @Bean(initMethod = "start", destroyMethod = "close")
    public KafkaStreams processorStreams(...

..so when the JVM gets SIGTERM, the shutdown hook that Spring installs shuts down the ApplicationContext, which calls all "destroyMethod"(s) on registered Bean(s)...

And the duplicates are less apparent but still occur even in EOS mode... But they are not actual duplicates. They are duplicate(s) only by windowed keys; the values are different...

> The reason is that, upon restart, the suppression buffer can only "remember" what got sent & committed to its changelog topic before.
>
> The scenario I have in mind is:
>
> ...
> * buffer state X
> ...
> * flush state X to buffer changelog
> ...
> * commit transaction T0; start new transaction T1
> ...
> * emit final result X (in uncommitted transaction T1)
> ...
> * crash before flushing to the changelog the fact that state X was emitted. Also, transaction T1 gets aborted, since we crash before committing.
> ...
> * restart, restoring state X again from the changelog (because the emit didn't get committed)
> * start transaction T2
> * emit final result X again (in uncommitted transaction T2)
> ...
> * commit transaction T2
> ...
>
> So, the result gets emitted twice, but the first time is in an aborted transaction. This leads me to another clarifying question:
>
> Based on your first message, it seems like the duplicates you observe are in the output topic. When you read the topic, do you configure your consumer with "read committed" mode? If not, you'll see "results" from uncommitted transactions, which could explain the duplicates.

So when EOS is enabled, the output topics are used in a transactional manner. The consumer of such a topic should enable read_committed semantics then...

That would do if my problem was about seeing duplicates of final windowing results. That is not my problem. My problem is that upon restart of the processor, I see some non-final window aggregations, followed by final aggregations for the same windowed key. That's harder to tolerate in an application. If it was just duplicates of the "correct" aggregation, I could ignore the 2nd and subsequent messages for the same windowed key, but if I first get a non-final aggregation, I cannot simply ignore the 2nd occurrence of the same windowed key. I must cope with "replacing the previous aggregation with a new version of it" in the app. Meaning that suppression of non-final results does not buy me anything, as it is not guaranteeing that.

Is it possible that non-final windowed aggregations are emitted in some scenario, but then such a transaction is rolled back, and I would not see the non-final aggregations if I enabled read committed isolation on the consumer?

I think I'll have to reinstate the demo and try that... Stay tuned.

Regards, Peter

> Likewise, if you were to attach a callback, like "foreach" downstream of the suppression, you would see duplicates in the case of a crash. Callbacks are a general "hole" in EOS, which I have some ideas to close, but that's a separate topic.
>
> There may still be something else going on, but I'm trying to start with the simpler explanations.
>
> Thanks again,
> -John
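
For reference, the two settings discussed in this message can be sketched as plain property sets. This is a minimal sketch, not a complete client configuration: string keys are used instead of the `ConsumerConfig` / `StreamsConfig` constants so that it compiles without Kafka on the classpath, deserializers and other required settings are omitted, and `exactly_once` is assumed to be the value name in the 2.x releases this thread concerns. The class and method names are hypothetical.

```java
import java.util.Properties;

// Property sets only; nothing here talks to a broker.
final class EosConfigSketch {

    // Consumer of the output topic: hide records from aborted or
    // still-open transactions.
    static Properties outputConsumerProps(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("group.id", groupId);
        // The consumer default is read_uncommitted.
        p.setProperty("isolation.level", "read_committed");
        return p;
    }

    // Streams application: enable exactly-once processing (EOS).
    static Properties streamsProps(String bootstrapServers, String applicationId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("application.id", applicationId);
        p.setProperty("processing.guarantee", "exactly_once");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(outputConsumerProps("localhost:9092", "demo-reader"));
        System.out.println(streamsProps("localhost:9092", "demo-app"));
    }
}
```

Enabling EOS on the Streams side alone does not change what a downstream consumer sees; the isolation level has to be set on every consumer that reads the output topic.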
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hi Peter,

Thanks for the clarification.

When you hit the "stop" button, AFAIK it does send a SIGTERM, but I don't think that Streams automatically registers a shutdown hook. In our examples and demos, we register a shutdown hook "outside" of streams (right next to the code that calls start() ).
Unless I missed something, a SIGTERM would still cause Streams to exit abruptly, skipping flush and commit. This can cause apparent duplicates *if you're not using EOS or if you're reading uncommitted transactions*.

The reason is that, upon restart, the suppression buffer can only "remember" what got sent & committed to its changelog topic before.

The scenario I have in mind is:

...
* buffer state X
...
* flush state X to buffer changelog
...
* commit transaction T0; start new transaction T1
...
* emit final result X (in uncommitted transaction T1)
...
* crash before flushing to the changelog the fact that state X was emitted. Also, transaction T1 gets aborted, since we crash before committing.
...
* restart, restoring state X again from the changelog (because the emit didn't get committed)
* start transaction T2
* emit final result X again (in uncommitted transaction T2)
...
* commit transaction T2
...

So, the result gets emitted twice, but the first time is in an aborted transaction. This leads me to another clarifying question:

Based on your first message, it seems like the duplicates you observe are in the output topic. When you read the topic, do you configure your consumer with "read committed" mode? If not, you'll see "results" from uncommitted transactions, which could explain the duplicates.

Likewise, if you were to attach a callback, like "foreach" downstream of the suppression, you would see duplicates in the case of a crash. Callbacks are a general "hole" in EOS, which I have some ideas to close, but that's a separate topic.

There may still be something else going on, but I'm trying to start with the simpler explanations.

Thanks again,
-John

Thanks,
-John

On Wed, Jan 23, 2019 at 5:11 AM Peter Levart wrote:
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hi John,

Sorry I haven't had time to prepare the minimal reproducer yet. I still have plans to do it though...

On 1/22/19 8:02 PM, John Roesler wrote:
> Hi Peter,
>
> Just to follow up on the actual bug, can you confirm whether:
> * when you say "restart", do you mean orderly shutdown and restart, or crash and restart?

I start it as a SpringBoot application from IDEA and then stop it with the red square button. It does initiate the shutdown sequence before exiting... So I think it is by SIGTERM, which initiates JVM shutdown hook(s).

> * have you tried this with EOS enabled? I can imagine some ways that there could be duplicates, but they should be impossible with EOS enabled.

Yes, I have EOS enabled.

> Thanks for your help,
> -John

Regards, Peter
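
The orderly-shutdown path mentioned in this message relies on a JVM shutdown hook that closes the streams instance when SIGTERM arrives. A minimal sketch of registering such a hook by hand is shown below; a generic `AutoCloseable` stands in for the streams object so the example has no Kafka dependency, and the class and method names are hypothetical.

```java
// Registers a JVM shutdown hook that closes a resource on SIGTERM or
// normal exit. Any AutoCloseable stands in for the streams object here.
final class ShutdownHookSketch {

    // Returns the hook thread so callers can deregister it if needed.
    static Thread registerCloseOnShutdown(AutoCloseable resource) {
        Thread hook = new Thread(() -> {
            try {
                resource.close(); // flush and commit happen inside close()
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "close-on-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        registerCloseOnShutdown(() -> System.out.println("closed cleanly"));
        System.out.println("running; the hook fires when the JVM exits");
    }
}
```

A hook like this does not run if the process is killed with SIGKILL, which is the "crash and restart" case the question distinguishes from an orderly shutdown.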
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hi Peter,

Just to follow up on the actual bug, can you confirm whether:
* when you say "restart", do you mean orderly shutdown and restart, or crash and restart?
* have you tried this with EOS enabled? I can imagine some ways that there could be duplicates, but they should be impossible with EOS enabled.

Thanks for your help,
-John

On Mon, Jan 14, 2019 at 1:20 PM John Roesler wrote:
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hi Peter,

I see your train of thought, but the actual implementation of the window store is structured differently from your mental model. Unlike Key/Value stores, we know that the records in a window store will "expire" on a regular schedule, and also that every single record will eventually expire. With this in mind, we have implemented an optimization to avoid a lot of compaction overhead in RocksDB, as well as saving on range scans.

Instead of storing everything in one database, we open several databases and bucket windows into them. Then, when windows expire, we just ignore the records (i.e., the API makes them unreachable, but we don't actually delete them). Once all the windows in a database are expired, we just close and delete the whole database. Then, we open a new one for new windows. If you look in the code, these databases are called "segments".

Thus, I don't think that you should attempt to use the built-in window stores as you described. Instead, it should be straightforward to implement your own StateStore with a layout that's more favorable to your desired behavior.

You should also be able to set up the change log the way you need. Explicitly removed entities would also get removed from the log, if it's a compacted log.

Actually, what you're describing is *very* similar to the implementation for suppress. I might actually suggest that you just copy the suppression implementation and adapt it to your needs, or at the very least, study how it works. In doing so, you might actually discover the cause of the bug yourself!

I hope this helps, and thanks for your help,
-John

On Sat, Jan 12, 2019 at 5:45 AM Peter Levart wrote:
> Hi John,
>
> Thank you very much for explaining how WindowStore works. I have some more questions...
>
> On 1/10/19 5:33 PM, John Roesler wrote:
> > Hi Peter,
> >
> > Regarding retention, I was not referring to log retention, but to the window store retention.
> > Since a new window is created every second (for example), there are in principle an unbounded number of windows (the longer the application runs, the more windows there are, with no end).
> > However, we obviously can't store an infinite amount of data, so the window definition includes a retention period. By default, this is 24 hours. After the retention period elapses, all of the data for the window is purged to make room for new windows.
>
> Right. Would the following work for example:
>
> - configure retention of WindowStore to be "infinite"
> - explicitly remove records from the store when windows are flushed out
> - configure WindowStore log topic for compacting
>
> Something like the following:
>
>     Stores
>         .windowStoreBuilder(
>             Stores.persistentWindowStore(
>                 storeName,
>                 Duration.of(1000L, ChronoUnit.YEARS), // retentionPeriod
>                 Duration.ofSeconds(10), // windowSize
>                 false
>             ),
>             keySerde, valSerde
>         )
>         .withCachingEnabled()
>         .withLoggingEnabled(
>             Map.of(
>                 TopicConfig.CLEANUP_POLICY_CONFIG,
>                 TopicConfig.CLEANUP_POLICY_COMPACT
>             )
>         );
>
> Would in above scenario:
>
> - the on-disk WindowStore be kept bounded (there could be some very old entries in it but majority will be new - depending on the activity of particular input keys)
> - the log topic be kept bounded (explicitly removed entries would be removed from compacted log too)
>
> I'm moving away from DSL partly because I have some problems with suppression (which I hope we'll be able to fix) and partly because the DSL can't give me the complicated semantics that I need for the application at hand. I tried to capture what I need in a custom Transformer here:
>
> https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f
>
> Your knowledge of how WindowStore works would greatly help me decide if this is a workable idea.
>
> > So what I meant was that if you buffer some key "A" in window (Monday 09:00:00) and then get no further activity for A for over 24 hours, then when you do get that next event for A, say at (Tuesday 11:00:00), you'd do the scan but find nothing, since your buffered state would already have been purged from the store.
>
> Right. That would be the case when WindowStore was configured with default retention of 24 hours. A quick question: What does window size configuration for WindowStore (see above) do? Does it have to be synchronized with the size of windows stored in it?
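
The segment layout described at the top of this message can be sketched with in-memory maps standing in for the per-segment databases. This is a simplified model under stated assumptions: non-negative window start times, one fixed segment interval, and hypothetical class, field and method names. It is not the actual Kafka Streams implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// In-memory model of a segmented window store. Each inner map stands in
// for one on-disk database ("segment").
final class SegmentedStoreSketch {

    private final long segmentIntervalMs;
    private final long retentionMs;
    // segment id -> entries of that segment, ordered by time
    private final TreeMap<Long, Map<String, String>> segments = new TreeMap<>();

    SegmentedStoreSketch(long segmentIntervalMs, long retentionMs) {
        this.segmentIntervalMs = segmentIntervalMs;
        this.retentionMs = retentionMs;
    }

    // Bucket the window into the segment covering its start time.
    void put(String key, long windowStartMs, String value) {
        long segmentId = windowStartMs / segmentIntervalMs;
        segments.computeIfAbsent(segmentId, id -> new HashMap<>())
                .put(key + "@" + windowStartMs, value);
    }

    // Expired windows are made unreachable here, but not deleted.
    String get(String key, long windowStartMs, long nowMs) {
        if (windowStartMs < nowMs - retentionMs) {
            return null;
        }
        Map<String, String> segment = segments.get(windowStartMs / segmentIntervalMs);
        return segment == null ? null : segment.get(key + "@" + windowStartMs);
    }

    // Drop whole segments once every window they can hold has expired.
    int dropExpiredSegments(long nowMs) {
        long oldestLiveWindowStart = nowMs - retentionMs;
        int dropped = 0;
        while (!segments.isEmpty()) {
            long firstId = segments.firstKey();
            long lastStartInSegment = (firstId + 1) * segmentIntervalMs - 1;
            if (lastStartInSegment >= oldestLiveWindowStart) {
                break;
            }
            segments.pollFirstEntry();
            dropped++;
        }
        return dropped;
    }

    int segmentCount() {
        return segments.size();
    }

    public static void main(String[] args) {
        SegmentedStoreSketch store = new SegmentedStoreSketch(1000L, 2000L);
        store.put("A", 0L, "a0");
        store.put("A", 1500L, "a1");
        System.out.println("dropped: " + store.dropExpiredSegments(3500L));
        System.out.println("segments left: " + store.segmentCount());
    }
}
```

The point of the layout is that expiry never deletes individual records: a whole segment is discarded in one step, which is why an "infinite" retention with per-record removal does not fit this design.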
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hi Peter,

Regarding retention, I was not referring to log retention, but to the window store retention. Since a new window is created every second (for example), there are in principle an unbounded number of windows (the longer the application runs, the more windows there are, with no end). However, we obviously can't store an infinite amount of data, so the window definition includes a retention period. By default, this is 24 hours. After the retention period elapses, all of the data for the window is purged to make room for new windows.

So what I meant was that if you buffer some key "A" in window (Monday 09:00:00) and then get no further activity for A for over 24 hours, then when you do get that next event for A, say at (Tuesday 11:00:00), you'd do the scan but find nothing, since your buffered state would already have been purged from the store.

The way I avoided this problem for Suppression was to organize the data by timestamp instead of by key, so on *every* update I can search for all the keys that are old enough and emit them. I also don't use a window store, so I don't have to worry about the retention time.

To answer your question about the window store's topic, it configures a retention time the same length as the store's retention time (and the keys are the full windowed key, including the window start time), so it'll have roughly the same size bound as the store itself.

Back to the process of figuring out what might be wrong with Suppression, I don't suppose you would be able to file a Jira and upload a repro program? If not, that's ok. I haven't been able to reproduce the bug yet, but it seems like it's happening somewhat consistently for you, so I should be able to get it to happen eventually.

Thanks, and sorry again for the troubles.
-John

On Tue, Jan 8, 2019 at 6:48 AM Peter Levart wrote:

> On 1/8/19 12:57 PM, Peter Levart wrote:
> > Hi John,
> >
> > On 1/8/19 12:45 PM, Peter Levart wrote:
> >>> I looked at your custom transfomer, and it looks almost correct to
> >>> me. The only flaw seems to be that it only looks for closed windows
> >>> for the key currently being processed, which means that if you have
> >>> key "A" buffered, but don't get another event for it for a while
> >>> after the window closes, you won't emit the final result. This might
> >>> actually take longer than the window retention period, in which
> >>> case, the data would be deleted without ever emitting the final
> >>> result.
> >>
> >> So in DSL case, the suppression works by flushing *all* of the "ripe"
> >> windows in the whole buffer whenever a singe event comes in with
> >> recent enough timestamp regardless of the key of that event?
> >>
> >> Is the buffer shared among processing tasks or does each task
> >> maintain its own private buffer that only contains its share of data
> >> pertaining to assigned input partitions? In case the tasks are
> >> executed on several processing JVM(s) the buffer can't really be
> >> shared, right? In that case a single event can't flush all of the
> >> "ripe" windows, but just those that are contained in the task's part
> >> of buffer...
> >
> > Just a question about your comment above:
> >
> > /"This might actually take longer than the window retention period, in
> > which case, the data would be deleted without ever emitting the final
> > result"/
> >
> > Are you talking about the buffer log topic retention? Aren't log
> > topics configured to "compact" rather than "delete" messages? So the
> > last "version" of the buffer entry for a particular key should stay
> > forever? What are the keys in suppression buffer log topic? Are they a
> > pair of (timestamp, key) ? Probably not since in that case the
> > compacted log would grow indefinitely...
> >
> > Another question:
> >
> > What are the keys in WindowStore's log topic? If the input keys to the
> > processor that uses such WindowStore consist of a bounded set of
> > values (for example user ids), would compacted log of such WindowStore
> > also be bounded?
>
> In case the key of WindowStore log topic is (timestamp, key) then would
> explicitly deleting flushed entries from WindowStore (by putting null
> value into the store) keep the compacted log bounded? In other words,
> does WindowStore log topic support a special kind of "tombstone" message
> that effectively removes the key from the compacted log?
>
> In that case, my custom processor could keep entries in its WindowStore
> for as log as needed, depending on the activity of a particular input
> key...
>
> > Regards, Peter
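To make John's description of a timestamp-organized buffer concrete, here is a minimal sketch in plain Java. It is not the Kafka Streams implementation: the class `TimeOrderedBuffer`, its `update` method, and the idea of passing a precomputed `closeTime` (window end plus grace) are all assumptions made for illustration. It only shows why indexing by time lets *any* incoming event flush *every* ripe entry, whatever its key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch of a buffer indexed by window close time, not by key. */
final class TimeOrderedBuffer {
    // closeTime -> (key -> latest buffered value); the TreeMap keeps close times sorted.
    private final TreeMap<Long, Map<String, String>> byCloseTime = new TreeMap<>();
    // Highest event timestamp observed so far ("stream time" in this sketch).
    private long streamTime = Long.MIN_VALUE;

    /**
     * Buffers the latest value for (key, window) and returns every buffered
     * result whose close time has been reached, regardless of which key
     * triggered the call. This sketch assumes the caller has already dropped
     * events that arrive after their window closed.
     */
    List<String> update(String key, long closeTime, long eventTime, String value) {
        streamTime = Math.max(streamTime, eventTime);
        byCloseTime.computeIfAbsent(closeTime, t -> new TreeMap<>()).put(key, value);

        // View of all windows with closeTime <= streamTime.
        NavigableMap<Long, Map<String, String>> ripe = byCloseTime.headMap(streamTime, true);
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<Long, Map<String, String>> window : ripe.entrySet()) {
            for (Map.Entry<String, String> entry : window.getValue().entrySet()) {
                emitted.add(entry.getKey() + "@" + window.getKey() + "=" + entry.getValue());
            }
        }
        ripe.clear(); // clearing the view removes the emitted windows from the buffer
        return emitted;
    }
}
```

In this sketch an event for key "B" flushes a closed window for key "A", which is exactly the case a per-key scan misses when "A" goes quiet for longer than the store's retention.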
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
On 1/8/19 12:57 PM, Peter Levart wrote: Hi John, On 1/8/19 12:45 PM, Peter Levart wrote: I looked at your custom transfomer, and it looks almost correct to me. The only flaw seems to be that it only looks for closed windows for the key currently being processed, which means that if you have key "A" buffered, but don't get another event for it for a while after the window closes, you won't emit the final result. This might actually take longer than the window retention period, in which case, the data would be deleted without ever emitting the final result. So in DSL case, the suppression works by flushing *all* of the "ripe" windows in the whole buffer whenever a singe event comes in with recent enough timestamp regardless of the key of that event? Is the buffer shared among processing tasks or does each task maintain its own private buffer that only contains its share of data pertaining to assigned input partitions? In case the tasks are executed on several processing JVM(s) the buffer can't really be shared, right? In that case a single event can't flush all of the "ripe" windows, but just those that are contained in the task's part of buffer... Just a question about your comment above: /"This might actually take longer than the window retention period, in which case, the data would be deleted without ever emitting the final result"/ Are you talking about the buffer log topic retention? Aren't log topics configured to "compact" rather than "delete" messages? So the last "version" of the buffer entry for a particular key should stay forever? What are the keys in suppression buffer log topic? Are they a pair of (timestamp, key) ? Probably not since in that case the compacted log would grow indefinitely... Another question: What are the keys in WindowStore's log topic? If the input keys to the processor that uses such WindowStore consist of a bounded set of values (for example user ids), would compacted log of such WindowStore also be bounded? 
In case the key of the WindowStore log topic is (timestamp, key), would explicitly deleting flushed entries from the WindowStore (by putting a null value into the store) keep the compacted log bounded? In other words, does the WindowStore log topic support a special kind of "tombstone" message that effectively removes the key from the compacted log?

In that case, my custom processor could keep entries in its WindowStore for as long as needed, depending on the activity of a particular input key...

Regards, Peter
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hi John,

On 1/8/19 12:45 PM, Peter Levart wrote:

>> I looked at your custom transformer, and it looks almost correct to me. The only flaw seems to be that it only looks for closed windows for the key currently being processed, which means that if you have key "A" buffered, but don't get another event for it for a while after the window closes, you won't emit the final result. This might actually take longer than the window retention period, in which case, the data would be deleted without ever emitting the final result.
>
> So in DSL case, the suppression works by flushing *all* of the "ripe" windows in the whole buffer whenever a single event comes in with recent enough timestamp regardless of the key of that event?
>
> Is the buffer shared among processing tasks or does each task maintain its own private buffer that only contains its share of data pertaining to assigned input partitions? In case the tasks are executed on several processing JVM(s) the buffer can't really be shared, right? In that case a single event can't flush all of the "ripe" windows, but just those that are contained in the task's part of buffer...

Just a question about your comment above:

/"This might actually take longer than the window retention period, in which case, the data would be deleted without ever emitting the final result"/

Are you talking about the buffer log topic retention? Aren't log topics configured to "compact" rather than "delete" messages? So the last "version" of the buffer entry for a particular key should stay forever? What are the keys in the suppression buffer log topic? Are they a pair of (timestamp, key)? Probably not, since in that case the compacted log would grow indefinitely...

Another question:

What are the keys in WindowStore's log topic? If the input keys to the processor that uses such a WindowStore consist of a bounded set of values (for example user ids), would the compacted log of such a WindowStore also be bounded?

Regards, Peter
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hi John,

On 1/7/19 9:10 PM, John Roesler wrote:

> Hi Peter,
>
> Sorry, I just now have seen this thread.
>
> You asked if this behavior is unexpected, and the answer is yes. Suppress.untilWindowCloses is intended to emit only the final result, regardless of restarts.
>
> You also asked how the suppression buffer can resume after a restart, since it's not persistent. The answer is the same as for in-memory stores. The state of the store (or buffer, in this case) is persisted to a changelog topic, which is re-read on restart to re-create the exact state prior to shutdown. "Persistent" in the store nomenclature refers only to "persistent on the local disk".
>
> Just to confirm your response regarding the buffer size: While it is better to use the public ("Suppressed.unbounded()") API, yes, your buffer was already unbounded.
>
> I looked at your custom transformer, and it looks almost correct to me. The only flaw seems to be that it only looks for closed windows for the key currently being processed, which means that if you have key "A" buffered, but don't get another event for it for a while after the window closes, you won't emit the final result. This might actually take longer than the window retention period, in which case, the data would be deleted without ever emitting the final result.

So in DSL case, the suppression works by flushing *all* of the "ripe" windows in the whole buffer whenever a single event comes in with recent enough timestamp regardless of the key of that event?

Is the buffer shared among processing tasks or does each task maintain its own private buffer that only contains its share of data pertaining to assigned input partitions? In case the tasks are executed on several processing JVM(s) the buffer can't really be shared, right? In that case a single event can't flush all of the "ripe" windows, but just those that are contained in the task's part of buffer...

> You said you think it should be possible to get the DSL version working, and I agree, since this is exactly what it was designed for. Do you mind filing a bug in the "KAFKA" Jira project (https://issues.apache.org/jira/secure/Dashboard.jspa)? It will be easier to keep the investigation organized that way.

Will do that.

> In the mean time, I'll take another look at your logs above and try to reason about what could be wrong.
>
> Just one clarification... For example, you showed
>
> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14, 272, 548, 172], sum: 138902
> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14, 272, 548, 172, 596, 886, 780] INSTEAD OF [14, 272, 548, 172], sum: 141164
>
> Am I correct in thinking that the first, shorter list is the "incremental" version, and the second is the "final" version? I think so, but am confused by "INSTEAD OF".

It's the other way around. Within one log line, the 1st list (usually the longer one) is what has just been consumed and the second is what had been consumed before that for the same key. (I maintain a ConcurrentHashMap of consumed entries in the test and execute: secondList = map.put(key, firstList).)

In the majority of cases, the consumed list is an incremental update of some previous version of the list (not necessarily a direct descendant) consumed before that. But as I said, I also observed the final window result before a processor restart and, after the restart, some previous non-final version of the window aggregation for the same key.

May I also note that there is some "jitter" in the input timestamps because I'm trying to model a real use case where there will be several inputs to the system with only approximately synchronized clocks. The jitter is kept well below the TimeWindow grace period, so there should be no events consumed by the processor that belong to windows that have already been flushed.

Regards, Peter

> Thanks for the report,
> -John
>
> On Wed, Dec 26, 2018 at 3:21 AM Peter Levart wrote:
>
>> On 12/21/18 3:16 PM, Peter Levart wrote:
>>> I also see some results that are actual non-final window aggregations that precede the final aggregations. These non-final results are never emitted out of order (for example, no such non-final result would ever come after the final result for a particular key/window).
>>
>> Absence of proof is not the proof of absence... And I have later observed (using the DSL variant, not the custom Transformer) an occurrence of a non-final result that was emitted after restart of streams processor while the final result for the same key/window had been emitted before the restart:
>>
>> [pool-1-thread-4] APP Consumed: [a@154581526/1545815262000] -> [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 444856
>> ...
>> ... restart ...
>> ...
>> [pool-1-thread-4] APP Consumed: [a@154581526/1545815262000] -> [550] INSTEAD OF [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 551648
>>
>> The app logic can not even rely on guarantee that results are ordered then. This is really not usable until the bug is fixed.
>>
>> Regards, Peter
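The consumer-side check Peter describes (secondList = map.put(key, firstList)) can be sketched as below. This is a reconstruction for illustration only: the class name `OutputChecker`, the method `consume`, and the omission of the "sum" field are assumptions, since the actual test program was not posted to the list.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the test consumer's duplicate/update detection. */
final class OutputChecker {
    // Last value consumed for each windowed key, e.g. "c@1545398874000/1545398876000".
    private final Map<String, List<Integer>> lastSeen = new ConcurrentHashMap<>();

    /**
     * Records the value just consumed and returns a log line. If the same
     * windowed key was consumed before, the earlier value is appended after
     * "INSTEAD OF", so the FIRST list is the newest one.
     */
    String consume(String windowedKey, List<Integer> value) {
        // Map.put returns the previous mapping, or null if there was none.
        List<Integer> previous = lastSeen.put(windowedKey, value);
        String line = "Consumed: [" + windowedKey + "] -> " + value;
        return previous == null ? line : line + " INSTEAD OF " + previous;
    }
}
```

Read this way, a line of the form "X INSTEAD OF Y" means that Y was seen earlier and X arrived later for the same key and window. If suppression emitted only final results, no such line would appear apart from exact duplicates.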
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hi Peter,

Sorry, I have only just seen this thread.

You asked if this behavior is unexpected, and the answer is yes. Suppressed.untilWindowCloses is intended to emit only the final result, regardless of restarts.

You also asked how the suppression buffer can resume after a restart, since it's not persistent. The answer is the same as for in-memory stores. The state of the store (or buffer, in this case) is persisted to a changelog topic, which is re-read on restart to re-create the exact state prior to shutdown. "Persistent" in the store nomenclature refers only to "persistent on the local disk".

Just to confirm your response regarding the buffer size: While it is better to use the public ("Suppressed.unbounded()") API, yes, your buffer was already unbounded.

I looked at your custom transformer, and it looks almost correct to me. The only flaw seems to be that it only looks for closed windows for the key currently being processed, which means that if you have key "A" buffered, but don't get another event for it for a while after the window closes, you won't emit the final result. This might actually take longer than the window retention period, in which case, the data would be deleted without ever emitting the final result.

You said you think it should be possible to get the DSL version working, and I agree, since this is exactly what it was designed for. Do you mind filing a bug in the "KAFKA" Jira project (https://issues.apache.org/jira/secure/Dashboard.jspa)? It will be easier to keep the investigation organized that way.

In the meantime, I'll take another look at your logs above and try to reason about what could be wrong.

Just one clarification... For example, you showed

> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14, 272, 548, 172], sum: 138902
> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14, 272, 548, 172, 596, 886, 780] INSTEAD OF [14, 272, 548, 172], sum: 141164

Am I correct in thinking that the first, shorter list is the "incremental" version, and the second is the "final" version? I think so, but am confused by "INSTEAD OF".

Thanks for the report,
-John

On Wed, Dec 26, 2018 at 3:21 AM Peter Levart wrote:

> On 12/21/18 3:16 PM, Peter Levart wrote:
> > I also see some results that are actual non-final window aggregations
> > that precede the final aggregations. These non-final results are never
> > emitted out of order (for example, no such non-final result would ever
> > come after the final result for a particular key/window).
>
> Absence of proof is not the proof of absence... And I have later
> observed (using the DSL variant, not the custom Transformer) an
> occurrence of a non-final result that was emited after restart of
> streams processor while the final result for the same key/window had
> been emitted before the restart:
>
> [pool-1-thread-4] APP Consumed: [a@154581526/1545815262000] -> [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 444856
> ...
> ... restart ...
> ...
> [pool-1-thread-4] APP Consumed: [a@154581526/1545815262000] -> [550] INSTEAD OF [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 551648
>
> The app logic can not even rely on guarantee that results are ordered
> then. This is really not usable until the bug is fixed.
>
> Regards, Peter
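John's point that an in-memory buffer survives a restart because every change is also written to a changelog, which is replayed on startup, can be illustrated with a toy model. Everything here is an assumption for illustration: the class `ChangelogBackedBuffer`, the use of an in-process `List` in place of a changelog topic, and the convention that a null value acts as a tombstone. Real Kafka Streams restoration is considerably more involved.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: in-memory state rebuilt by replaying a changelog. */
final class ChangelogBackedBuffer {
    private final Map<String, String> state = new HashMap<>();
    // Stands in for the changelog topic; it outlives any single buffer instance.
    private final List<Map.Entry<String, String>> changelog;

    /** "Restart": replay every record in order to rebuild the prior state. */
    ChangelogBackedBuffer(List<Map.Entry<String, String>> changelog) {
        this.changelog = changelog;
        for (Map.Entry<String, String> record : changelog) {
            apply(record.getKey(), record.getValue());
        }
    }

    /** Every update is appended to the changelog and applied locally. */
    void put(String key, String value) {
        changelog.add(new SimpleEntry<>(key, value));
        apply(key, value);
    }

    /** Removal (for example after emitting a final result) is logged as a tombstone. */
    void remove(String key) {
        put(key, null);
    }

    private void apply(String key, String value) {
        if (value == null) {
            state.remove(key);
        } else {
            state.put(key, value);
        }
    }

    Map<String, String> snapshot() {
        return new HashMap<>(state);
    }
}
```

In this model, a new instance constructed over the same changelog ends up with the same contents as the instance that wrote it. The behavior reported in this thread is what one would expect to see if, for whatever reason, the restored buffer did not match the state before shutdown.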
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
On 12/21/18 3:16 PM, Peter Levart wrote:

> I also see some results that are actual non-final window aggregations that precede the final aggregations. These non-final results are never emitted out of order (for example, no such non-final result would ever come after the final result for a particular key/window).

Absence of proof is not proof of absence... I have since observed (using the DSL variant, not the custom Transformer) a non-final result that was emitted after a restart of the streams processor, while the final result for the same key/window had been emitted before the restart:

[pool-1-thread-4] APP Consumed: [a@154581526/1545815262000] -> [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 444856
...
... restart ...
...
[pool-1-thread-4] APP Consumed: [a@154581526/1545815262000] -> [550] INSTEAD OF [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 551648

So the app logic cannot even rely on a guarantee that results are ordered. This is really not usable until the bug is fixed.

Regards, Peter
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hello Guozhang,

May I just add some more observations which might help you pinpoint the problem...

When the process that runs the Kafka Streams processing threads is restarted, I can see duplicates in the output topic. That is understandable for "at least once semantics", and I don't mind duplicates as long as they are duplicates of final results of window aggregations. My logic is prepared for that. But I also see some results that are actual non-final window aggregations that precede the final aggregations. These non-final results are never emitted out of order (for example, no such non-final result would ever come after the final result for a particular key/window).

For example, here are some log fragments of a sample consumption of the output topic where I detect either duplicates or "incremental updates" of some key/window and mark them with the words "INSTEAD OF". I only show incremental updates here:

[pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14, 272, 548, 172], sum: 138902
[pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14, 272, 548, 172, 596, 886, 780] INSTEAD OF [14, 272, 548, 172], sum: 141164

or:

[pool-1-thread-2] APP Consumed: [c@1545398882000/1545398884000] -> [681, 116, 542, 543, 0, 0, 0, 0], sum: 143046
[pool-1-thread-2] APP Consumed: [c@1545398882000/1545398884000] -> [681, 116, 542, 543, 0, 0, 0, 0, 0, 0, 0, 0] INSTEAD OF [681, 116, 542, 543, 0, 0, 0, 0], sum: 143046

The rule seems to be that the non-final result almost always immediately precedes the final result in the log. I say almost, because I also saw one occurrence of the following:

[pool-1-thread-3] APP Consumed: [b@1545398878000/154539888] -> [756, 454, 547, 300, 323], sum: 166729
[pool-1-thread-3] APP Consumed: [b@154539888/1545398882000] -> [193, 740, 660, 981], sum: 169303
[pool-1-thread-3] APP Consumed: [b@1545398878000/154539888] -> [756, 454, 547, 300, 323, 421, 378, 354, 0] INSTEAD OF [756, 454, 547, 300, 323], sum: 170456
[pool-1-thread-3] APP Consumed: [b@154539888/1545398882000] -> [193, 740, 660, 981, 879, 209, 104, 0, 0, 0] INSTEAD OF [193, 740, 660, 981], sum: 171648

Here the incremental update of the key/window happened for two consecutive 2 second windows in close succession and the results were intermingled.

What you see in the above log before the window start/end timestamps is a String key which is used in groupByKey (a, b, c, d). The input and output topics have 4 partitions and I use 4 streams processing threads...

Hope this helps you find the problem.

So could this be considered a bug? I don't know how this suppression is supposed to work, but it seems that it does not use any persistent storage for the suppression buffer. So after the streams processing process is restarted, it starts with a fresh buffer. What mechanism is used to guarantee that, in spite of that, suppress(untilWindowCloses) suppresses non-final results?

Regards, Peter

On 12/21/18 10:48 AM, Peter Levart wrote:

> Hello Guozhang,
>
> Thank you for looking into this problem. I noticed that I have been using an internal class constructor and later discovered the right API to create the StrictBufferConfig implementations. But I'm afraid that using your proposed factory method won't change anything since its implementation is as follows:
>
>     static StrictBufferConfig unbounded() {
>         return new StrictBufferConfigImpl();
>     }
>
> ...it creates an instance of the same class as my sample code below, so the program behaves the same...
>
> What does this mean? Was your suggestion meant to rule-out any other possible causes and your suspicion still holds or did you suspect that I was not using suppression buffer of sufficient size?
>
> Regards, Peter
>
> On 12/21/18 1:58 AM, Guozhang Wang wrote:
>
>> Hello Peter,
>>
>> Thanks for filing this report, I've looked into the source code and I think I may spotted an edge case to your observations. To validate if my suspicion is correct, could you try modifying your DSL code a little bit, to use a very large suppression buffer size --- BTW the StrictBufferConfigImpl is an internal class (you can tell by its name) and are not recommend to use in your code. More specifically:
>>
>>     Suppressed.untilWindowCloses(BufferConfig.unbounded())
>>
>> -- and see if this issue still exists?
>>
>> Guozhang
>>
>> On Wed, Dec 19, 2018 at 1:50 PM Peter Levart wrote:
>>
>>> I see the list processor managed to smash may beautifully formatted HTML message. For that reason I'm re-sending the sample code snippet in plain text mode...
>>>
>>> Here's a sample kafka streams processor:
>>>
>>>     KStream<String, Val> input =
>>>         builder
>>>             .stream(
>>>                 inputTopic,
>>>                 Consumed.with(Serdes.String(), new Val.Serde())
>>>                     .withTimestampExtractor((rec, prevTs) -> {
>>>                         String key = (String) rec.key();
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hello Guozhang,

Thank you for looking into this problem. I noticed that I have been using an internal class constructor and later discovered the right API to create the StrictBufferConfig implementations. But I'm afraid that using your proposed factory method won't change anything, since its implementation is as follows:

    static StrictBufferConfig unbounded() {
        return new StrictBufferConfigImpl();
    }

...it creates an instance of the same class as my sample code below, so the program behaves the same...

What does this mean? Was your suggestion meant to rule out any other possible causes, so that your suspicion still holds, or did you suspect that I was not using a suppression buffer of sufficient size?

Regards, Peter

On 12/21/18 1:58 AM, Guozhang Wang wrote:

> Hello Peter,
>
> Thanks for filing this report, I've looked into the source code and I think I may spotted an edge case to your observations. To validate if my suspicion is correct, could you try modifying your DSL code a little bit, to use a very large suppression buffer size --- BTW the StrictBufferConfigImpl is an internal class (you can tell by its name) and are not recommend to use in your code. More specifically:
>
>     Suppressed.untilWindowCloses(BufferConfig.unbounded())
>
> -- and see if this issue still exists?
>
> Guozhang
>
> On Wed, Dec 19, 2018 at 1:50 PM Peter Levart wrote:
>
>> I see the list processor managed to smash may beautifully formatted HTML message. For that reason I'm re-sending the sample code snippet in plain text mode...
>>
>> Here's a sample kafka streams processor:
>>
>>     KStream<String, Val> input =
>>         builder
>>             .stream(
>>                 inputTopic,
>>                 Consumed.with(Serdes.String(), new Val.Serde())
>>                     .withTimestampExtractor((rec, prevTs) -> {
>>                         String key = (String) rec.key();
>>                         Val val = (Val) rec.value();
>>                         return Math.max(val.getTimestamp(),
>>                             Math.max(0L, prevTs - 4000));
>>                     })
>>             );
>>
>>     KStream<Windowed<String>, IntegerList> grouped =
>>         input
>>             .groupByKey()
>>             .windowedBy(
>>                 TimeWindows.of(Duration.ofSeconds(1))
>>                     .advanceBy(Duration.ofSeconds(1))
>>                     .grace(Duration.ofSeconds(5))
>>             )
>>             .aggregate(
>>                 IntegerList::new,
>>                 (k, v, list) -> {
>>                     list.add(v.getValue());
>>                     return list;
>>                 },
>>                 Materialized.with(Serdes.String(), new IntegerList.Serde())
>>             )
>>             .suppress(
>>                 Suppressed.untilWindowCloses(new StrictBufferConfigImpl())
>>             )
>>             .toStream();
>>
>>     grouped.to(
>>         outputTopic,
>>         Produced.with(new SelfSerde.TimeWindowed<>(Serdes.String()),
>>             new IntegerList.Serde())
>>     );
>>
>> Regards, Peter
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hello Peter,

Thanks for filing this report. I've looked into the source code and I think I may have spotted an edge case that matches your observations. To validate whether my suspicion is correct, could you try modifying your DSL code a little bit to use a very large suppression buffer size? BTW, StrictBufferConfigImpl is an internal class (you can tell by its name) and is not recommended for use in your code. More specifically, use:

    Suppressed.untilWindowCloses(BufferConfig.unbounded())

and see if this issue still exists?

Guozhang

On Wed, Dec 19, 2018 at 1:50 PM Peter Levart wrote:

> I see the list processor managed to smash may beautifully formatted HTML
> message. For that reason I'm re-sending the sample code snippet in plain
> text mode...
>
> Here's a sample kafka streams processor:
>
>     KStream<String, Val> input =
>         builder
>             .stream(
>                 inputTopic,
>                 Consumed.with(Serdes.String(), new Val.Serde())
>                     .withTimestampExtractor((rec, prevTs) -> {
>                         String key = (String) rec.key();
>                         Val val = (Val) rec.value();
>                         return Math.max(val.getTimestamp(),
>                             Math.max(0L, prevTs - 4000));
>                     })
>             );
>
>     KStream<Windowed<String>, IntegerList> grouped =
>         input
>             .groupByKey()
>             .windowedBy(
>                 TimeWindows.of(Duration.ofSeconds(1))
>                     .advanceBy(Duration.ofSeconds(1))
>                     .grace(Duration.ofSeconds(5))
>             )
>             .aggregate(
>                 IntegerList::new,
>                 (k, v, list) -> {
>                     list.add(v.getValue());
>                     return list;
>                 },
>                 Materialized.with(Serdes.String(), new
>                     IntegerList.Serde())
>             )
>             .suppress(
>                 Suppressed.untilWindowCloses(new
>                     StrictBufferConfigImpl())
>             )
>             .toStream();
>
>     grouped.to(
>         outputTopic,
>         Produced.with(new
>             SelfSerde.TimeWindowed<>(Serdes.String()), new IntegerList.Serde())
>     );
>
> Regards, Peter

-- Guozhang
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
I see the list processor managed to smash my beautifully formatted HTML message. For that reason I'm re-sending the sample code snippet in plain text mode...

Here's a sample kafka streams processor:

    KStream<String, Val> input =
        builder
            .stream(
                inputTopic,
                Consumed.with(Serdes.String(), new Val.Serde())
                    .withTimestampExtractor((rec, prevTs) -> {
                        String key = (String) rec.key();
                        Val val = (Val) rec.value();
                        return Math.max(val.getTimestamp(),
                            Math.max(0L, prevTs - 4000));
                    })
            );

    KStream<Windowed<String>, IntegerList> grouped =
        input
            .groupByKey()
            .windowedBy(
                TimeWindows.of(Duration.ofSeconds(1))
                    .advanceBy(Duration.ofSeconds(1))
                    .grace(Duration.ofSeconds(5))
            )
            .aggregate(
                IntegerList::new,
                (k, v, list) -> {
                    list.add(v.getValue());
                    return list;
                },
                Materialized.with(Serdes.String(), new IntegerList.Serde())
            )
            .suppress(
                Suppressed.untilWindowCloses(new StrictBufferConfigImpl())
            )
            .toStream();

    grouped.to(
        outputTopic,
        Produced.with(new SelfSerde.TimeWindowed<>(Serdes.String()),
            new IntegerList.Serde())
    );

Regards, Peter
KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Hello,

I'm trying to use Kafka Streams to aggregate some time series data using 1 second tumbling time windows. The data is ordered approximately by timestamp with some "jitter", which I'm limiting at the input with a custom TimestampExtractor that moves events into the future if they come in too late, guaranteeing that the timestamp of each event never jumps back by more than 4 seconds relative to the most recent previous event timestamp. I then give the tumbling windows a grace period of 5 seconds...

Here's a sample kafka streams processor:

    KStream<String, Val> input =
        builder
            .stream(
                inputTopic,
                Consumed.with(Serdes.String(), new Val.Serde())
                    .withTimestampExtractor((rec, prevTs) -> {
                        String key = (String) rec.key();
                        Val val = (Val) rec.value();
                        return Math.max(val.getTimestamp(),
                            Math.max(0L, prevTs - 4000));
                    })
            );

    KStream<Windowed<String>, IntegerList> grouped =
        input
            .groupByKey()
            .windowedBy(
                TimeWindows.of(Duration.ofSeconds(1))
                    .advanceBy(Duration.ofSeconds(1))
                    .grace(Duration.ofSeconds(5))
            )
            .aggregate(
                IntegerList::new,
                (k, v, list) -> {
                    list.add(v.getValue());
                    return list;
                },
                Materialized.with(Serdes.String(), new IntegerList.Serde())
            )
            .suppress(
                Suppressed.untilWindowCloses(new StrictBufferConfigImpl())
            )
            .toStream();

    grouped.to(
        outputTopic,
        Produced.with(new SelfSerde.TimeWindowed<>(Serdes.String()),
            new IntegerList.Serde())
    );

I'm using KTable.suppress with Suppressed.untilWindowCloses to suppress all but the final versions of aggregations. This works as expected, and I only get one final result per grouping key and window instance in the output topic. But it only works as expected and advertised until I restart the Kafka Streams process while events are being aggregated. After the restart, I can see some non-final versions of aggregations in the output topic, followed by the final versions.

So the guarantee advertised by Suppressed.untilWindowCloses(), which says:

/"This option is suitable for use cases in which the business logic requires a hard guarantee that only the final result is propagated."/

...is only true when the Kafka Streams process is not restarted. Is this expected behavior or maybe a bug?

Thanks, Peter Levart
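The timestamp clamping performed by the extractor lambda above can be isolated and checked without Kafka. The sketch below copies only the arithmetic from the posted lambda; the class name `ClampingTimestamps`, the method `extract`, and the constant name are hypothetical, and `prevTs` is simply whatever previous timestamp the caller supplies.

```java
/** Hypothetical stand-alone version of the clamping rule in the posted lambda. */
final class ClampingTimestamps {
    // Largest allowed step back in time relative to the previous timestamp.
    static final long MAX_BACKWARD_JUMP_MS = 4000L;

    /**
     * Returns eventTs unless it lies more than 4 seconds behind prevTs, in
     * which case the event is moved forward to prevTs - 4000. The inner max
     * keeps the lower bound non-negative when prevTs is small or unknown.
     */
    static long extract(long eventTs, long prevTs) {
        return Math.max(eventTs, Math.max(0L, prevTs - MAX_BACKWARD_JUMP_MS));
    }

    public static void main(String[] args) {
        // An event 7 s behind the previous timestamp is moved to 4 s behind it.
        System.out.println(extract(3_000L, 10_000L)); // prints 6000
        // An event only 0.5 s behind is left unchanged.
        System.out.println(extract(9_500L, 10_000L)); // prints 9500
    }
}
```

With a clamp of 4 seconds and a grace period of 5 seconds, no extracted timestamp should fall outside the grace period, which is why non-final results appearing in the output cannot be explained by late-arriving input.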