Hi Suresh, It seems the keys are different for each message - I see 0, 1 and 2. When you say "This topic contains the same keys" do you mean the message contents contain the same "attr1" "attr2" "attr3" keys? If so, I think the first step would be to re-key the messages so the new key is "attr1:attr2:attr3" or something like that - the solutions I can think of require all messages that make up the merged message be sent to the same partition.
Once that is done, I see two options. The first one would be to write an streams application that uses the Processor API - see https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration. You could create a state store and use that to maintain the merged message. As each message comes in you update the store merged value in the state store. The second option I can think of would be to use the Kafka Streams DSL "groupByKey" operator - see https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-aggregating. This lets you declaratively define an aggregation function that would do the merging. I'd suggest this option if possible as it's less code and you don't have to manage a state store yourself. Both solutions require all messages that make up the merged message be sent to the same partition, hence the suggestion to re-key mentioned above. It's not clear to me if you require only the "finalized" message be emitted downstream. In either case you could filter emitted messages so only the one with all the keys is passed to downstream operators. Hope that helps. Thanks, Andrew On Sun, Dec 6, 2020 at 9:39 AM Suresh Chidambaram <[email protected]> wrote: > Hi, > > Greetings! > > My requirement is as below. > > I have Topic named "sample-topic". This topic contains the same keys(as > String) with multiple messages and the messages are in JSON format. I would > like to merge the JSON messages and produce a final JSON. In order to > achieve this, how to maintain the state of the previous message? > > Below are the input messages in the topic. > > Key : 0 Message: { "attr1" : "value1", "attr2" : "value_01", > "attr3" :"value_3" } > > Key : 1 Message: { "attr1": "value_x", "attr2": "value2" , > "attr3" : "value_y"} > > Key : 2 Message: { "attr1": "value_m", "attr2": null , "attr3" : > "value_o"} > > > Now, I need the resultant JSON as below. > > { "attr1": "value_m", "attr2": "value2" , "attr3" : "value_o"} > > > Could someone help me by letting me know the solution? Thank you. > > Thanks > > C Suresh > -- Andrew Grant 8054482621
