[ 
https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17213535#comment-17213535
 ] 

John Roesler commented on KAFKA-10417:
--------------------------------------

Hi [~duvholt] ,

Thanks for sharing that. It looks like it's just ignoring the request if the 
parent processor isn't the right type?

It might be working for you, but if you have any operations downstream that 
actually need the old values, then you should beware that you're now silently 
getting incorrect results. On the other hand, if you're _just_ doing it to work 
around the bug that manifests when you use suppress, then you're probably ok. 
It's risky, though, so you might want to take a closer look and/or prioritize 
putting in a proper fix.

The KTableValueGetter/enableOldValues thing is _super_ confusing, and as a 
matter of fact, it's something I'm planning to clean up now that KIP-478 is 
finally over the line. So don't feel bad if it's as clear as mud when you try 
to figure out the right way to fix it, it's that way for everyone.

What's happening in the cogroup case is that there are a set of aggregator 
processors whose results all get "merged" together, and the KTable is the 
result of that merge. The aggregation processors themselves are capable of 
`enableSendOldValues`, but the implementation uses the PassThrough for that 
merge node, which is not aware of "send old values". What seems to be needed is 
for the cogroup merge processor to get a new kind of PassThrough that is 
capable of transmitting "enable send old values" to its parents. I'm not sure 
whether or not the method could be added to PassThrough harmlessly, or whether 
we need some kind of KTableSourcePassThrough supplier.

Does this sound like the kind of thing you want to take a crack at? Although it 
may not technically be a blocker, it would be ideal to get a fix for this in 
2.7 .

Thanks,

-John

> suppress() with cogroup() throws ClassCastException
> ---------------------------------------------------
>
>                 Key: KAFKA-10417
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10417
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Wardha Perinkada Kattu
>            Priority: Critical
>              Labels: kafka-streams
>             Fix For: 2.7.0, 2.6.1
>
>
> Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` 
> throws `ClassCastException`
> Works fine without the `suppress()`
> Code block tested -
> {code:java}
> val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
>                 .groupByKey(Grouped.with(Serdes.String(), 
> serdesConfig.notificationSerde()))
>         val streams2 = confirmationStreams
>                 .groupByKey(Grouped.with(Serdes.String(), 
> serdesConfig.confirmationsSerde()))
>         val cogrouped = 
> stream1.cogroup(notificationAggregator).cogroup(streams2, 
> confirmationsAggregator)
>                 
> .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong())))
>                 .aggregate({ null }, Materialized.`as`<String, 
> NotificationMetric, WindowStore<Bytes, 
> ByteArray>>("time-windowed-aggregated-stream-store")
>                         
> .withValueSerde(serdesConfig.notificationMetricSerde()))
>                  .suppress(Suppressed.untilWindowCloses(unbounded()))
>                 .toStream()
> {code}
> Exception thrown is:
> {code:java}
> Caused by: java.lang.ClassCastException: class 
> org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to 
> class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier 
> (org.apache.kafka.streams.kstream.internals.PassThrough and 
> org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in 
> unnamed module of loader 'app')
> {code}
> [https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to