[ https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17213535#comment-17213535 ]
John Roesler commented on KAFKA-10417:
--------------------------------------

Hi [~duvholt],

Thanks for sharing that. It looks like it's just ignoring the request if the parent processor isn't the right type? It might be working for you, but if you have any operations downstream that actually need the old values, then you should beware that you're now silently getting incorrect results. On the other hand, if you're _just_ doing it to work around the bug that manifests when you use suppress, then you're probably ok. It's risky, though, so you might want to take a closer look and/or prioritize putting in a proper fix.

The KTableValueGetter/enableOldValues thing is _super_ confusing, and as a matter of fact, it's something I'm planning to clean up now that KIP-478 is finally over the line. So don't feel bad if it's as clear as mud when you try to figure out the right way to fix it; it's that way for everyone.

What's happening in the cogroup case is that there is a set of aggregator processors whose results all get "merged" together, and the KTable is the result of that merge. The aggregation processors themselves are capable of `enableSendOldValues`, but the implementation uses the PassThrough for that merge node, which is not aware of "send old values".

What seems to be needed is for the cogroup merge processor to get a new kind of PassThrough that is capable of transmitting "enable send old values" to its parents. I'm not sure whether the method could be added to PassThrough harmlessly, or whether we need some kind of KTableSourcePassThrough supplier.

Does this sound like the kind of thing you want to take a crack at? Although it may not technically be a blocker, it would be ideal to get a fix for this in 2.7.
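
To make the shape of that idea concrete, here is a minimal, self-contained sketch. It does not use the real Kafka Streams internals: `OldValuesAware`, `AggregatorNode`, and `ForwardingPassThrough` are hypothetical stand-ins invented for illustration, and the method name and signature are assumptions rather than the actual internal API. It only shows a merge node relaying the "send old values" request to every parent instead of being cast to a type it does not implement.

```java
import java.util.List;

public class SendOldValuesSketch {

    // Hypothetical interface: anything that can be asked to forward old values.
    interface OldValuesAware {
        void enableSendingOldValues();
    }

    // Hypothetical stand-in for one of the per-stream cogroup aggregators.
    static final class AggregatorNode implements OldValuesAware {
        private boolean sendOldValues = false;

        @Override
        public void enableSendingOldValues() {
            sendOldValues = true;
        }

        boolean sendsOldValues() {
            return sendOldValues;
        }
    }

    // Hypothetical stand-in for the merge node. It does no work of its own
    // for this request; it only relays the request to each of its parents.
    static final class ForwardingPassThrough implements OldValuesAware {
        private final List<? extends OldValuesAware> parents;

        ForwardingPassThrough(List<? extends OldValuesAware> parents) {
            this.parents = parents;
        }

        @Override
        public void enableSendingOldValues() {
            for (OldValuesAware parent : parents) {
                parent.enableSendingOldValues();
            }
        }
    }

    public static void main(String[] args) {
        AggregatorNode first = new AggregatorNode();
        AggregatorNode second = new AggregatorNode();
        OldValuesAware merge = new ForwardingPassThrough(List.of(first, second));

        // A downstream operator such as suppress would make this request on the merge node.
        merge.enableSendingOldValues();

        System.out.println(first.sendsOldValues() && second.sendsOldValues());
    }
}
```

Whether this relaying belongs on PassThrough itself or on a separate supplier is exactly the open question above; the sketch takes no position on it.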

Thanks,
-John

> suppress() with cogroup() throws ClassCastException
> ---------------------------------------------------
>
>                 Key: KAFKA-10417
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10417
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Wardha Perinkada Kattu
>            Priority: Critical
>              Labels: kafka-streams
>             Fix For: 2.7.0, 2.6.1
>
>
> Streams operation - `cogroup()`, `aggregate()` followed by `suppress()`
> throws `ClassCastException`
> Works fine without the `suppress()`
> Code block tested -
> {code:java}
> val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
>     .groupByKey(Grouped.with(Serdes.String(),
> serdesConfig.notificationSerde()))
> val streams2 = confirmationStreams
>     .groupByKey(Grouped.with(Serdes.String(),
> serdesConfig.confirmationsSerde()))
> val cogrouped =
> stream1.cogroup(notificationAggregator).cogroup(streams2,
> confirmationsAggregator)
>
> .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong())))
>     .aggregate({ null }, Materialized.`as`<String,
> NotificationMetric, WindowStore<Bytes,
> ByteArray>>("time-windowed-aggregated-stream-store")
>
> .withValueSerde(serdesConfig.notificationMetricSerde()))
>     .suppress(Suppressed.untilWindowCloses(unbounded()))
>     .toStream()
> {code}
> Exception thrown is:
> {code:java}
> Caused by: java.lang.ClassCastException: class
> org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to
> class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier
> (org.apache.kafka.streams.kstream.internals.PassThrough and
> org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in
> unnamed module of loader 'app')
> {code}
> [https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)