[ 
https://issues.apache.org/jira/browse/KAFKA-7718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17144827#comment-17144827
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-7718:
-----------------------------------------------------

My assumption is that if headers are part of the value and there is a Serde for 
`ValueAndHeaders`, then it should be possible to use headers as part of joins 
or aggregations.

This doesn't mean record headers will change as part of those operations 
though. `ProcessorContext#headers` will change only when `KStream#setHeaders` 
is called.

A more extended example would look like:
{code:java}
builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
    // proposed operator: values downstream expose value() and headers()
    .withHeaders()
    // keep only records that carry header "k" with value "v"
    .filter((k, v) -> v.headers().headers("k").iterator().hasNext())
    .filter((k, v) -> Arrays.equals(v.headers().lastHeader("k").value(), "v".getBytes()))
    .groupByKey(Grouped.with(Serdes.String(), new ValueAndHeadersSerde<>(Serdes.String())))
    .reduce((oldValue, newValue) -> {
        newValue.headers().add("reduced", "yes".getBytes());
        return new ValueAndHeaders<>(oldValue.value().concat(newValue.value()), newValue.headers());
    })
    .toStream()
    // proposed operator: the only step that changes the record headers
    .setHeaders((k, v) -> v.headers())
    .mapValues((k, v) -> v.value())
    .to("output", Produced.with(Serdes.String(), Serdes.String()));
{code}
`ValueAndHeadersSerde` would be included as part of the KIP.
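
To make the Serde idea more concrete, here is a rough, dependency-free sketch of how a value and its headers could be packed into a single byte array and read back. Everything in it is an assumption for illustration: the `ValueAndHeadersSketch` class, its nested `Header` type, and the byte layout (header count, then length-prefixed key/value pairs, then the value bytes) are hypothetical and are not the API or format the KIP would define. It only handles String values and non-null header values.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the proposed ValueAndHeaders type plus its Serde logic.
final class ValueAndHeadersSketch {

    // Hypothetical stand-in for a record header: a key and raw bytes.
    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    final String value;
    final List<Header> headers;

    ValueAndHeadersSketch(String value, List<Header> headers) {
        this.value = value;
        this.headers = headers;
    }

    // Assumed layout: [header count][keyLen][key][valueLen][value]... then the value bytes.
    static byte[] serialize(ValueAndHeadersSketch vh) {
        byte[] valueBytes = vh.value.getBytes(StandardCharsets.UTF_8);
        List<byte[]> keys = new ArrayList<>();
        int size = Integer.BYTES + valueBytes.length;
        for (Header h : vh.headers) {
            byte[] k = h.key.getBytes(StandardCharsets.UTF_8);
            keys.add(k);
            size += Integer.BYTES + k.length + Integer.BYTES + h.value.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(vh.headers.size());
        for (int i = 0; i < vh.headers.size(); i++) {
            byte[] k = keys.get(i);
            byte[] v = vh.headers.get(i).value;
            buf.putInt(k.length).put(k).putInt(v.length).put(v);
        }
        buf.put(valueBytes);
        return buf.array();
    }

    // Reads the headers first; whatever remains in the buffer is the value.
    static ValueAndHeadersSketch deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int count = buf.getInt();
        List<Header> headers = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            byte[] k = new byte[buf.getInt()];
            buf.get(k);
            byte[] v = new byte[buf.getInt()];
            buf.get(v);
            headers.add(new Header(new String(k, StandardCharsets.UTF_8), v));
        }
        byte[] valueBytes = new byte[buf.remaining()];
        buf.get(valueBytes);
        return new ValueAndHeadersSketch(new String(valueBytes, StandardCharsets.UTF_8), headers);
    }
}
```

A real implementation would presumably delegate the value part to the inner Serde (as in `new ValueAndHeadersSerde<>(Serdes.String())`) instead of hard-coding UTF-8 strings.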

> Allow customized header inheritance for stateful operators in DSL
> -----------------------------------------------------------------
>
>                 Key: KAFKA-7718
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7718
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Priority: Major
>              Labels: needs-kip
>
> As follow-up work to 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API,
>  we want to allow users to customize how record headers are inherited 
> while traversing the topology at the DSL layer (at the lower-level Processor 
> API layer, users can already customize and inherit the headers as they 
> forward records to the next processor nodes).
> Today the headers are implicitly inherited throughout the topology without 
> any modifications within the Streams library. For stateless operators 
> (filter, map, etc.) this default inheritance policy should be sufficient. For 
> stateful operators, where multiple input records may generate a single 
> record (i.e. an n:1 transformation rather than a 1:1 mapping), we only 
> inherit from the triggering record, which would seem to be a "random" 
> choice to the users, and the other records' headers are lost.
> I'd propose we extend DSL to allow users to customize the headers inheritance 
> policy for stateful operators, namely Joins and Aggregations. It would 
> contain two parts:
> 1) On the DSL layer, I'd suggest we extend the `Joined` and `Grouped` control 
> objects with an additional function that allows users to pass in a lambda 
> function (let's say it's called HeadersMerger, but the name is subject to 
> discussion in the KIP) that takes two Headers objects and returns a single 
> Headers object.
> 2) On the implementation layer, we need to actually store the headers in the 
> materialized state store so that they can be retrieved along with the record 
> for the join / aggregation processor. This would change the state store 
> value bytes organization and hence should be considered carefully. Then when 
> the join / aggregate processor is triggered, the Headers of both records will 
> be retrieved (one from the triggering record, one read from the materialized 
> state store) and then passed to the HeadersMerger. Some low-hanging 
> optimizations can be considered though, e.g. if users have not overridden 
> this interface, then we can consider not reading the headers from the other 
> side at all to save IO cost.
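
For reference, the HeadersMerger idea in the quoted description could be sketched roughly as below. This is only an illustration under assumptions: the interface shape, the `HeadersMergerSketch` wrapper, and the use of a plain list of key/bytes entries in place of Kafka's `Headers` are all hypothetical, since the actual name and signature would be settled in the KIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; headers are modelled as a list of key/bytes entries.
final class HeadersMergerSketch {

    // Assumed shape: headers read from the state store plus headers of the
    // triggering record go in, a single merged header list comes out.
    @FunctionalInterface
    interface HeadersMerger {
        List<Map.Entry<String, byte[]>> merge(List<Map.Entry<String, byte[]>> stored,
                                              List<Map.Entry<String, byte[]>> triggering);
    }

    // One possible policy: keep all stored headers, then append the triggering ones.
    static final HeadersMerger CONCAT = (stored, triggering) -> {
        List<Map.Entry<String, byte[]>> merged = new ArrayList<>(stored);
        merged.addAll(triggering);
        return merged;
    };
}
```

Other policies (for example keeping only the last value per key) would fit the same interface; which default makes sense is an open question for the KIP.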



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
