Attaching a copy of a sample flow for anyone that comes along with a
similar question...
On Mon, Dec 3, 2018 at 12:29 PM Matt Burgess <[email protected]> wrote:
> Mike,
>
> I'm looking into whether this is possible with UpdateRecord or not.
> For the article you mentioned, the OP wanted to nest values into a
> record that didn't exist in the input. Not sure if that's technically
> the same as your case but could be.
>
> In the meantime this Jolt spec should work with JoltTransformRecord
> (you can go directly from CSV to nested JSON without having to convert
> first):
>
> [
> {
> "operation": "shift",
> "spec": {
> "*": "user.&"
> }
> }
> ]
>
> Regards,
> Matt
>
> On Mon, Dec 3, 2018 at 11:37 AM Mike Thomsen <[email protected]>
> wrote:
> >
> > Looks like I missed this:
> >
> >
> https://community.hortonworks.com/questions/212877/want-to-convert-csv-to-nested-json-using-nifi.html
> >
> > I'll get crackin on Jolt since that seems to be the best answer at the
> moment.
> >
> > On Mon, Dec 3, 2018 at 11:24 AM Mike Thomsen <[email protected]>
> wrote:
> >>
> >> We have a need to be able to take a CSV file and convert it into a
> nested JSON structure. I did a simple test with GenerateFlowFile and a few
> ConvertRecord processors. Test was:
> >>
> >> GenerateFlowFile (JSON) -> ConvertRecord (JSON in, CSV out) ->
> ConvertRecord (CSV in, JSON out) and it threw an exception in the last
> ConvertRecord when it went from something that looked like this:
> >>
> >> user
> >> "MapRecord[{name=Test User, [email protected]}]"
> >>
> >> Sample data and schema:
> >>
> >> {
> >> "user": {
> >> "name": "Test User",
> >> "email": "[email protected]"
> >> }
> >> }
> >>
> >> {
> >> "type": "record",
> >> "name": "User",
> >> "fields": [
> >> {
> >> "name": "user",
> >> "type": {
> >> "name": "UserDetails",
> >> "type": "record",
> >> "fields": [
> >> {
> >> "name": "name",
> >> "type": "string"
> >> },
> >> {
> >> "name": "email",
> >> "type": "string"
> >> }
> >> ]
> >> }
> >> }
> >> ]
> >> }
> >>
> >> Any ideas on how to do this? I honestly don't recall ever seeing
> anyone try to do this, and understand if the Record API needs a ticket or
> two to make it happen. Just need to know if I'm missing something.
> >>
> >> Thanks,
> >>
> >> Mike
>
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.2">
<description></description>
<groupId>c555dfee-0166-1000-ae35-1bb18ead4214</groupId>
<name>Jolt Transform Example</name>
<snippet>
<processGroups>
<id>67d06e54-91e0-375d-0000-000000000000</id>
<parentGroupId>e5bd0a32-9717-3739-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<comments></comments>
<contents>
<!-- Connection: carries the "success" relationship of processor
     f234ff6b-609d-31f6-0000-000000000000 (JoltTransformRecord) to processor
     f40254d0-bcc2-392a-0000-000000000000 (LogAttribute). -->
<connections>
  <id>d924d975-ebf3-3761-0000-000000000000</id>
  <parentGroupId>67d06e54-91e0-375d-0000-000000000000</parentGroupId>
  <!-- Value rejoined onto a single line: the mail client had hard-wrapped
       "1 GB", which embedded a line break inside the element text. -->
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <destination>
    <groupId>67d06e54-91e0-375d-0000-000000000000</groupId>
    <id>f40254d0-bcc2-392a-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
  <loadBalancePartitionAttribute></loadBalancePartitionAttribute>
  <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
  <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
  <name></name>
  <selectedRelationships>success</selectedRelationships>
  <source>
    <groupId>67d06e54-91e0-375d-0000-000000000000</groupId>
    <id>f234ff6b-609d-31f6-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<!-- Connection: carries the "success" relationship of processor
     9bbb9135-7f5e-3827-0000-000000000000 (GenerateFlowFile) to processor
     f234ff6b-609d-31f6-0000-000000000000 (JoltTransformRecord). -->
<connections>
  <id>f366195d-9666-3c5b-0000-000000000000</id>
  <parentGroupId>67d06e54-91e0-375d-0000-000000000000</parentGroupId>
  <!-- Value rejoined onto a single line: the mail client had hard-wrapped
       "1 GB", which embedded a line break inside the element text. -->
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <destination>
    <groupId>67d06e54-91e0-375d-0000-000000000000</groupId>
    <id>f234ff6b-609d-31f6-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
  <loadBalancePartitionAttribute></loadBalancePartitionAttribute>
  <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
  <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
  <name></name>
  <selectedRelationships>success</selectedRelationships>
  <source>
    <groupId>67d06e54-91e0-375d-0000-000000000000</groupId>
    <id>9bbb9135-7f5e-3827-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<!-- Controller service: AvroSchemaRegistry
     (id 07c16d6d-0fd5-3747-0000-000000000000).
     Registers three named Avro schemas as properties:
       "user"     : record with name/email nested under a "user" record
       "user_in"  : flat record with name and email
       "user_out" : record with name/email nested under a "user" record -->
<controllerServices>
  <id>07c16d6d-0fd5-3747-0000-000000000000</id>
  <parentGroupId>67d06e54-91e0-375d-0000-000000000000</parentGroupId>
  <bundle>
    <artifact>nifi-registry-nar</artifact>
    <group>org.apache.nifi</group>
    <version>1.8.0</version>
  </bundle>
  <comments/>
  <!-- Property descriptors: one entry per property configured below. -->
  <descriptors>
    <entry>
      <key>avro-reg-validated-field-names</key>
      <value>
        <name>avro-reg-validated-field-names</name>
      </value>
    </entry>
    <entry>
      <key>user</key>
      <value>
        <name>user</name>
      </value>
    </entry>
    <entry>
      <key>user_in</key>
      <value>
        <name>user_in</name>
      </value>
    </entry>
    <entry>
      <key>user_out</key>
      <value>
        <name>user_out</name>
      </value>
    </entry>
  </descriptors>
  <name>AvroSchemaRegistry</name>
  <persistsState>false</persistsState>
  <!-- Property values. The multi-line schema text is deliberately left
       unindented: whitespace inside a value element is part of the value. -->
  <properties>
    <!-- No value is set for this property in the template. -->
    <entry>
      <key>avro-reg-validated-field-names</key>
    </entry>
    <entry>
      <key>user</key>
      <value>{
"type": "record",
"name": "User",
"fields": [
{
"name": "user",
"type": {
"name": "UserDetails",
"type": "record",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "email",
"type": "string"
}
]
}
}
]
}</value>
    </entry>
    <entry>
      <key>user_in</key>
      <value>{
"type": "record",
"name": "UserIn",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "email",
"type": "string"
}
]
}</value>
    </entry>
    <entry>
      <key>user_out</key>
      <value>{
"type": "record",
"name": "UserOut",
"fields": [
{
"name": "user",
"type": {
"name": "UserDetails",
"type": "record",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "email",
"type": "string"
}
]
}
}
]
}</value>
    </entry>
  </properties>
  <state>ENABLED</state>
  <type>org.apache.nifi.schemaregistry.services.AvroSchemaRegistry</type>
</controllerServices>
<!-- Controller service: JsonRecordSetWriter
     (id 72c32ce9-48ab-3b01-0000-000000000000).
     Looks its schema up by name in registry 07c16d6d-0fd5-3747-0000-000000000000
     (the AvroSchemaRegistry service); the name comes from the expression
     ${schema.out.name}. Pretty printing is enabled. -->
<controllerServices>
  <id>72c32ce9-48ab-3b01-0000-000000000000</id>
  <parentGroupId>67d06e54-91e0-375d-0000-000000000000</parentGroupId>
  <bundle>
    <artifact>nifi-record-serialization-services-nar</artifact>
    <group>org.apache.nifi</group>
    <version>1.8.0</version>
  </bundle>
  <comments/>
  <!-- Property descriptors: one entry per property configured below. -->
  <descriptors>
    <entry>
      <key>Schema Write Strategy</key>
      <value>
        <name>Schema Write Strategy</name>
      </value>
    </entry>
    <entry>
      <key>schema-access-strategy</key>
      <value>
        <name>schema-access-strategy</name>
      </value>
    </entry>
    <entry>
      <key>schema-registry</key>
      <value>
        <identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
        <name>schema-registry</name>
      </value>
    </entry>
    <entry>
      <key>schema-name</key>
      <value>
        <name>schema-name</name>
      </value>
    </entry>
    <entry>
      <key>schema-version</key>
      <value>
        <name>schema-version</name>
      </value>
    </entry>
    <entry>
      <key>schema-branch</key>
      <value>
        <name>schema-branch</name>
      </value>
    </entry>
    <entry>
      <key>schema-text</key>
      <value>
        <name>schema-text</name>
      </value>
    </entry>
    <entry>
      <key>Date Format</key>
      <value>
        <name>Date Format</name>
      </value>
    </entry>
    <entry>
      <key>Time Format</key>
      <value>
        <name>Time Format</name>
      </value>
    </entry>
    <entry>
      <key>Timestamp Format</key>
      <value>
        <name>Timestamp Format</name>
      </value>
    </entry>
    <entry>
      <key>Pretty Print JSON</key>
      <value>
        <name>Pretty Print JSON</name>
      </value>
    </entry>
    <entry>
      <key>suppress-nulls</key>
      <value>
        <name>suppress-nulls</name>
      </value>
    </entry>
    <entry>
      <key>output-grouping</key>
      <value>
        <name>output-grouping</name>
      </value>
    </entry>
  </descriptors>
  <name>JsonRecordSetWriter</name>
  <persistsState>false</persistsState>
  <!-- Property values. An entry with a key but no value element has no
       value set in this template. -->
  <properties>
    <entry>
      <key>Schema Write Strategy</key>
    </entry>
    <entry>
      <key>schema-access-strategy</key>
      <value>schema-name</value>
    </entry>
    <!-- References the AvroSchemaRegistry controller service by id. -->
    <entry>
      <key>schema-registry</key>
      <value>07c16d6d-0fd5-3747-0000-000000000000</value>
    </entry>
    <!-- The GenerateFlowFile processor in this template lists a
         schema.out.name property with value user_out.
         NOTE(review): presumably that reaches this expression as a
         FlowFile attribute; confirm. -->
    <entry>
      <key>schema-name</key>
      <value>${schema.out.name}</value>
    </entry>
    <entry>
      <key>schema-version</key>
    </entry>
    <entry>
      <key>schema-branch</key>
    </entry>
    <entry>
      <key>schema-text</key>
    </entry>
    <entry>
      <key>Date Format</key>
    </entry>
    <entry>
      <key>Time Format</key>
    </entry>
    <entry>
      <key>Timestamp Format</key>
    </entry>
    <entry>
      <key>Pretty Print JSON</key>
      <value>true</value>
    </entry>
    <entry>
      <key>suppress-nulls</key>
    </entry>
    <entry>
      <key>output-grouping</key>
    </entry>
  </properties>
  <state>ENABLED</state>
  <type>org.apache.nifi.json.JsonRecordSetWriter</type>
</controllerServices>
<!-- Controller service: CSVReader
     (id 7bf56da8-f3fe-3a87-0000-000000000000).
     Looks its schema up by name in registry 07c16d6d-0fd5-3747-0000-000000000000
     (the AvroSchemaRegistry service) and skips the CSV header line. -->
<controllerServices>
  <id>7bf56da8-f3fe-3a87-0000-000000000000</id>
  <parentGroupId>67d06e54-91e0-375d-0000-000000000000</parentGroupId>
  <bundle>
    <artifact>nifi-record-serialization-services-nar</artifact>
    <group>org.apache.nifi</group>
    <version>1.8.0</version>
  </bundle>
  <comments/>
  <!-- Property descriptors: one entry per property configured below. -->
  <descriptors>
    <entry>
      <key>schema-access-strategy</key>
      <value>
        <name>schema-access-strategy</name>
      </value>
    </entry>
    <entry>
      <key>schema-registry</key>
      <value>
        <identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
        <name>schema-registry</name>
      </value>
    </entry>
    <entry>
      <key>schema-name</key>
      <value>
        <name>schema-name</name>
      </value>
    </entry>
    <entry>
      <key>schema-version</key>
      <value>
        <name>schema-version</name>
      </value>
    </entry>
    <entry>
      <key>schema-branch</key>
      <value>
        <name>schema-branch</name>
      </value>
    </entry>
    <entry>
      <key>schema-text</key>
      <value>
        <name>schema-text</name>
      </value>
    </entry>
    <entry>
      <key>csv-reader-csv-parser</key>
      <value>
        <name>csv-reader-csv-parser</name>
      </value>
    </entry>
    <entry>
      <key>Date Format</key>
      <value>
        <name>Date Format</name>
      </value>
    </entry>
    <entry>
      <key>Time Format</key>
      <value>
        <name>Time Format</name>
      </value>
    </entry>
    <entry>
      <key>Timestamp Format</key>
      <value>
        <name>Timestamp Format</name>
      </value>
    </entry>
    <entry>
      <key>CSV Format</key>
      <value>
        <name>CSV Format</name>
      </value>
    </entry>
    <entry>
      <key>Value Separator</key>
      <value>
        <name>Value Separator</name>
      </value>
    </entry>
    <entry>
      <key>Skip Header Line</key>
      <value>
        <name>Skip Header Line</name>
      </value>
    </entry>
    <entry>
      <key>ignore-csv-header</key>
      <value>
        <name>ignore-csv-header</name>
      </value>
    </entry>
    <entry>
      <key>Quote Character</key>
      <value>
        <name>Quote Character</name>
      </value>
    </entry>
    <entry>
      <key>Escape Character</key>
      <value>
        <name>Escape Character</name>
      </value>
    </entry>
    <entry>
      <key>Comment Marker</key>
      <value>
        <name>Comment Marker</name>
      </value>
    </entry>
    <entry>
      <key>Null String</key>
      <value>
        <name>Null String</name>
      </value>
    </entry>
    <entry>
      <key>Trim Fields</key>
      <value>
        <name>Trim Fields</name>
      </value>
    </entry>
    <entry>
      <key>csvutils-character-set</key>
      <value>
        <name>csvutils-character-set</name>
      </value>
    </entry>
  </descriptors>
  <name>CSVReader</name>
  <persistsState>false</persistsState>
  <!-- Property values. An entry with a key but no value element has no
       value set in this template. -->
  <properties>
    <entry>
      <key>schema-access-strategy</key>
      <value>schema-name</value>
    </entry>
    <!-- References the AvroSchemaRegistry controller service by id. -->
    <entry>
      <key>schema-registry</key>
      <value>07c16d6d-0fd5-3747-0000-000000000000</value>
    </entry>
    <!-- No schema-name value is set here.
         NOTE(review): presumably the reader falls back to a default that
         resolves the schema.name property listed on GenerateFlowFile
         (user_in); confirm. -->
    <entry>
      <key>schema-name</key>
    </entry>
    <entry>
      <key>schema-version</key>
    </entry>
    <entry>
      <key>schema-branch</key>
    </entry>
    <entry>
      <key>schema-text</key>
    </entry>
    <entry>
      <key>csv-reader-csv-parser</key>
    </entry>
    <entry>
      <key>Date Format</key>
    </entry>
    <entry>
      <key>Time Format</key>
    </entry>
    <entry>
      <key>Timestamp Format</key>
    </entry>
    <entry>
      <key>CSV Format</key>
    </entry>
    <entry>
      <key>Value Separator</key>
    </entry>
    <!-- The sample CSV generated in this template starts with a
         "name,email" header row. -->
    <entry>
      <key>Skip Header Line</key>
      <value>true</value>
    </entry>
    <entry>
      <key>ignore-csv-header</key>
    </entry>
    <entry>
      <key>Quote Character</key>
    </entry>
    <entry>
      <key>Escape Character</key>
    </entry>
    <entry>
      <key>Comment Marker</key>
    </entry>
    <entry>
      <key>Null String</key>
    </entry>
    <entry>
      <key>Trim Fields</key>
    </entry>
    <entry>
      <key>csvutils-character-set</key>
    </entry>
  </properties>
  <state>ENABLED</state>
  <type>org.apache.nifi.csv.CSVReader</type>
</controllerServices>
<!-- Processor: GenerateFlowFile (id 9bbb9135-7f5e-3827-0000-000000000000).
     Produces a two-line CSV sample (header row plus one record) every
     5 sec and lists schema.name=user_in and schema.out.name=user_out
     alongside its regular properties. -->
<processors>
  <id>9bbb9135-7f5e-3827-0000-000000000000</id>
  <parentGroupId>67d06e54-91e0-375d-0000-000000000000</parentGroupId>
  <position>
    <x>527.7405949668466</x>
    <y>1745.7973677987322</y>
  </position>
  <bundle>
    <artifact>nifi-standard-nar</artifact>
    <group>org.apache.nifi</group>
    <version>1.8.0</version>
  </bundle>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <!-- Property descriptors: one entry per property configured below. -->
    <descriptors>
      <entry>
        <key>File Size</key>
        <value>
          <name>File Size</name>
        </value>
      </entry>
      <entry>
        <key>Batch Size</key>
        <value>
          <name>Batch Size</name>
        </value>
      </entry>
      <entry>
        <key>Data Format</key>
        <value>
          <name>Data Format</name>
        </value>
      </entry>
      <entry>
        <key>Unique FlowFiles</key>
        <value>
          <name>Unique FlowFiles</name>
        </value>
      </entry>
      <entry>
        <key>generate-ff-custom-text</key>
        <value>
          <name>generate-ff-custom-text</name>
        </value>
      </entry>
      <entry>
        <key>character-set</key>
        <value>
          <name>character-set</name>
        </value>
      </entry>
      <entry>
        <key>schema.name</key>
        <value>
          <name>schema.name</name>
        </value>
      </entry>
      <entry>
        <key>schema.out.name</key>
        <value>
          <name>schema.out.name</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <entry>
        <key>File Size</key>
        <value>0B</value>
      </entry>
      <entry>
        <key>Batch Size</key>
        <value>1</value>
      </entry>
      <entry>
        <key>Data Format</key>
        <value>Text</value>
      </entry>
      <entry>
        <key>Unique FlowFiles</key>
        <value>false</value>
      </entry>
      <!-- Sample CSV payload. The text is left unindented on purpose:
           whitespace inside the value element is part of the payload. -->
      <entry>
        <key>generate-ff-custom-text</key>
        <value>name,email
"Test User","[email protected]"</value>
      </entry>
      <entry>
        <key>character-set</key>
        <value>UTF-8</value>
      </entry>
      <!-- NOTE(review): schema.name and schema.out.name look like
           user-defined properties that become FlowFile attributes; the
           JsonRecordSetWriter service references ${schema.out.name}.
           Confirm. -->
      <entry>
        <key>schema.name</key>
        <value>user_in</value>
      </entry>
      <entry>
        <key>schema.out.name</key>
        <value>user_out</value>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>5 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <executionNodeRestricted>false</executionNodeRestricted>
  <name>GenerateFlowFile</name>
  <!-- "success" is wired to JoltTransformRecord by connection
       f366195d-9666-3c5b-0000-000000000000. -->
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <state>RUNNING</state>
  <style/>
  <type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<!-- Processor: JoltTransformRecord (id f234ff6b-609d-31f6-0000-000000000000).
     Reads records with the CSVReader service, applies a Jolt "shift" chain
     that moves every top-level field under "user", and writes the result
     with the JsonRecordSetWriter service. The "failure" and "original"
     relationships are auto-terminated. -->
<processors>
  <id>f234ff6b-609d-31f6-0000-000000000000</id>
  <parentGroupId>67d06e54-91e0-375d-0000-000000000000</parentGroupId>
  <position>
    <x>578.7405949668464</x>
    <y>2050.797367798732</y>
  </position>
  <bundle>
    <artifact>nifi-jolt-record-nar</artifact>
    <group>org.apache.nifi</group>
    <version>1.8.0</version>
  </bundle>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments></comments>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <!-- Property descriptors: one entry per property configured below. -->
    <descriptors>
      <entry>
        <key>jolt-record-record-reader</key>
        <value>
          <identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService>
          <name>jolt-record-record-reader</name>
        </value>
      </entry>
      <entry>
        <key>jolt-record-record-writer</key>
        <value>
          <identifiesControllerService>org.apache.nifi.serialization.RecordSetWriterFactory</identifiesControllerService>
          <name>jolt-record-record-writer</name>
        </value>
      </entry>
      <entry>
        <key>jolt-record-transform</key>
        <value>
          <name>jolt-record-transform</name>
        </value>
      </entry>
      <entry>
        <key>jolt-record-custom-class</key>
        <value>
          <name>jolt-record-custom-class</name>
        </value>
      </entry>
      <entry>
        <key>jolt-record-custom-modules</key>
        <value>
          <name>jolt-record-custom-modules</name>
        </value>
      </entry>
      <entry>
        <key>jolt-record-spec</key>
        <value>
          <name>jolt-record-spec</name>
        </value>
      </entry>
      <entry>
        <key>jolt-record-transform-cache-size</key>
        <value>
          <name>jolt-record-transform-cache-size</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <!-- Id of the CSVReader controller service. -->
      <entry>
        <key>jolt-record-record-reader</key>
        <value>7bf56da8-f3fe-3a87-0000-000000000000</value>
      </entry>
      <!-- Id of the JsonRecordSetWriter controller service. -->
      <entry>
        <key>jolt-record-record-writer</key>
        <value>72c32ce9-48ab-3b01-0000-000000000000</value>
      </entry>
      <entry>
        <key>jolt-record-transform</key>
        <value>jolt-transform-chain</value>
      </entry>
      <entry>
        <key>jolt-record-custom-class</key>
      </entry>
      <entry>
        <key>jolt-record-custom-modules</key>
      </entry>
      <!-- Jolt spec. The ampersand wildcard is written as the entity
           reference for "&" because a bare ampersand is not well-formed
           XML character data; the parsed value is still user.& .
           The text is left unindented on purpose: whitespace inside the
           value element is part of the property text. -->
      <entry>
        <key>jolt-record-spec</key>
        <value>[
{
"operation": "shift",
"spec": {
"*": "user.&amp;"
}
}
] </value>
      </entry>
      <entry>
        <key>jolt-record-transform-cache-size</key>
        <value>1</value>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <executionNodeRestricted>false</executionNodeRestricted>
  <name>JoltTransformRecord</name>
  <relationships>
    <autoTerminate>true</autoTerminate>
    <name>failure</name>
  </relationships>
  <relationships>
    <autoTerminate>true</autoTerminate>
    <name>original</name>
  </relationships>
  <!-- "success" is wired to LogAttribute by connection
       d924d975-ebf3-3761-0000-000000000000. -->
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <state>RUNNING</state>
  <style/>
  <type>org.apache.nifi.processors.jolt.record.JoltTransformRecord</type>
</processors>
<!-- Processor: LogAttribute (id f40254d0-bcc2-392a-0000-000000000000).
     Destination of the JoltTransformRecord "success" connection. It is
     saved in the STOPPED state, logs at info level with payload logging
     off, and selects attributes with the regex .* -->
<processors>
  <id>f40254d0-bcc2-392a-0000-000000000000</id>
  <parentGroupId>67d06e54-91e0-375d-0000-000000000000</parentGroupId>
  <position>
    <x>1267.7405339316902</x>
    <y>2079.797367798732</y>
  </position>
  <bundle>
    <artifact>nifi-standard-nar</artifact>
    <group>org.apache.nifi</group>
    <version>1.8.0</version>
  </bundle>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <!-- Property descriptors: one entry per property configured below. -->
    <descriptors>
      <entry>
        <key>Log Level</key>
        <value>
          <name>Log Level</name>
        </value>
      </entry>
      <entry>
        <key>Log Payload</key>
        <value>
          <name>Log Payload</name>
        </value>
      </entry>
      <entry>
        <key>Attributes to Log</key>
        <value>
          <name>Attributes to Log</name>
        </value>
      </entry>
      <entry>
        <key>attributes-to-log-regex</key>
        <value>
          <name>attributes-to-log-regex</name>
        </value>
      </entry>
      <entry>
        <key>Attributes to Ignore</key>
        <value>
          <name>Attributes to Ignore</name>
        </value>
      </entry>
      <entry>
        <key>attributes-to-ignore-regex</key>
        <value>
          <name>attributes-to-ignore-regex</name>
        </value>
      </entry>
      <entry>
        <key>Log prefix</key>
        <value>
          <name>Log prefix</name>
        </value>
      </entry>
      <entry>
        <key>character-set</key>
        <value>
          <name>character-set</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <!-- Property values. An entry with a key but no value element has no
         value set in this template. -->
    <properties>
      <entry>
        <key>Log Level</key>
        <value>info</value>
      </entry>
      <entry>
        <key>Log Payload</key>
        <value>false</value>
      </entry>
      <entry>
        <key>Attributes to Log</key>
      </entry>
      <entry>
        <key>attributes-to-log-regex</key>
        <value>.*</value>
      </entry>
      <entry>
        <key>Attributes to Ignore</key>
      </entry>
      <entry>
        <key>attributes-to-ignore-regex</key>
      </entry>
      <entry>
        <key>Log prefix</key>
      </entry>
      <entry>
        <key>character-set</key>
        <value>UTF-8</value>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <executionNodeRestricted>false</executionNodeRestricted>
  <name>LogAttribute</name>
  <!-- "success" is not auto-terminated, and neither connection visible in
       this template uses this processor as its source. -->
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <state>STOPPED</state>
  <style/>
  <type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
</contents>
<name>Jolt Transform</name>
<variables/>
</processGroups>
</snippet>
<timestamp>12/03/2018 15:48:50 EST</timestamp>
</template>