[ https://issues.apache.org/jira/browse/KAFKA-7718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17144827#comment-17144827 ]
Jorge Esteban Quilcate Otoya commented on KAFKA-7718:
-----------------------------------------------------

My assumption is that if headers are part of the value and there is a Serde for `ValueAndHeaders`, then it should be possible to use headers as part of joins or aggregations. This doesn't mean record headers will change as part of those operations, though: `ProcessorContext#headers` will change only when `KStream#setHeaders` is called.

A more complete example would look like:

{code:java}
// ValueAndHeaders, ValueAndHeadersSerde, withHeaders() and setHeaders() are proposed, not existing, API
builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
    .withHeaders() // values become ValueAndHeaders<String>
    .filter((k, v) -> v.headers().headers("k").iterator().hasNext()) // has a "k" header
    .filter((k, v) -> Arrays.equals(v.headers().lastHeader("k").value(), "v".getBytes())) // last "k" header is "v"
    .groupByKey(Grouped.with(Serdes.String(), new ValueAndHeadersSerde<>(Serdes.String())))
    .reduce((oldValue, newValue) -> {
        // headers travel inside the value, so the reducer can read and modify them
        newValue.headers().add("reduced", "yes".getBytes());
        return new ValueAndHeaders<>(oldValue.value().concat(newValue.value()), newValue.headers());
    })
    .toStream()
    .setHeaders((k, v) -> v.headers()) // only here do the record headers change
    .mapValues((k, v) -> v.value()) // unwrap back to a plain String value
    .to("output", Produced.with(Serdes.String(), Serdes.String()));
{code}

`ValueAndHeadersSerde` would be included as part of the KIP.
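The comment defers the `ValueAndHeadersSerde` encoding to the KIP. Purely as an illustration, the stdlib-only Java sketch below shows one way a value and its headers could be packed into a single byte array and decoded again. `ValueAndHeadersSketch`, its nested `HeaderEntry` and `ValueAndHeaders` records, and the length-prefixed layout are hypothetical stand-ins: they are not Kafka's `Serde`, `Headers`, or `Header` types, and not a layout the KIP specifies.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a stdlib model of the idea, not Kafka's Serde/Headers API.
final class ValueAndHeadersSketch {
    // Stand-in for a Kafka record header; the value may be null, as Kafka allows.
    record HeaderEntry(String key, byte[] value) {}

    // Stand-in for the proposed ValueAndHeaders container (value kept as raw bytes).
    record ValueAndHeaders(byte[] value, List<HeaderEntry> headers) {}

    // Assumed layout: [header count], then per header [UTF key][len][bytes],
    // then [len][value bytes]; a length of -1 encodes null.
    static byte[] encode(ValueAndHeaders vh) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(vh.headers().size());
            for (HeaderEntry h : vh.headers()) {
                out.writeUTF(h.key());
                writeBytes(out, h.value());
            }
            writeBytes(out, vh.value());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reverses encode(): headers first, then the value.
    static ValueAndHeaders decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            List<HeaderEntry> headers = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                String key = in.readUTF();
                headers.add(new HeaderEntry(key, readBytes(in)));
            }
            return new ValueAndHeaders(readBytes(in), headers);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writeBytes(DataOutputStream out, byte[] b) throws IOException {
        if (b == null) {
            out.writeInt(-1);
            return;
        }
        out.writeInt(b.length);
        out.write(b);
    }

    private static byte[] readBytes(DataInputStream in) throws IOException {
        int len = in.readInt();
        if (len < 0) {
            return null;
        }
        byte[] b = new byte[len];
        in.readFully(b);
        return b;
    }

    public static void main(String[] args) {
        ValueAndHeaders original = new ValueAndHeaders(
                "hello".getBytes(StandardCharsets.UTF_8),
                List.of(new HeaderEntry("k", "v".getBytes(StandardCharsets.UTF_8))));
        ValueAndHeaders copy = decode(encode(original));
        // prints: hello k=v
        System.out.println(new String(copy.value(), StandardCharsets.UTF_8) + " "
                + copy.headers().get(0).key() + "="
                + new String(copy.headers().get(0).value(), StandardCharsets.UTF_8));
    }
}
```

In Kafka itself, such a class would presumably implement `org.apache.kafka.common.serialization.Serde` and wrap an inner value serde, as `new ValueAndHeadersSerde<>(Serdes.String())` in the comment suggests; that wiring is omitted here.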
> Allow customized header inheritance for stateful operators in DSL
> -----------------------------------------------------------------
>
>                 Key: KAFKA-7718
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7718
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Priority: Major
>              Labels: needs-kip
>
> As a follow-up to
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API,
> we want to allow users to customize how record headers are inherited
> while traversing the topology at the DSL layer (at the lower-level Processor
> API layer, users are already capable of customizing and inheriting the
> headers as they forward the records to the next processor nodes).
> Today the headers are implicitly inherited throughout the topology without
> any modifications within the Streams library. For stateless operators
> (filter, map, etc.) this default inheritance policy should be sufficient. For
> stateful operators, where multiple input records may generate a single
> record (i.e. an n:1 transformation rather than a 1:1 mapping), we only
> inherit from the triggering record: that choice would seem "random" to
> users, and the other records' headers are lost.
> I'd propose we extend the DSL to allow users to customize the header
> inheritance policy for stateful operators, namely joins and aggregations. It
> would contain two parts:
> 1) On the DSL layer, I'd suggest we extend the `Joined` and `Grouped` control
> objects with an additional function that allows users to pass in a lambda
> function (let's say it's called HeadersMerger, but the name is subject to
> discussion on the KIP) that takes two Headers objects and returns a single
> Headers object.
> 2) On the implementation layer, we need to actually store the headers in the
> materialized state store so that they can be retrieved along with the record
> by the join / aggregation processor. This would change the byte layout of the
> state store values, so it should be considered carefully. Then, when the
> join / aggregate processor is triggered, the Headers of both records will be
> retrieved (one from the triggering record, one read from the materialized
> state store) and then passed to the HeadersMerger. Some low-hanging
> optimizations can be considered, though: e.g. if users have not overridden
> this interface, we can skip reading the headers from the other side
> altogether to save IO cost.
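The two parts of the proposal can be sketched together in plain Java. This is a hedged, in-memory illustration only: `HeaderAwareReduceSketch`, its nested `HeaderEntry`, `ValueAndHeaders`, and `HeadersMerger` types, the `UNION` policy, and the `headerReads()` counter are hypothetical names (the issue leaves the merger's name and signature to the KIP), header values are `String`s for brevity, and a `HashMap` stands in for the materialized state store. Each stored entry keeps the aggregate together with its headers (part 2); when a record arrives, the stored headers and the triggering record's headers are handed to the merger (part 1). A `null` merger plays the role of "not overridden": the processor then inherits only the triggering record's headers and never consults the stored ones, which mirrors the IO-saving shortcut described in the issue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical in-memory model of a reduce() that keeps headers in its store.
final class HeaderAwareReduceSketch {
    // Stand-in for a Kafka header (String value for brevity).
    record HeaderEntry(String key, String value) {}

    // A value stored or emitted together with its headers.
    record ValueAndHeaders(String value, List<HeaderEntry> headers) {}

    // Hypothetical merger: stored-side headers plus triggering-record headers in, one list out.
    @FunctionalInterface
    interface HeadersMerger {
        List<HeaderEntry> merge(List<HeaderEntry> stored, List<HeaderEntry> triggering);
    }

    // Example policy: keep every header from both sides, stored ones first.
    static final HeadersMerger UNION = (stored, triggering) -> {
        List<HeaderEntry> all = new ArrayList<>(stored);
        all.addAll(triggering);
        return all;
    };

    private final Map<String, ValueAndHeaders> store = new HashMap<>(); // stands in for the state store
    private final BinaryOperator<String> reducer;
    private final HeadersMerger merger; // null = not overridden by the user
    private int headerReads = 0;        // how often stored headers were consulted

    HeaderAwareReduceSketch(BinaryOperator<String> reducer, HeadersMerger merger) {
        this.reducer = reducer;
        this.merger = merger;
    }

    ValueAndHeaders process(String key, String value, List<HeaderEntry> headers) {
        ValueAndHeaders prior = store.get(key);
        String newValue = prior == null ? value : reducer.apply(prior.value(), value);
        List<HeaderEntry> newHeaders;
        if (prior == null || merger == null) {
            // Default: inherit from the triggering record only; stored headers are not read.
            newHeaders = headers;
        } else {
            headerReads++;
            newHeaders = merger.merge(prior.headers(), headers);
        }
        // Headers are stored alongside the aggregate so a later merge can use them.
        ValueAndHeaders result = new ValueAndHeaders(newValue, List.copyOf(newHeaders));
        store.put(key, result);
        return result;
    }

    int headerReads() {
        return headerReads;
    }

    public static void main(String[] args) {
        for (HeadersMerger m : new HeadersMerger[] {UNION, null}) {
            HeaderAwareReduceSketch reduce = new HeaderAwareReduceSketch(String::concat, m);
            reduce.process("k", "a", List.of(new HeaderEntry("from", "r1")));
            ValueAndHeaders out = reduce.process("k", "b", List.of(new HeaderEntry("from", "r2")));
            // UNION prints "ab [r1, r2] reads=1"; null prints "ab [r2] reads=0"
            System.out.println(out.value() + " " + out.headers().stream().map(HeaderEntry::value).toList()
                    + " reads=" + reduce.headerReads());
        }
    }
}
```

A join would follow the same pattern with one store per side; that variant is not shown.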