Hi Bryan,

Thanks for the tip; it was very helpful and let me finish. I wanted essentially your flow, but with schema inference and no schema registry. The flow takes an arbitrary JSON array (no pre-defined schema, other than the top-level element being an array) and chunks it out. I've attached the final result for anyone who's curious.
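In case it saves anyone opening the XML, the shape of the attached flow is:

GenerateFlowFile -> InferAvroSchema -> UpdateAttribute -> SplitRecord -> LogAttribute

InferAvroSchema has Schema Output Destination set to flowfile-attribute, so the inferred schema lands in the inferred.avro.schema attribute. Because the top-level element of the data is an array, that schema is an Avro array rather than a record, which is what the record reader and writer were choking on. The key step is UpdateAttribute, which sets avro.schema to

${inferred.avro.schema:jsonPath('$.items')}

to strip the array wrapper and keep only the record schema. SplitRecord (Records Per Split = 1) then uses a JsonTreeReader and JsonRecordSetWriter, both configured to read the schema from the Schema Text property, whose default of ${avro.schema} picks the record schema up from that attribute.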
Neil

On Thu, Aug 31, 2017 at 9:28 AM, Bryan Bende <bbe...@gmail.com> wrote:
> Neil,
>
> I'm a little confused as to what format your initial data is in... You
> showed an example payload as JSON, but then mentioned using an
> AvroReader, so it wasn't clear to me whether your starting point is JSON
> or Avro.
>
> Assuming it is JSON, I put together a template that shows how to split
> your sample data:
>
> https://gist.github.com/bbende/f73d06c0d35ed1aeb2603a8f87276ed7
>
> I used the second schema you have (the one where the top-level element
> is a record) and then SplitRecord with a JsonTreeReader and
> JsonRecordSetWriter.
>
> The sample data I sent in was your example data, and it produced two
> flow files coming out of SplitRecord, one for each element of the
> array.
>
> Let us know if this is not what you are trying to do.
>
> Thanks,
>
> Bryan
>
>
> On Wed, Aug 30, 2017 at 8:31 PM, Neil Derraugh
> <neil.derra...@intellifylearning.com> wrote:
> > I should have mentioned I tried starting with a JsonPathReader before
> > the AvroReader. I had a property I was calling root with a value of $.
> > I can post details about that too if it would be helpful.
> >
> > On Wed, Aug 30, 2017 at 8:08 PM, Neil Derraugh
> > <neil.derra...@intellifylearning.com> wrote:
> >>
> >> I have arbitrary JSON arrays that I want to split into chunks. I've
> >> been (unsuccessfully) trying to figure this out with InferAvroSchema
> >> -> SplitRecord(AvroReader, JsonRecordSetWriter).
> >>
> >> Here's an example payload:
> >> [{
> >>   "id": "56740f4b-48de-0502-afdc-59a463b3f6dc",
> >>   "account_id": "b0dad7e2-7bb9-4ca9-b9fd-134870656eb2",
> >>   "contact_id": "a0ebd53a-77c5-e2ea-4787-59a463053b1b",
> >>   "date_modified": 1503959931000,
> >>   "deleted": 0
> >> },
> >> {
> >>   "id": "1ac80e25-7f28-f5c6-bac0-59a4636ef31f",
> >>   "account_id": "71d4904e-f8f1-4209-bff9-4d080057ea84",
> >>   "contact_id": "e429bfe6-9c89-8b81-9ee6-59a463fc7fd8",
> >>   "date_modified": 1503959873000,
> >>   "deleted": 0
> >> }]
> >>
> >> Here's the schema that gets inferred (InferAvroSchema's Avro Record
> >> Name is "root"):
> >> {
> >>   "type": "array",
> >>   "items": {
> >>     "type": "record",
> >>     "name": "root",
> >>     "fields": [
> >>       {
> >>         "name": "id",
> >>         "type": "string",
> >>         "doc": "Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"
> >>       },
> >>       {
> >>         "name": "account_id",
> >>         "type": "string",
> >>         "doc": "Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"
> >>       },
> >>       {
> >>         "name": "contact_id",
> >>         "type": "string",
> >>         "doc": "Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"
> >>       },
> >>       {
> >>         "name": "date_modified",
> >>         "type": "long",
> >>         "doc": "Type inferred from '1503959931000'"
> >>       },
> >>       {
> >>         "name": "deleted",
> >>         "type": "int",
> >>         "doc": "Type inferred from '0'"
> >>       }
> >>     ]
> >>   }
> >> }
> >>
> >> When I use ${inferred.avro.schema} for both the AvroReader and the
> >> JsonRecordSetWriter I get:
> >>
> >> SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to create Record Writer for StandardFlowFileRecord[uuid=45d7a0d2-258a-4f40-b5f9-4886eb2c2a76,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1504118228480-325, container=default, section=325], offset=0, length=86462199],offset=0,name=accounts-contacts.json.avro,size=86462199]; routing to failure: org.apache.nifi.schema.access.SchemaNotFoundException: org.apache.avro.AvroRuntimeException: Not a record: {"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}.
> >>
> >> The stack trace:
> >>
> >> 2017-08-30 19:42:21,692 ERROR [Timer-Driven Process Thread-9] o.a.nifi.processors.standard.SplitRecord SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to create Record Writer for StandardFlowFileRecord[uuid=a5f720cf-98a8-4c29-bd91-098c7f25448d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1504121074997-336, container=default, section=336], offset=1013917, length=454],offset=0,name=626851422080935,size=454]; routing to failure: org.apache.nifi.schema.access.SchemaNotFoundException: org.apache.avro.AvroRuntimeException: Not a record: {"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}
> >> org.apache.nifi.schema.access.SchemaNotFoundException: org.apache.avro.AvroRuntimeException: Not a record: {"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}
> >>     at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:55)
> >>     at org.apache.nifi.serialization.SchemaRegistryService.getSchema(SchemaRegistryService.java:112)
> >>     at sun.reflect.GeneratedMethodAccessor1466.invoke(Unknown Source)
> >>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>     at java.lang.reflect.Method.invoke(Method.java:498)
> >>     at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:89)
> >>     at com.sun.proxy.$Proxy144.getSchema(Unknown Source)
> >>     at org.apache.nifi.processors.standard.SplitRecord.onTrigger(SplitRecord.java:138)
> >>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> >>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
> >>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> >>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> >>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
> >>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>     at java.lang.Thread.run(Thread.java:748)
> >> Caused by: org.apache.avro.AvroRuntimeException: Not a record: {"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}
> >>     at org.apache.avro.Schema.getFields(Schema.java:220)
> >>     at org.apache.nifi.avro.AvroTypeUtil.createSchema(AvroTypeUtil.java:218)
> >>     at org.apache.nifi.avro.AvroTypeUtil.createSchema(AvroTypeUtil.java:202)
> >>     at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:53)
> >>     ... 19 common frames omitted
> >>
> >> Which looks like it's coming from the Writer, and seems to be about the
> >> root element being an array as opposed to a record. So I hardcoded the
> >> schema in the JsonRecordSetWriter to be just the record, like this:
> >> {
> >>   "type": "record",
> >>   "name": "root",
> >>   "fields": [
> >>     {
> >>       "name": "id",
> >>       "type": "string",
> >>       "doc": "Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"
> >>     },
> >>     {
> >>       "name": "account_id",
> >>       "type": "string",
> >>       "doc": "Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"
> >>     },
> >>     {
> >>       "name": "contact_id",
> >>       "type": "string",
> >>       "doc": "Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"
> >>     },
> >>     {
> >>       "name": "date_modified",
> >>       "type": "long",
> >>       "doc": "Type inferred from '1503959931000'"
> >>     },
> >>     {
> >>       "name": "deleted",
> >>       "type": "int",
> >>       "doc": "Type inferred from '0'"
> >>     }
> >>   ]
> >> }
> >>
> >> Which gave me:
> >>
> >> SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to split StandardFlowFileRecord[uuid=e2098e1b-f2f8-4ca8-926f-6d0e3643ce45,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1504118228480-325, container=default, section=325], offset=0, length=86462199],offset=0,name=accounts-contacts.json.avro,size=86462199]: org.apache.nifi.processor.exception.ProcessException: Failed to parse incoming data.
> >>
> >> The stack trace:
> >>
> >> 2017-08-30 19:31:21,690 ERROR [Timer-Driven Process Thread-3] o.a.nifi.processors.standard.SplitRecord SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to split StandardFlowFileRecord[uuid=41f02871-e5e8-496e-a671-bc3200c4bf8e,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1504121074874-335, container=default, section=335], offset=1028220, length=454],offset=0,name=626191417134736,size=454]: org.apache.nifi.processor.exception.ProcessException: Failed to parse incoming data
> >> org.apache.nifi.processor.exception.ProcessException: Failed to parse incoming data
> >>     at org.apache.nifi.processors.standard.SplitRecord$1.process(SplitRecord.java:187)
> >>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2136)
> >>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2106)
> >>     at org.apache.nifi.processors.standard.SplitRecord.onTrigger(SplitRecord.java:149)
> >>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> >>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
> >>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> >>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> >>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
> >>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>     at java.lang.Thread.run(Thread.java:748)
> >> Caused by: org.apache.nifi.schema.access.SchemaNotFoundException: org.apache.avro.AvroRuntimeException: Not a record: {"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}
> >>     at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:55)
> >>     at org.apache.nifi.serialization.SchemaRegistryService.getSchema(SchemaRegistryService.java:112)
> >>     at org.apache.nifi.avro.AvroReader.createRecordReader(AvroReader.java:92)
> >>     at sun.reflect.GeneratedMethodAccessor1467.invoke(Unknown Source)
> >>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>     at java.lang.reflect.Method.invoke(Method.java:498)
> >>     at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:89)
> >>     at com.sun.proxy.$Proxy158.createRecordReader(Unknown Source)
> >>     at org.apache.nifi.processors.standard.SplitRecord$1.process(SplitRecord.java:152)
> >>     ... 15 common frames omitted
> >> Caused by: org.apache.avro.AvroRuntimeException: Not a record: {"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}
> >>     at org.apache.avro.Schema.getFields(Schema.java:220)
> >>     at org.apache.nifi.avro.AvroTypeUtil.createSchema(AvroTypeUtil.java:218)
> >>     at org.apache.nifi.avro.AvroTypeUtil.createSchema(AvroTypeUtil.java:202)
> >>     at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:53)
> >>     ... 23 common frames omitted
> >>
> >> Can somebody point me to what I'm doing wrong? Or suggest an
> >> alternative approach to chunking arbitrary JSON arrays?
> >>
> >> Thanks,
> >> Neil
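P.S. For anyone who wants to sanity-check the jsonPath step outside NiFi, here is a rough Python sketch of what ${inferred.avro.schema:jsonPath('$.items')} does to the inferred schema. This is only an illustration (the field docs are trimmed); it is not part of the template.

import json

# Abbreviated copy of the schema InferAvroSchema writes to the
# inferred.avro.schema attribute (full version in the thread above).
inferred = json.loads("""
{
  "type": "array",
  "items": {
    "type": "record",
    "name": "root",
    "fields": [
      {"name": "id", "type": "string"},
      {"name": "account_id", "type": "string"},
      {"name": "contact_id", "type": "string"},
      {"name": "date_modified", "type": "long"},
      {"name": "deleted", "type": "int"}
    ]
  }
}
""")

# jsonPath('$.items') amounts to this lookup: drop the array wrapper and
# keep only the record schema, which is why the "Not a record" error
# goes away once avro.schema is set this way.
record_schema = inferred["items"]
print(json.dumps(record_schema, indent=2))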
<?xml version="1.0" ?> <template encoding-version="1.1"> <description></description> <groupId>102f3353-015e-1000-78d0-043a14fedd03</groupId> <name>inferred_record_split</name> <snippet> <connections> <id>8c6ff019-297d-395e-0000-000000000000</id> <parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>8d16826d-ce84-362e-0000-000000000000</groupId> <id>9bdc7b0b-db26-374a-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <name></name> <selectedRelationships>success</selectedRelationships> <source> <groupId>8d16826d-ce84-362e-0000-000000000000</groupId> <id>93dc502a-0071-3183-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>0</zIndex> </connections> <connections> <id>ce151ab2-095c-360e-0000-000000000000</id> <parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>8d16826d-ce84-362e-0000-000000000000</groupId> <id>90e4bbe0-4477-34e1-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <name></name> <selectedRelationships>splits</selectedRelationships> <source> <groupId>8d16826d-ce84-362e-0000-000000000000</groupId> <id>9bdc7b0b-db26-374a-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>0</zIndex> </connections> <connections> <id>d4d9f32b-259a-3c26-0000-000000000000</id> <parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>8d16826d-ce84-362e-0000-000000000000</groupId> <id>93dc502a-0071-3183-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <name></name> <selectedRelationships>success</selectedRelationships> <source> <groupId>8d16826d-ce84-362e-0000-000000000000</groupId> <id>3ffeeea3-a0cc-386a-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>0</zIndex> </connections> <connections> <id>297e1ac2-407e-3ecd-0000-000000000000</id> <parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>8d16826d-ce84-362e-0000-000000000000</groupId> <id>3ffeeea3-a0cc-386a-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <name></name> <selectedRelationships>success</selectedRelationships> <source> <groupId>8d16826d-ce84-362e-0000-000000000000</groupId> <id>1c12157b-c44e-3223-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>0</zIndex> </connections> <controllerServices> <id>de57752d-8a16-311c-0000-000000000000</id> <parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId> <bundle> <artifact>nifi-record-serialization-services-nar</artifact> <group>org.apache.nifi</group> <version>1.3.0</version> </bundle> <comments></comments> <descriptors> <entry> <key>schema-access-strategy</key> <value> <name>schema-access-strategy</name> </value> </entry> <entry> 
<key>schema-registry</key> <value> <identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService> <name>schema-registry</name> </value> </entry> <entry> <key>schema-name</key> <value> <name>schema-name</name> </value> </entry> <entry> <key>schema-text</key> <value> <name>schema-text</name> </value> </entry> <entry> <key>Date Format</key> <value> <name>Date Format</name> </value> </entry> <entry> <key>Time Format</key> <value> <name>Time Format</name> </value> </entry> <entry> <key>Timestamp Format</key> <value> <name>Timestamp Format</name> </value> </entry> </descriptors> <name>JsonTreeReader</name> <persistsState>false</persistsState> <properties> <entry> <key>schema-access-strategy</key> <value>schema-text-property</value> </entry> <entry> <key>schema-registry</key> </entry> <entry> <key>schema-name</key> </entry> <entry> <key>schema-text</key> </entry> <entry> <key>Date Format</key> </entry> <entry> <key>Time Format</key> </entry> <entry> <key>Timestamp Format</key> </entry> </properties> <state>ENABLED</state> <type>org.apache.nifi.json.JsonTreeReader</type> </controllerServices> <controllerServices> <id>f8122fc5-36ce-3753-0000-000000000000</id> <parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId> <bundle> <artifact>nifi-record-serialization-services-nar</artifact> <group>org.apache.nifi</group> <version>1.3.0</version> </bundle> <comments></comments> <descriptors> <entry> <key>Schema Write Strategy</key> <value> <name>Schema Write Strategy</name> </value> </entry> <entry> <key>schema-access-strategy</key> <value> <name>schema-access-strategy</name> </value> </entry> <entry> <key>schema-registry</key> <value> <identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService> <name>schema-registry</name> </value> </entry> <entry> <key>schema-name</key> <value> <name>schema-name</name> </value> </entry> <entry> <key>schema-text</key> <value> <name>schema-text</name> </value> </entry> <entry> <key>Date Format</key> <value> <name>Date Format</name> </value> </entry> <entry> <key>Time Format</key> <value> <name>Time Format</name> </value> </entry> <entry> <key>Timestamp Format</key> <value> <name>Timestamp Format</name> </value> </entry> <entry> <key>Pretty Print JSON</key> <value> <name>Pretty Print JSON</name> </value> </entry> </descriptors> <name>JsonRecordSetWriter</name> <persistsState>false</persistsState> <properties> <entry> <key>Schema Write Strategy</key> <value>full-schema-attribute</value> </entry> <entry> <key>schema-access-strategy</key> <value>schema-text-property</value> </entry> <entry> <key>schema-registry</key> </entry> <entry> <key>schema-name</key> </entry> <entry> <key>schema-text</key> </entry> <entry> <key>Date Format</key> </entry> <entry> <key>Time Format</key> </entry> <entry> <key>Timestamp Format</key> </entry> <entry> <key>Pretty Print JSON</key> </entry> </properties> <state>ENABLED</state> <type>org.apache.nifi.json.JsonRecordSetWriter</type> </controllerServices> <processors> <id>90e4bbe0-4477-34e1-0000-000000000000</id> <parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId> <position> <x>0.0</x> <y>755.0000076293945</y> </position> <bundle> <artifact>nifi-standard-nar</artifact> <group>org.apache.nifi</group> <version>1.3.0</version> </bundle> <config> <bulletinLevel>WARN</bulletinLevel> <comments></comments> <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> <descriptors> <entry> <key>Log Level</key> <value> 
<name>Log Level</name> </value> </entry> <entry> <key>Log Payload</key> <value> <name>Log Payload</name> </value> </entry> <entry> <key>Attributes to Log</key> <value> <name>Attributes to Log</name> </value> </entry> <entry> <key>Attributes to Ignore</key> <value> <name>Attributes to Ignore</name> </value> </entry> <entry> <key>Log prefix</key> <value> <name>Log prefix</name> </value> </entry> </descriptors> <executionNode>ALL</executionNode> <lossTolerant>false</lossTolerant> <penaltyDuration>30 sec</penaltyDuration> <properties> <entry> <key>Log Level</key> <value>info</value> </entry> <entry> <key>Log Payload</key> <value>false</value> </entry> <entry> <key>Attributes to Log</key> </entry> <entry> <key>Attributes to Ignore</key> </entry> <entry> <key>Log prefix</key> </entry> </properties> <runDurationMillis>0</runDurationMillis> <schedulingPeriod>0 sec</schedulingPeriod> <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> <yieldDuration>1 sec</yieldDuration> </config> <name>LogAttribute</name> <relationships> <autoTerminate>true</autoTerminate> <name>success</name> </relationships> <style></style> <type>org.apache.nifi.processors.standard.LogAttribute</type> </processors> <processors> <id>93dc502a-0071-3183-0000-000000000000</id> <parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId> <position> <x>0.0</x> <y>380.00000762939453</y> </position> <bundle> <artifact>nifi-update-attribute-nar</artifact> <group>org.apache.nifi</group> <version>1.3.0</version> </bundle> <config> <bulletinLevel>WARN</bulletinLevel> <comments></comments> <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> <descriptors> <entry> <key>Delete Attributes Expression</key> <value> <name>Delete Attributes Expression</name> </value> </entry> <entry> <key>Store State</key> <value> <name>Store State</name> </value> </entry> <entry> <key>Stateful Variables Initial Value</key> <value> <name>Stateful Variables Initial Value</name> </value> </entry> <entry> <key>avro.schema</key> <value> <name>avro.schema</name> </value> </entry> </descriptors> <executionNode>ALL</executionNode> <lossTolerant>false</lossTolerant> <penaltyDuration>30 sec</penaltyDuration> <properties> <entry> <key>Delete Attributes Expression</key> </entry> <entry> <key>Store State</key> <value>Do not store state</value> </entry> <entry> <key>Stateful Variables Initial Value</key> </entry> <entry> <key>avro.schema</key> <value>${inferred.avro.schema:jsonPath('$.items')}</value> </entry> </properties> <runDurationMillis>0</runDurationMillis> <schedulingPeriod>0 sec</schedulingPeriod> <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> <yieldDuration>1 sec</yieldDuration> </config> <name>UpdateAttribute</name> <relationships> <autoTerminate>false</autoTerminate> <name>success</name> </relationships> <style></style> <type>org.apache.nifi.processors.attributes.UpdateAttribute</type> </processors> <processors> <id>9bdc7b0b-db26-374a-0000-000000000000</id> <parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId> <position> <x>0.0</x> <y>562.9420223370378</y> </position> <bundle> <artifact>nifi-standard-nar</artifact> <group>org.apache.nifi</group> <version>1.3.0</version> </bundle> <config> <bulletinLevel>WARN</bulletinLevel> <comments></comments> <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> <descriptors> <entry> <key>Record Reader</key> <value> <identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService> <name>Record Reader</name> </value> 
</entry> <entry> <key>Record Writer</key> <value> <identifiesControllerService>org.apache.nifi.serialization.RecordSetWriterFactory</identifiesControllerService> <name>Record Writer</name> </value> </entry> <entry> <key>Records Per Split</key> <value> <name>Records Per Split</name> </value> </entry> </descriptors> <executionNode>ALL</executionNode> <lossTolerant>false</lossTolerant> <penaltyDuration>30 sec</penaltyDuration> <properties> <entry> <key>Record Reader</key> <value>de57752d-8a16-311c-0000-000000000000</value> </entry> <entry> <key>Record Writer</key> <value>f8122fc5-36ce-3753-0000-000000000000</value> </entry> <entry> <key>Records Per Split</key> <value>1</value> </entry> </properties> <runDurationMillis>0</runDurationMillis> <schedulingPeriod>0 sec</schedulingPeriod> <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> <yieldDuration>1 sec</yieldDuration> </config> <name>SplitRecord</name> <relationships> <autoTerminate>true</autoTerminate> <name>failure</name> </relationships> <relationships> <autoTerminate>true</autoTerminate> <name>original</name> </relationships> <relationships> <autoTerminate>false</autoTerminate> <name>splits</name> </relationships> <style></style> <type>org.apache.nifi.processors.standard.SplitRecord</type> </processors> <processors> <id>1c12157b-c44e-3223-0000-000000000000</id> <parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId> <position> <x>0.0</x> <y>0.0</y> </position> <bundle> <artifact>nifi-standard-nar</artifact> <group>org.apache.nifi</group> <version>1.3.0</version> </bundle> <config> <bulletinLevel>WARN</bulletinLevel> <comments></comments> <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> <descriptors> <entry> <key>File Size</key> <value> <name>File Size</name> </value> </entry> <entry> <key>Batch Size</key> <value> <name>Batch Size</name> </value> </entry> <entry> <key>Data Format</key> <value> <name>Data Format</name> </value> </entry> <entry> <key>Unique FlowFiles</key> <value> <name>Unique FlowFiles</name> </value> </entry> <entry> <key>generate-ff-custom-text</key> <value> <name>generate-ff-custom-text</name> </value> </entry> <entry> <key>schema.name</key> <value> <name>schema.name</name> </value> </entry> </descriptors> <executionNode>PRIMARY</executionNode> <lossTolerant>false</lossTolerant> <penaltyDuration>30 sec</penaltyDuration> <properties> <entry> <key>File Size</key> <value>0B</value> </entry> <entry> <key>Batch Size</key> <value>1</value> </entry> <entry> <key>Data Format</key> <value>Text</value> </entry> <entry> <key>Unique FlowFiles</key> <value>false</value> </entry> <entry> <key>generate-ff-custom-text</key> <value>[{ "id": "56740f4b-48de-0502-afdc-59a463b3f6dc", "account_id": "b0dad7e2-7bb9-4ca9-b9fd-134870656eb2", "contact_id": "a0ebd53a-77c5-e2ea-4787-59a463053b1b", "date_modified": 1503959931000, "deleted": 0 }, { "id": "1ac80e25-7f28-f5c6-bac0-59a4636ef31f", "account_id": "71d4904e-f8f1-4209-bff9-4d080057ea84", "contact_id": "e429bfe6-9c89-8b81-9ee6-59a463fc7fd8", "date_modified": 1503959873000, "deleted": 0 }] </value> </entry> <entry> <key>schema.name</key> <value>root</value> </entry> </properties> <runDurationMillis>0</runDurationMillis> <schedulingPeriod>10 sec</schedulingPeriod> <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> <yieldDuration>1 sec</yieldDuration> </config> <name>GenerateFlowFile</name> <relationships> <autoTerminate>false</autoTerminate> <name>success</name> </relationships> <style></style> 
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type> </processors> <processors> <id>3ffeeea3-a0cc-386a-0000-000000000000</id> <parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId> <position> <x>0.0</x> <y>185.0</y> </position> <bundle> <artifact>nifi-kite-nar</artifact> <group>org.apache.nifi</group> <version>1.3.0</version> </bundle> <config> <bulletinLevel>WARN</bulletinLevel> <comments></comments> <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> <descriptors> <entry> <key>Schema Output Destination</key> <value> <name>Schema Output Destination</name> </value> </entry> <entry> <key>Input Content Type</key> <value> <name>Input Content Type</name> </value> </entry> <entry> <key>CSV Header Definition</key> <value> <name>CSV Header Definition</name> </value> </entry> <entry> <key>Get CSV Header Definition From Data</key> <value> <name>Get CSV Header Definition From Data</name> </value> </entry> <entry> <key>CSV Header Line Skip Count</key> <value> <name>CSV Header Line Skip Count</name> </value> </entry> <entry> <key>CSV delimiter</key> <value> <name>CSV delimiter</name> </value> </entry> <entry> <key>CSV Escape String</key> <value> <name>CSV Escape String</name> </value> </entry> <entry> <key>CSV Quote String</key> <value> <name>CSV Quote String</name> </value> </entry> <entry> <key>Pretty Avro Output</key> <value> <name>Pretty Avro Output</name> </value> </entry> <entry> <key>Avro Record Name</key> <value> <name>Avro Record Name</name> </value> </entry> <entry> <key>Number Of Records To Analyze</key> <value> <name>Number Of Records To Analyze</name> </value> </entry> <entry> <key>Charset</key> <value> <name>Charset</name> </value> </entry> </descriptors> <executionNode>ALL</executionNode> <lossTolerant>false</lossTolerant> <penaltyDuration>30 sec</penaltyDuration> <properties> <entry> <key>Schema Output Destination</key> <value>flowfile-attribute</value> </entry> <entry> <key>Input Content Type</key> <value>json</value> </entry> <entry> <key>CSV Header Definition</key> </entry> <entry> <key>Get CSV Header Definition From Data</key> <value>true</value> </entry> <entry> <key>CSV Header Line Skip Count</key> <value>0</value> </entry> <entry> <key>CSV delimiter</key> <value>,</value> </entry> <entry> <key>CSV Escape String</key> <value>\</value> </entry> <entry> <key>CSV Quote String</key> <value>'</value> </entry> <entry> <key>Pretty Avro Output</key> <value>true</value> </entry> <entry> <key>Avro Record Name</key> <value>root</value> </entry> <entry> <key>Number Of Records To Analyze</key> <value>10</value> </entry> <entry> <key>Charset</key> <value>UTF-8</value> </entry> </properties> <runDurationMillis>0</runDurationMillis> <schedulingPeriod>0 sec</schedulingPeriod> <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> <yieldDuration>1 sec</yieldDuration> </config> <name>InferAvroSchema</name> <relationships> <autoTerminate>true</autoTerminate> <name>failure</name> </relationships> <relationships> <autoTerminate>true</autoTerminate> <name>original</name> </relationships> <relationships> <autoTerminate>false</autoTerminate> <name>success</name> </relationships> <relationships> <autoTerminate>true</autoTerminate> <name>unsupported content</name> </relationships> <style></style> <type>org.apache.nifi.processors.kite.InferAvroSchema</type> </processors> </snippet> <timestamp>09/01/2017 13:22:38 EDT</timestamp> </template>