Hi Andrew,

Thank you for the suggestion. Creating a custom Processor helped me achieve the requirement. Thank you once again.
Thanks
C Suresh

On Sunday, December 6, 2020, Andrew Grant <[email protected]> wrote:

> Hi Suresh,
>
> It seems the keys are different for each message: I see 0, 1 and 2. When
> you say "This topic contains the same keys", do you mean the message
> contents contain the same "attr1", "attr2" and "attr3" keys? If so, I think
> the first step would be to re-key the messages so the new key is
> "attr1:attr2:attr3" or something like that. The solutions I can think of
> require that all messages that make up the merged message be sent to the
> same partition.
>
> Once that is done, I see two options.
>
> The first one would be to write a Streams application that uses the
> Processor API; see
> https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> You could create a state store and use it to maintain the merged message.
> As each message comes in, you update the merged value in the state store.
>
> The second option I can think of would be to use the Kafka Streams DSL
> "groupByKey" operator; see
> https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-aggregating
> This lets you declaratively define an aggregation function that does the
> merging. I'd suggest this option if possible, as it's less code and you
> don't have to manage a state store yourself.
>
> Both solutions require that all messages that make up the merged message
> be sent to the same partition, hence the suggestion to re-key mentioned
> above.
>
> It's not clear to me whether you require that only the "finalized" message
> be emitted downstream. Either way, you could filter the emitted messages so
> that only the one with all the keys is passed to downstream operators.
>
> Hope that helps.
>
> Thanks,
> Andrew
>
> On Sun, Dec 6, 2020 at 9:39 AM Suresh Chidambaram <[email protected]>
> wrote:
>
> > Hi,
> >
> > Greetings!
> >
> > My requirement is as below.
> >
> > I have a topic named "sample-topic". This topic contains the same keys
> > (as String) with multiple messages, and the messages are in JSON format.
> > I would like to merge the JSON messages and produce a final JSON. To
> > achieve this, how can I maintain the state of the previous message?
> >
> > Below are the input messages in the topic.
> >
> > Key : 0 Message: { "attr1" : "value1", "attr2" : "value_01", "attr3" : "value_3" }
> >
> > Key : 1 Message: { "attr1" : "value_x", "attr2" : "value2", "attr3" : "value_y" }
> >
> > Key : 2 Message: { "attr1" : "value_m", "attr2" : null, "attr3" : "value_o" }
> >
> >
> > Now, I need the resultant JSON as below.
> >
> > { "attr1": "value_m", "attr2": "value2", "attr3": "value_o" }
> >
> >
> > Could someone help me with a solution? Thank you.
> >
> > Thanks
> >
> > C Suresh
> >
>
> --
> Andrew Grant
> 8054482621
>
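A minimal, stdlib-only Java sketch of the per-key merge that both suggested options rely on may make the thread easier to follow. It is not Suresh's actual Processor and does not use the Kafka Streams library: a `HashMap` stands in for the state store (in a real application this logic would sit in a Processor's `process` method backed by a `KeyValueStore`, or in the aggregator passed to `groupByKey().aggregate(...)`). JSON objects are modelled as `Map<String, String>` with JSON `null` as a Java `null`, and the merge rule (a later non-null value wins; a null never overwrites) is inferred from the expected output above. The class name `JsonMergeSketch`, the helper `json(...)` and the group key `"group-1"` are hypothetical; since the example messages share no common field, all three are simply assumed to belong to one group after re-keying.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Stdlib-only simulation of a per-key JSON merge; this is NOT the Kafka Streams API.
final class JsonMergeSketch {

    // Stand-in for a key-value state store: re-keyed group key -> merged JSON object.
    private final Map<String, Map<String, String>> store = new HashMap<>();

    // Merge rule inferred from the example: a later non-null value wins,
    // and a null value never overwrites what is already stored.
    static Map<String, String> merge(Map<String, String> acc, Map<String, String> incoming) {
        Map<String, String> result = new LinkedHashMap<>(acc);
        for (Map.Entry<String, String> e : incoming.entrySet()) {
            if (e.getValue() != null) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    // Roughly what a custom Processor would do per record:
    // read the current merged value, merge the new message in, write it back.
    Map<String, String> process(String groupKey, Map<String, String> message) {
        Map<String, String> merged = merge(store.getOrDefault(groupKey, Map.of()), message);
        store.put(groupKey, merged);
        return Collections.unmodifiableMap(merged);
    }

    // Hypothetical helper: builds an insertion-ordered "JSON object" from key/value pairs.
    static Map<String, String> json(String... keyValues) {
        if (keyValues.length % 2 != 0) {
            throw new IllegalArgumentException("expected key/value pairs");
        }
        Map<String, String> m = new LinkedHashMap<>();
        for (int i = 0; i < keyValues.length; i += 2) {
            m.put(keyValues[i], keyValues[i + 1]);
        }
        return m;
    }

    public static void main(String[] args) {
        JsonMergeSketch processor = new JsonMergeSketch();
        // Hypothetical group key: all three example messages are assumed to belong together.
        String groupKey = "group-1";
        processor.process(groupKey, json("attr1", "value1", "attr2", "value_01", "attr3", "value_3"));
        processor.process(groupKey, json("attr1", "value_x", "attr2", "value2", "attr3", "value_y"));
        System.out.println(processor.process(groupKey, json("attr1", "value_m", "attr2", null, "attr3", "value_o")));
    }
}
```

Running `main` feeds the three example messages in order and prints `{attr1=value_m, attr2=value2, attr3=value_o}`, which matches the resultant JSON in the question. Because every record yields an updated merged value, a filter on the returned map (for example, requiring every expected field to be present) is one way to realize the "emit only the finalized message" idea from the reply.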
