[ 
https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17242806#comment-17242806
 ] 

John Roesler commented on KAFKA-10417:
--------------------------------------

Hey [~duvholt], I'm really sorry I missed your comment. Sadly, there's no 
documentation, but the basic point is that some aggregations need to be able to 
"subtract" the prior value before "adding" the new value. To support this, the 
aggregation asks its upstream operators to send the old value along with each 
update.
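
To make the "subtract, then add" idea concrete, here is a minimal sketch in 
plain Java. It is not the Streams internals: the names SubtractThenAdd, Change, 
apply, and countOf are hypothetical and exist only for this illustration. It 
keeps a count per value, which can only stay correct if each update also 
carries the value it replaces.

```java
import java.util.HashMap;
import java.util.Map;

public class SubtractThenAdd {

    // Hypothetical stand-in for an update that carries the prior value too.
    public static final class Change<V> {
        public final V oldValue;
        public final V newValue;

        public Change(V oldValue, V newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    // Running count of how many keys currently hold each value.
    private final Map<String, Long> counts = new HashMap<>();

    public void apply(Change<String> change) {
        if (change.oldValue != null) {
            // "Subtract" the prior value's contribution first.
            counts.merge(change.oldValue, -1L, Long::sum);
        }
        if (change.newValue != null) {
            // Then "add" the new value's contribution.
            counts.merge(change.newValue, 1L, Long::sum);
        }
    }

    public long countOf(String value) {
        return counts.getOrDefault(value, 0L);
    }

    public static void main(String[] args) {
        SubtractThenAdd agg = new SubtractThenAdd();
        agg.apply(new Change<>(null, "red"));   // key A first seen as "red"
        agg.apply(new Change<>(null, "red"));   // key B first seen as "red"
        agg.apply(new Change<>("red", "blue")); // key A changes to "blue"
        System.out.println(agg.countOf("red") + " " + agg.countOf("blue")); // prints "1 1"
    }
}
```

Without the old value in the third update, the count for "red" would stay at 2 
even though only one key still holds it.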

> suppress() with cogroup() throws ClassCastException
> ---------------------------------------------------
>
>                 Key: KAFKA-10417
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10417
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Wardha Perinkada Kattu
>            Priority: Critical
>              Labels: kafka-streams
>             Fix For: 2.8.0, 2.7.1
>
>
> The Streams operations `cogroup()` and `aggregate()` followed by `suppress()` 
> throw a `ClassCastException`.
> It works fine without the `suppress()`.
> Code block tested:
> {code:java}
> val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
>         .groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))
> val streams2 = confirmationStreams
>         .groupByKey(Grouped.with(Serdes.String(), serdesConfig.confirmationsSerde()))
> val cogrouped = stream1.cogroup(notificationAggregator)
>         .cogroup(streams2, confirmationsAggregator)
>         .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong()))
>                 .grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong())))
>         .aggregate({ null }, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("time-windowed-aggregated-stream-store")
>                 .withValueSerde(serdesConfig.notificationMetricSerde()))
>         .suppress(Suppressed.untilWindowCloses(unbounded()))
>         .toStream()
> {code}
> Exception thrown is:
> {code:java}
> Caused by: java.lang.ClassCastException: class 
> org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to 
> class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier 
> (org.apache.kafka.streams.kstream.internals.PassThrough and 
> org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in 
> unnamed module of loader 'app')
> {code}
> [https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)