Hi Bryan,
Thanks for the tip; it was very helpful and let me finish. What I wanted was
essentially your flow, but with schema inference and no schema registry. The
flow takes an arbitrary JSON array (no pre-defined schema other than being an
array) and chunks it out. I've attached the final result for anyone who's
curious.
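
For anyone who wants the gist without opening the template: the attached flow
(inferred_record_split), summarizing the XML below, is roughly

    GenerateFlowFile -> InferAvroSchema -> UpdateAttribute -> SplitRecord -> LogAttribute

The step that avoids both a hardcoded schema and a schema registry is the
UpdateAttribute processor, which pulls the per-record schema out of the
inferred array schema with an Expression Language jsonPath call:

    avro.schema = ${inferred.avro.schema:jsonPath('$.items')}

The JsonTreeReader and JsonRecordSetWriter both use the "Schema Text" access
strategy, so they should pick that attribute up through the default Schema
Text of ${avro.schema}, and SplitRecord with Records Per Split = 1 emits one
flow file per array element.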
Neil
On Thu, Aug 31, 2017 at 9:28 AM, Bryan Bende <[email protected]> wrote:
> Neil,
>
> I'm a little confused as to what format your initial data is in... You
> showed an example payload as JSON, but then mentioned using an
> AvroReader, so it wasn't clear to me if your starting point is JSON or
> Avro.
>
> Assuming it is JSON, I put together a template that shows how to split
> your sample data:
>
> https://gist.github.com/bbende/f73d06c0d35ed1aeb2603a8f87276ed7
>
> I used the second schema you have (the one where the top-level element
> is a record) and then SplitRecord with a JsonTreeReader and
> JsonRecordSetWriter.
>
> The sample data I sent in was your example data, and it produced two
> flow files coming out of SplitRecord, one for each element of the
> array.
>
> Let us know if this is not what you are trying to do.
>
> Thanks,
>
> Bryan
>
>
> On Wed, Aug 30, 2017 at 8:31 PM, Neil Derraugh
> <[email protected]> wrote:
> > I should have mentioned I tried starting with a JsonPathReader before the
> > AvroReader. I had a property I was calling root with a value of $. I can
> > post details about that too if it would be helpful.
> >
> > On Wed, Aug 30, 2017 at 8:08 PM, Neil Derraugh
> > <[email protected]> wrote:
> >>
> >> I have arbitrary JSON arrays that I want to split into chunks. I've been
> >> (unsuccessfully) trying to figure this out with InferAvroSchema ->
> >> SplitRecord(AvroReader, JsonRecordSetWriter).
> >>
> >> Here's an example payload:
> >> [{
> >> "id": "56740f4b-48de-0502-afdc-59a463b3f6dc",
> >> "account_id": "b0dad7e2-7bb9-4ca9-b9fd-134870656eb2",
> >> "contact_id": "a0ebd53a-77c5-e2ea-4787-59a463053b1b",
> >> "date_modified": 1503959931000,
> >> "deleted": 0
> >> },
> >> {
> >> "id": "1ac80e25-7f28-f5c6-bac0-59a4636ef31f",
> >> "account_id": "71d4904e-f8f1-4209-bff9-4d080057ea84",
> >> "contact_id": "e429bfe6-9c89-8b81-9ee6-59a463fc7fd8",
> >> "date_modified": 1503959873000,
> >> "deleted": 0
> >> }]
> >>
> >> Here's the schema that gets inferred (InferAvroSchema's Avro Record Name is
> >> "root"):
> >> {
> >>   "type": "array",
> >>   "items": {
> >>     "type": "record",
> >>     "name": "root",
> >>     "fields": [
> >>       { "name": "id", "type": "string",
> >>         "doc": "Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'" },
> >>       { "name": "account_id", "type": "string",
> >>         "doc": "Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'" },
> >>       { "name": "contact_id", "type": "string",
> >>         "doc": "Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'" },
> >>       { "name": "date_modified", "type": "long",
> >>         "doc": "Type inferred from '1503959931000'" },
> >>       { "name": "deleted", "type": "int",
> >>         "doc": "Type inferred from '0'" }
> >>     ]
> >>   }
> >> }
> >>
> >> When I use ${inferred.avro.schema} for both the AvroReader and the
> >> JsonRecordSetWriter I get:
> >> SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to create Record Writer for
> >> StandardFlowFileRecord[uuid=45d7a0d2-258a-4f40-b5f9-4886eb2c2a76,claim=StandardContentClaim
> >> [resourceClaim=StandardResourceClaim[id=1504118228480-325, container=default, section=325],
> >> offset=0, length=86462199],offset=0,name=accounts-contacts.json.avro,size=86462199];
> >> routing to failure: org.apache.nifi.schema.access.SchemaNotFoundException:
> >> org.apache.avro.AvroRuntimeException: Not a record:
> >> {"type":"array","items":{"type":"record","name":"root","fields":[
> >> {"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},
> >> {"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},
> >> {"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},
> >> {"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},
> >> {"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}.
> >>
> >> The stack trace:
> >> 2017-08-30 19:42:21,692 ERROR [Timer-Driven Process Thread-9] o.a.nifi.processors.standard.SplitRecord
> >> SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to create Record Writer for
> >> StandardFlowFileRecord[uuid=a5f720cf-98a8-4c29-bd91-098c7f25448d,claim=StandardContentClaim
> >> [resourceClaim=StandardResourceClaim[id=1504121074997-336, container=default, section=336],
> >> offset=1013917, length=454],offset=0,name=626851422080935,size=454]; routing to failure:
> >> org.apache.nifi.schema.access.SchemaNotFoundException:
> >> org.apache.avro.AvroRuntimeException: Not a record:
> >> {"type":"array","items":{"type":"record","name":"root","fields":[
> >> {"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},
> >> {"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},
> >> {"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},
> >> {"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},
> >> {"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}
> >> org.apache.nifi.schema.access.SchemaNotFoundException:
> >> org.apache.avro.AvroRuntimeException: Not a record:
> >> {"type":"array","items":{"type":"record","name":"root","fields":[
> >> {"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},
> >> {"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},
> >> {"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},
> >> {"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},
> >> {"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}
> >> at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:55)
> >> at org.apache.nifi.serialization.SchemaRegistryService.getSchema(SchemaRegistryService.java:112)
> >> at sun.reflect.GeneratedMethodAccessor1466.invoke(Unknown Source)
> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:498)
> >> at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:89)
> >> at com.sun.proxy.$Proxy144.getSchema(Unknown Source)
> >> at org.apache.nifi.processors.standard.SplitRecord.onTrigger(SplitRecord.java:138)
> >> at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> >> at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
> >> at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> >> at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> >> at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
> >> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >> at java.lang.Thread.run(Thread.java:748)
> >> Caused by: org.apache.avro.AvroRuntimeException: Not a record:
> >> {"type":"array","items":{"type":"record","name":"root","fields":[
> >> {"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},
> >> {"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},
> >> {"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},
> >> {"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},
> >> {"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}
> >> at org.apache.avro.Schema.getFields(Schema.java:220)
> >> at org.apache.nifi.avro.AvroTypeUtil.createSchema(AvroTypeUtil.java:218)
> >> at org.apache.nifi.avro.AvroTypeUtil.createSchema(AvroTypeUtil.java:202)
> >> at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:53)
> >> ... 19 common frames omitted
> >>
> >> That looks like it's coming from the Writer, and seems to be about the
> >> root element being an array rather than a record. So I hardcoded the
> >> schema in the JsonRecordSetWriter to be just the record, like this:
> >> {
> >>   "type": "record",
> >>   "name": "root",
> >>   "fields": [
> >>     { "name": "id", "type": "string",
> >>       "doc": "Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'" },
> >>     { "name": "account_id", "type": "string",
> >>       "doc": "Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'" },
> >>     { "name": "contact_id", "type": "string",
> >>       "doc": "Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'" },
> >>     { "name": "date_modified", "type": "long",
> >>       "doc": "Type inferred from '1503959931000'" },
> >>     { "name": "deleted", "type": "int",
> >>       "doc": "Type inferred from '0'" }
> >>   ]
> >> }
> >> Which gave me:
> >> SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to split
> >> StandardFlowFileRecord[uuid=e2098e1b-f2f8-4ca8-926f-6d0e3643ce45,claim=StandardContentClaim
> >> [resourceClaim=StandardResourceClaim[id=1504118228480-325, container=default, section=325],
> >> offset=0, length=86462199],offset=0,name=accounts-contacts.json.avro,size=86462199]:
> >> org.apache.nifi.processor.exception.ProcessException: Failed to parse incoming data.
> >>
> >> The stack trace:
> >> 2017-08-30 19:31:21,690 ERROR [Timer-Driven Process Thread-3] o.a.nifi.processors.standard.SplitRecord
> >> SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to split
> >> StandardFlowFileRecord[uuid=41f02871-e5e8-496e-a671-bc3200c4bf8e,claim=StandardContentClaim
> >> [resourceClaim=StandardResourceClaim[id=1504121074874-335, container=default, section=335],
> >> offset=1028220, length=454],offset=0,name=626191417134736,size=454]:
> >> org.apache.nifi.processor.exception.ProcessException: Failed to parse incoming data
> >> org.apache.nifi.processor.exception.ProcessException: Failed to parse incoming data
> >> at org.apache.nifi.processors.standard.SplitRecord$1.process(SplitRecord.java:187)
> >> at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2136)
> >> at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2106)
> >> at org.apache.nifi.processors.standard.SplitRecord.onTrigger(SplitRecord.java:149)
> >> at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> >> at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
> >> at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> >> at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> >> at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
> >> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >> at java.lang.Thread.run(Thread.java:748)
> >> Caused by: org.apache.nifi.schema.access.SchemaNotFoundException:
> >> org.apache.avro.AvroRuntimeException: Not a record:
> >> {"type":"array","items":{"type":"record","name":"root","fields":[
> >> {"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},
> >> {"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},
> >> {"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},
> >> {"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},
> >> {"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}
> >> at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:55)
> >> at org.apache.nifi.serialization.SchemaRegistryService.getSchema(SchemaRegistryService.java:112)
> >> at org.apache.nifi.avro.AvroReader.createRecordReader(AvroReader.java:92)
> >> at sun.reflect.GeneratedMethodAccessor1467.invoke(Unknown Source)
> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:498)
> >> at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:89)
> >> at com.sun.proxy.$Proxy158.createRecordReader(Unknown Source)
> >> at org.apache.nifi.processors.standard.SplitRecord$1.process(SplitRecord.java:152)
> >> ... 15 common frames omitted
> >> Caused by: org.apache.avro.AvroRuntimeException: Not a record:
> >> {"type":"array","items":{"type":"record","name":"root","fields":[
> >> {"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},
> >> {"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},
> >> {"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},
> >> {"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},
> >> {"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}
> >> at org.apache.avro.Schema.getFields(Schema.java:220)
> >> at org.apache.nifi.avro.AvroTypeUtil.createSchema(AvroTypeUtil.java:218)
> >> at org.apache.nifi.avro.AvroTypeUtil.createSchema(AvroTypeUtil.java:202)
> >> at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:53)
> >> ... 23 common frames omitted
> >>
> >> Can somebody point me to what I'm doing wrong? Or suggest an alternative
> >> approach to chunking arbitrary JSON arrays?
> >>
> >> Thanks,
> >> Neil
> >
> >
>
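
For the sample payload above, each flow file routed to SplitRecord's "splits"
relationship should come out looking roughly like the following (Records Per
Split is 1, and JsonRecordSetWriter writes each record set as a JSON array;
pretty-printed here for readability, with values from the first element of the
sample data):

[ {
  "id" : "56740f4b-48de-0502-afdc-59a463b3f6dc",
  "account_id" : "b0dad7e2-7bb9-4ca9-b9fd-134870656eb2",
  "contact_id" : "a0ebd53a-77c5-e2ea-4787-59a463053b1b",
  "date_modified" : 1503959931000,
  "deleted" : 0
} ]

The attached template follows: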
<?xml version="1.0" ?>
<template encoding-version="1.1">
<description></description>
<groupId>102f3353-015e-1000-78d0-043a14fedd03</groupId>
<name>inferred_record_split</name>
<snippet>
<connections>
<id>8c6ff019-297d-395e-0000-000000000000</id>
<parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>8d16826d-ce84-362e-0000-000000000000</groupId>
<id>9bdc7b0b-db26-374a-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>8d16826d-ce84-362e-0000-000000000000</groupId>
<id>93dc502a-0071-3183-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>ce151ab2-095c-360e-0000-000000000000</id>
<parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>8d16826d-ce84-362e-0000-000000000000</groupId>
<id>90e4bbe0-4477-34e1-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>splits</selectedRelationships>
<source>
<groupId>8d16826d-ce84-362e-0000-000000000000</groupId>
<id>9bdc7b0b-db26-374a-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>d4d9f32b-259a-3c26-0000-000000000000</id>
<parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>8d16826d-ce84-362e-0000-000000000000</groupId>
<id>93dc502a-0071-3183-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>8d16826d-ce84-362e-0000-000000000000</groupId>
<id>3ffeeea3-a0cc-386a-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>297e1ac2-407e-3ecd-0000-000000000000</id>
<parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>8d16826d-ce84-362e-0000-000000000000</groupId>
<id>3ffeeea3-a0cc-386a-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>8d16826d-ce84-362e-0000-000000000000</groupId>
<id>1c12157b-c44e-3223-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<controllerServices>
<id>de57752d-8a16-311c-0000-000000000000</id>
<parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-record-serialization-services-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>schema-access-strategy</key>
<value>
<name>schema-access-strategy</name>
</value>
</entry>
<entry>
<key>schema-registry</key>
<value>
<identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
<name>schema-registry</name>
</value>
</entry>
<entry>
<key>schema-name</key>
<value>
<name>schema-name</name>
</value>
</entry>
<entry>
<key>schema-text</key>
<value>
<name>schema-text</name>
</value>
</entry>
<entry>
<key>Date Format</key>
<value>
<name>Date Format</name>
</value>
</entry>
<entry>
<key>Time Format</key>
<value>
<name>Time Format</name>
</value>
</entry>
<entry>
<key>Timestamp Format</key>
<value>
<name>Timestamp Format</name>
</value>
</entry>
</descriptors>
<name>JsonTreeReader</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>schema-access-strategy</key>
<value>schema-text-property</value>
</entry>
<entry>
<key>schema-registry</key>
</entry>
<entry>
<key>schema-name</key>
</entry>
<entry>
<key>schema-text</key>
</entry>
<entry>
<key>Date Format</key>
</entry>
<entry>
<key>Time Format</key>
</entry>
<entry>
<key>Timestamp Format</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.json.JsonTreeReader</type>
</controllerServices>
<controllerServices>
<id>f8122fc5-36ce-3753-0000-000000000000</id>
<parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-record-serialization-services-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Schema Write Strategy</key>
<value>
<name>Schema Write Strategy</name>
</value>
</entry>
<entry>
<key>schema-access-strategy</key>
<value>
<name>schema-access-strategy</name>
</value>
</entry>
<entry>
<key>schema-registry</key>
<value>
<identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
<name>schema-registry</name>
</value>
</entry>
<entry>
<key>schema-name</key>
<value>
<name>schema-name</name>
</value>
</entry>
<entry>
<key>schema-text</key>
<value>
<name>schema-text</name>
</value>
</entry>
<entry>
<key>Date Format</key>
<value>
<name>Date Format</name>
</value>
</entry>
<entry>
<key>Time Format</key>
<value>
<name>Time Format</name>
</value>
</entry>
<entry>
<key>Timestamp Format</key>
<value>
<name>Timestamp Format</name>
</value>
</entry>
<entry>
<key>Pretty Print JSON</key>
<value>
<name>Pretty Print JSON</name>
</value>
</entry>
</descriptors>
<name>JsonRecordSetWriter</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Schema Write Strategy</key>
<value>full-schema-attribute</value>
</entry>
<entry>
<key>schema-access-strategy</key>
<value>schema-text-property</value>
</entry>
<entry>
<key>schema-registry</key>
</entry>
<entry>
<key>schema-name</key>
</entry>
<entry>
<key>schema-text</key>
</entry>
<entry>
<key>Date Format</key>
</entry>
<entry>
<key>Time Format</key>
</entry>
<entry>
<key>Timestamp Format</key>
</entry>
<entry>
<key>Pretty Print JSON</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.json.JsonRecordSetWriter</type>
</controllerServices>
<processors>
<id>90e4bbe0-4477-34e1-0000-000000000000</id>
<parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>755.0000076293945</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Log Level</key>
<value>
<name>Log Level</name>
</value>
</entry>
<entry>
<key>Log Payload</key>
<value>
<name>Log Payload</name>
</value>
</entry>
<entry>
<key>Attributes to Log</key>
<value>
<name>Attributes to Log</name>
</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
<value>
<name>Attributes to Ignore</name>
</value>
</entry>
<entry>
<key>Log prefix</key>
<value>
<name>Log prefix</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Log Level</key>
<value>info</value>
</entry>
<entry>
<key>Log Payload</key>
<value>false</value>
</entry>
<entry>
<key>Attributes to Log</key>
</entry>
<entry>
<key>Attributes to Ignore</key>
</entry>
<entry>
<key>Log prefix</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>LogAttribute</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
<processors>
<id>93dc502a-0071-3183-0000-000000000000</id>
<parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>380.00000762939453</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>avro.schema</key>
<value>
<name>avro.schema</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>avro.schema</key>
<value>${inferred.avro.schema:jsonPath('$.items')}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>UpdateAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>9bdc7b0b-db26-374a-0000-000000000000</id>
<parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>562.9420223370378</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Record Reader</key>
<value>
<identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService>
<name>Record Reader</name>
</value>
</entry>
<entry>
<key>Record Writer</key>
<value>
<identifiesControllerService>org.apache.nifi.serialization.RecordSetWriterFactory</identifiesControllerService>
<name>Record Writer</name>
</value>
</entry>
<entry>
<key>Records Per Split</key>
<value>
<name>Records Per Split</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Record Reader</key>
<value>de57752d-8a16-311c-0000-000000000000</value>
</entry>
<entry>
<key>Record Writer</key>
<value>f8122fc5-36ce-3753-0000-000000000000</value>
</entry>
<entry>
<key>Records Per Split</key>
<value>1</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>SplitRecord</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>original</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>splits</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.SplitRecord</type>
</processors>
<processors>
<id>1c12157b-c44e-3223-0000-000000000000</id>
<parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
<entry>
<key>schema.name</key>
<value>
<name>schema.name</name>
</value>
</entry>
</descriptors>
<executionNode>PRIMARY</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>[{
"id": "56740f4b-48de-0502-afdc-59a463b3f6dc",
"account_id": "b0dad7e2-7bb9-4ca9-b9fd-134870656eb2",
"contact_id": "a0ebd53a-77c5-e2ea-4787-59a463053b1b",
"date_modified": 1503959931000,
"deleted": 0
},
{
"id": "1ac80e25-7f28-f5c6-bac0-59a4636ef31f",
"account_id": "71d4904e-f8f1-4209-bff9-4d080057ea84",
"contact_id": "e429bfe6-9c89-8b81-9ee6-59a463fc7fd8",
"date_modified": 1503959873000,
"deleted": 0
}]
</value>
</entry>
<entry>
<key>schema.name</key>
<value>root</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>10 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>GenerateFlowFile</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>3ffeeea3-a0cc-386a-0000-000000000000</id>
<parentGroupId>8d16826d-ce84-362e-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>185.0</y>
</position>
<bundle>
<artifact>nifi-kite-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Schema Output Destination</key>
<value>
<name>Schema Output Destination</name>
</value>
</entry>
<entry>
<key>Input Content Type</key>
<value>
<name>Input Content Type</name>
</value>
</entry>
<entry>
<key>CSV Header Definition</key>
<value>
<name>CSV Header Definition</name>
</value>
</entry>
<entry>
<key>Get CSV Header Definition From Data</key>
<value>
<name>Get CSV Header Definition From Data</name>
</value>
</entry>
<entry>
<key>CSV Header Line Skip Count</key>
<value>
<name>CSV Header Line Skip Count</name>
</value>
</entry>
<entry>
<key>CSV delimiter</key>
<value>
<name>CSV delimiter</name>
</value>
</entry>
<entry>
<key>CSV Escape String</key>
<value>
<name>CSV Escape String</name>
</value>
</entry>
<entry>
<key>CSV Quote String</key>
<value>
<name>CSV Quote String</name>
</value>
</entry>
<entry>
<key>Pretty Avro Output</key>
<value>
<name>Pretty Avro Output</name>
</value>
</entry>
<entry>
<key>Avro Record Name</key>
<value>
<name>Avro Record Name</name>
</value>
</entry>
<entry>
<key>Number Of Records To Analyze</key>
<value>
<name>Number Of Records To Analyze</name>
</value>
</entry>
<entry>
<key>Charset</key>
<value>
<name>Charset</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Schema Output Destination</key>
<value>flowfile-attribute</value>
</entry>
<entry>
<key>Input Content Type</key>
<value>json</value>
</entry>
<entry>
<key>CSV Header Definition</key>
</entry>
<entry>
<key>Get CSV Header Definition From Data</key>
<value>true</value>
</entry>
<entry>
<key>CSV Header Line Skip Count</key>
<value>0</value>
</entry>
<entry>
<key>CSV delimiter</key>
<value>,</value>
</entry>
<entry>
<key>CSV Escape String</key>
<value>\</value>
</entry>
<entry>
<key>CSV Quote String</key>
<value>'</value>
</entry>
<entry>
<key>Pretty Avro Output</key>
<value>true</value>
</entry>
<entry>
<key>Avro Record Name</key>
<value>root</value>
</entry>
<entry>
<key>Number Of Records To Analyze</key>
<value>10</value>
</entry>
<entry>
<key>Charset</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>InferAvroSchema</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>original</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>unsupported content</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.kite.InferAvroSchema</type>
</processors>
</snippet>
<timestamp>09/01/2017 13:22:38 EDT</timestamp>
</template>