Hi Suresh,

The keys look different for each message - I see 0, 1 and 2. When
you say "This topic contains the same keys", do you mean the message
contents contain the same "attr1", "attr2" and "attr3" keys? If so, I think
the first step would be to re-key the messages so the new key is
"attr1:attr2:attr3" or something like that - the solutions I can think of
require that all messages that make up the merged message be sent to the
same partition.
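
As a rough illustration of the re-keying idea, here is a plain-Java sketch
that derives such a key from the attribute names of an already-parsed
message. The class and method names are made up for this example, and it
assumes the JSON has been parsed into a Map elsewhere. In a Kafka Streams
topology a function like this would typically be handed to the DSL
"selectKey" operator.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical helper, not part of Kafka Streams.
final class RekeySketch {

    // Builds a key such as "attr1:attr2:attr3" from the attribute names of a message.
    static String compositeKey(Map<String, String> message) {
        // TreeSet sorts the names, so the key does not depend on field order in the JSON.
        return String.join(":", new TreeSet<String>(message.keySet()));
    }

    public static void main(String[] args) {
        Map<String, String> message = new LinkedHashMap<>();
        message.put("attr3", "value_o");
        message.put("attr1", "value_m");
        message.put("attr2", null);
        System.out.println(compositeKey(message)); // prints attr1:attr2:attr3
    }
}
```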

Once that is done, I see two options.

The first one would be to write a Kafka Streams application that uses the
Processor API - see
https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration.
You could create a state store and use it to maintain the merged message.
As each message comes in, you update the merged value held in the state
store.
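
To make the read-merge-write cycle concrete, here is a minimal plain-Java
sketch. It is not Processor API code: a HashMap stands in for the
key-value state store, the class and method names are invented for the
example, and it assumes that a null attribute should never overwrite a
value that is already stored (which matches the expected output in your
mail).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a processor backed by a key-value state store.
final class StateStoreSketch {

    // Stand-in for the state store: record key -> merged message so far.
    private final Map<String, Map<String, String>> store = new HashMap<>();

    // Called once per incoming record; returns a copy of the merged message after the update.
    Map<String, String> process(String key, Map<String, String> message) {
        Map<String, String> merged = store.getOrDefault(key, new HashMap<>());
        for (Map.Entry<String, String> e : message.entrySet()) {
            if (e.getValue() != null) { // assumption: null never overwrites a stored value
                merged.put(e.getKey(), e.getValue());
            }
        }
        store.put(key, merged);
        return new HashMap<>(merged);
    }

    // Convenience builder for the three-attribute messages used in this thread.
    static Map<String, String> message(String attr1, String attr2, String attr3) {
        Map<String, String> m = new HashMap<>();
        m.put("attr1", attr1);
        m.put("attr2", attr2);
        m.put("attr3", attr3);
        return m;
    }

    public static void main(String[] args) {
        StateStoreSketch processor = new StateStoreSketch();
        String key = "attr1:attr2:attr3";
        processor.process(key, message("value1", "value_01", "value_3"));
        processor.process(key, message("value_x", "value2", "value_y"));
        System.out.println(processor.process(key, message("value_m", null, "value_o")));
    }
}
```

In a real processor the store would be a state store obtained from the
processor context, and the merged value would be forwarded downstream
instead of returned.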

The second option I can think of would be to use the Kafka Streams DSL
"groupByKey" operator followed by an aggregation - see
https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-aggregating.
This lets you declaratively define an aggregation function that does
the merging. I'd suggest this option if possible as it's less code and you
don't have to manage a state store yourself.
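
The aggregation function itself can be quite small. Below is a plain-Java
sketch of one possible merge function, folded over the three sample
messages from your mail. The names are made up for illustration, the
messages are assumed to be parsed into Maps already, and null attributes
are assumed not to overwrite earlier values. In the DSL, the empty map
would play the role of the initializer passed to "aggregate" and merge()
the role of the aggregator; serdes for the value type are left out here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper showing the aggregation logic only, without Kafka Streams.
final class AggregateSketch {

    // Returns a new map: the aggregate so far, overlaid with the non-null attributes of next.
    static Map<String, String> merge(Map<String, String> aggregate, Map<String, String> next) {
        Map<String, String> result = new TreeMap<>(aggregate);
        next.forEach((name, value) -> {
            if (value != null) { // assumption: null means "no update for this attribute"
                result.put(name, value);
            }
        });
        return result;
    }

    // Convenience builder for the three-attribute messages used in this thread.
    static Map<String, String> message(String attr1, String attr2, String attr3) {
        Map<String, String> m = new HashMap<>();
        m.put("attr1", attr1);
        m.put("attr2", attr2);
        m.put("attr3", attr3);
        return m;
    }

    public static void main(String[] args) {
        List<Map<String, String>> input = List.of(
                message("value1", "value_01", "value_3"),
                message("value_x", "value2", "value_y"),
                message("value_m", null, "value_o"));

        Map<String, String> aggregate = new TreeMap<>(); // what the initializer would return
        for (Map<String, String> next : input) {
            aggregate = merge(aggregate, next);
        }
        System.out.println(aggregate); // prints {attr1=value_m, attr2=value2, attr3=value_o}
    }
}
```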

Both solutions require all messages that make up the merged message be sent
to the same partition, hence the suggestion to re-key mentioned above.

It's not clear to me if you require that only the "finalized" message be
emitted downstream. If so, with either option you could filter emitted
messages so only the one with all the attributes set is passed to
downstream operators.
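
Such a filter could be as simple as the predicate sketched below in plain
Java. It assumes "finalized" means that "attr1", "attr2" and "attr3" are
all present and non-null, which is my guess rather than something stated
in your mail; the class and method names are invented. Note that under
this definition the first sample message (key 0) already counts as
complete, so you may need a stricter rule.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical completeness check that could serve as a filter predicate.
final class CompleteFilterSketch {

    // Assumed set of attributes a "finalized" message must carry.
    static final List<String> REQUIRED = List.of("attr1", "attr2", "attr3");

    // True only if every required attribute is present with a non-null value.
    static boolean isComplete(Map<String, String> merged) {
        for (String name : REQUIRED) {
            if (merged.get(name) == null) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> partial = new HashMap<>();
        partial.put("attr1", "value_m");
        partial.put("attr2", null);
        partial.put("attr3", "value_o");
        System.out.println(isComplete(partial)); // prints false

        partial.put("attr2", "value2");
        System.out.println(isComplete(partial)); // prints true
    }
}
```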

Hope that helps.

Thanks,
Andrew

On Sun, Dec 6, 2020 at 9:39 AM Suresh Chidambaram <[email protected]>
wrote:

> Hi,
>
> Greetings!
>
> My requirement is as below.
>
> I have Topic named  "sample-topic".  This topic contains the same keys(as
> String) with multiple messages and the messages are in JSON format. I would
> like to merge the JSON messages and produce a final JSON. In order to
> achieve this, how to maintain the state of the previous message?
>
> Below are the input messages in the topic.
>
> Key : 0       Message: {    "attr1" : "value1", "attr2" : "value_01",
> "attr3" :"value_3" }
>
> Key : 1       Message: {     "attr1": "value_x", "attr2": "value2" ,
> "attr3" : "value_y"}
>
> Key : 2       Message: {     "attr1": "value_m", "attr2": null , "attr3" :
> "value_o"}
>
>
> Now, I need the resultant JSON as below.
>
> { "attr1": "value_m", "attr2": "value2" , "attr3" : "value_o"}
>
>
> Could someone help me by letting me know the solution? Thank you.
>
> Thanks
>
> C Suresh
>


-- 
Andrew Grant
8054482621
