Aaron,

I would add that, the way the fields are declared in the record, they are not 
mandatory. So you could define all of the possible fields you might need in the 
schema. This has the disadvantage that any unused fields would then be written 
as null in the output, which will clutter it up. If you set the “Suppress Null 
Values” property in the writer service to “Suppress Missing Values” then this 
will take care of that.
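
Just as a sketch (the field name is made up), an optional field declared as a 
null union with a default looks like this:

{"name": "optional_field", "type": ["null", "string"], "default": null}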

If fields can take on multiple types then, as Lehel says, use a union type, 
which is just a fancy way of saying declare the possible types in an array. 
Union types can include having a field be either a simple type or a complex 
type. See the Avro specification: 
https://avro.apache.org/docs/1.11.1/specification/#unions
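
As a rough sketch (reusing the "data"/"nestedData" names from further down this 
thread), a field that may be null, a plain string or a nested record could be 
declared like this:

{
  "name": "data",
  "type": [
    "null",
    "string",
    {
      "type": "record",
      "name": "nestedData",
      "fields": [
        {"name": "message_type", "type": "string"}
      ]
    }
  ],
  "default": null
}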

I also note that the schema specification allows an “aliases” declaration on a 
field, which lets you define alternative names for the same field. Useful if 
you are encountering different sources which use different names for 
essentially the same data. I have not used this myself.
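
For illustration only (the alternative names here are invented), an alias 
declaration looks like:

{"name": "message_time", "aliases": ["msg_time", "event_time"], "type": "string"}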

A compromise approach would be to use an attribute to tell the writer which 
schema the message adheres to. If the pattern of data fields you might get can 
be categorised into a few schemas then store the schemas in an Avro schema 
registry and apply an attribute to the flow file to point at the correct schema 
for the particular contents. This approach may make schema maintenance easier, 
depending on your processes.
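
A sketch of that setup (property and processor names quoted from memory, so 
check them against your NiFi version):

AvroSchemaRegistry:  add one property per schema, e.g. eventSchema0 -> <schema JSON>
UpdateAttribute:     set schema.name = eventSchema0, chosen per flow file contents
Reader service:      Schema Access Strategy = "Use 'Schema Name' Property",
                     which looks up ${schema.name} in the registry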

Also look at the power of the “Infer Schema” value for the schema access 
strategy in the reader. This can work very well in most cases, and can 
interpret nested complex types successfully. You can come unstuck if there are 
mixed types in the flow file, as the infer strategy mostly builds the schema 
from what it finds in the first record, so any later records that do not 
conform will generate an error. This has been improving over recent releases, 
so some of the examples that fail in our production model now work on a new dev 
cluster. Anyway, if you do come across these problems you can usually solve 
them by using a SplitRecord processor (to one record per flow file) or a 
PartitionRecord processor to separate out the different types before 
processing, as sketched below.
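
For example (the record path is hypothetical, based on the sample event further 
down the thread), adding a dynamic property to PartitionRecord such as

  message_type = /data/message_type

groups records sharing that value into separate flow files and adds the value 
as a message_type attribute, which you could then use to pick the right schema.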

One final point, your DBA will love you if you turn those date strings into 
longs 😉
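
For example, a millisecond timestamp can be declared in Avro as a long with a 
logical type (field name taken from the sample event below; whether the string 
gets converted for you depends on your flow, so treat this as a sketch):

{"name": "message_time", "type": {"type": "long", "logicalType": "timestamp-millis"}}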

I hope this gives you the extra tools you need to solve your specific problem. 
Regards,

Steve Hindmarch

From: Lehel Boér <[email protected]>
Sent: Thursday, November 23, 2023 3:44 PM
To: [email protected]
Subject: Re: RecordReader/Writer with nested json resulting in MapRecord[] 
statement

It's a tradeoff between flexibility and structure. You can define the nested 
structure as a string in the schema and it can contain any JSON-structured 
data, but NiFi won't be able to parse it.
If your structure is the same but the data types can be different, you might 
check the UNION type in Avro:
https://stackoverflow.com/a/50177318
________________________________
From: Aaron Rich <[email protected]>
Sent: Thursday, November 23, 2023 7:15
To: [email protected]
Subject: Re: RecordReader/Writer with nested json resulting in MapRecord[] statement

Thanks Lehel!

Follow up question - since I'm focusing on the cloud event wrapper, is there a 
way to define the data nested record generically so that it can work for any 
structure in the data field? I was hoping that declaring it as a string would 
let me do that, but obviously not.

I could end up with 100+ different types in the data record and was hoping I 
could just drop any of them into the database as long as they are JSON, to 
match the column type. I really don't want to have to make a schema for each 
record type if I can get a generic "JSON Record" definition that might work 
(hopefully this makes sense).

Thank you so much for your help!

-Aaron

On Wed, Nov 22, 2023, 2:51 AM Lehel Boér <[email protected]> wrote:
Hi Aaron,

You need to define the schema for the "data" field as a nested record.

{
        "name": "data",
        "type": {
            "type": "record",
            "name": "nestedData",
            "fields": [
                {"name": "message_type", "type": "string"},
                {"name": "message_time", "type": "string"},
                {"name": "file_ingest_time", "type": "string"}
            ]
        }
    }

Best Regards,
Lehel
________________________________
From: Aaron Rich <[email protected]>
Sent: Wednesday, November 22, 2023 7:30
To: [email protected]; [email protected]
Subject: RecordReader/Writer with nested json resulting in MapRecord[] statement

Hello,

I'm trying to use the record readers/writers to take a JSON-defined event and 
write it to a Postgres database. The event follows the CloudEvent definition 
and has a data element that is JSON structured:

{
  "data" : {
    "message_type" : "V5",
    "message_time" : "1614597071000",
    "file_ingest_time": "1682018983320"
  },
  "spec_version" : "1.0.2",
  "subject" : "1969",
  "data_schema" : "URI://something",
  "id" : "80e12364-831e-446a-a260-86bccc469f25",
  "source" : "service",
  "time" : "2023-11-22T00:03:03Z",
  "type" : "V5",
  "data_content_type" : "application/json"
}

I'm trying to write into a postgres db table that has each field as a column 
and data as a JSON type column.

When I try to write to the database with PutDatabaseRecord, I'm getting a 
MapRecord string being written into the data column:
MapRecord[{message_type=V5, 
message_time=1614597071000,file_ingest_time=1682018983320}]

I _think_ the issue is with the schema I'm defining:
{
  "type": "record",
  "name": "eventSchema0",
  "fields": [
    {"name": "data", "type": "string"},
    {"name": "spec_version", "type": "string"},
    {"name": "subject", "type": "string"},
    {"name": "data_schema", "type": "string"},
    {"name": "id", "type": "string"},
    {"name": "source", "type": "string"},
    {"name": "time", "type": "string"},
    {"name": "type", "type": "string"},
    {"name": "data_content_type", "type": "string"}
  ]
}

What do I need to change so the JSON structure in the "data" field can be 
properly written using a record based processor?

Thanks for the help.

-Aaron
