Re: JSON array chunking
Hi Bryan,

Thanks for the tip, that was very helpful, and helped me finish. I wanted essentially your flow, with schema inference and no schema registry. The flow takes an arbitrary JSON array (no pre-defined schema other than being an array) and chunks it out. I attached the final result for anyone who's curious.

Neil

On Thu, Aug 31, 2017 at 9:28 AM, Bryan Bende wrote:
> Neil,
>
> I'm a little confused as to what format your initial data is in... You
> showed an example payload as JSON, but then mentioned using an
> AvroReader, so it wasn't clear to me if your starting point is JSON or
> Avro.
>
> Assuming it is JSON, I put together a template that shows how to split
> your sample data:
>
> https://gist.github.com/bbende/f73d06c0d35ed1aeb2603a8f87276ed7
>
> I used the second schema you have (the one where the top-level element
> is a record) and then SplitRecord with a JsonTreeReader and
> JsonRecordSetWriter.
>
> The sample data I sent in was your example data, and it produced two
> flow files coming out of SplitRecord, one for each element of the
> array.
>
> Let us know if this is not what you are trying to do.
>
> Thanks,
>
> Bryan
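Outside of NiFi, the core operation this thread is about, splitting an arbitrary JSON array into fixed-size chunks without any pre-defined element schema, can be sketched in a few lines. This is an illustrative stand-alone sketch, not the attached flow (the function name and chunk size are made up for the example):

```python
import json

def chunk_json_array(payload: str, chunk_size: int) -> list[str]:
    """Split a serialized JSON array into serialized sub-arrays of at
    most chunk_size elements, making no assumption about element shape."""
    elements = json.loads(payload)
    if not isinstance(elements, list):
        raise ValueError("payload must be a JSON array")
    return [
        json.dumps(elements[i:i + chunk_size])
        for i in range(0, len(elements), chunk_size)
    ]

# A two-element array split into chunks of one element each, mirroring
# the two flow files SplitRecord produced from the sample payload.
chunks = chunk_json_array('[{"id": "a"}, {"id": "b"}]', 1)
```

Each output chunk is itself a valid JSON array, so downstream consumers that expect the original format still work.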
Re: JSON array chunking
Neil,

I'm a little confused as to what format your initial data is in... You showed an example payload as JSON, but then mentioned using an AvroReader, so it wasn't clear to me if your starting point is JSON or Avro.

Assuming it is JSON, I put together a template that shows how to split your sample data:

https://gist.github.com/bbende/f73d06c0d35ed1aeb2603a8f87276ed7

I used the second schema you have (the one where the top-level element is a record) and then SplitRecord with a JsonTreeReader and JsonRecordSetWriter.

The sample data I sent in was your example data, and it produced two flow files coming out of SplitRecord, one for each element of the array.

Let us know if this is not what you are trying to do.

Thanks,

Bryan

On Wed, Aug 30, 2017 at 8:31 PM, Neil Derraugh wrote:
> I should have mentioned I tried starting with a JsonPathReader before the
> AvroReader. I had a property I was calling root with a value of $. I can
> post details about that too if it would be helpful.
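For reference, the "second schema" Bryan mentions, with the record itself as the top-level element rather than wrapped in an array, would look like this (reconstructed from the inferred schema quoted in Neil's original message, with the doc fields omitted for brevity):

```json
{
  "type": "record",
  "name": "root",
  "fields": [
    { "name": "id",            "type": "string" },
    { "name": "account_id",    "type": "string" },
    { "name": "contact_id",    "type": "string" },
    { "name": "date_modified", "type": "long"   },
    { "name": "deleted",       "type": "int"    }
  ]
}
```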
Re: JSON array chunking
I should have mentioned I tried starting with a JsonPathReader before the AvroReader. I had a property I was calling root with a value of $. I can post details about that too if it would be helpful.

On Wed, Aug 30, 2017 at 8:08 PM, Neil Derraugh <neil.derra...@intellifylearning.com> wrote:
> I have arbitrary JSON arrays that I want to split into chunks. I've been
> (unsuccessfully) trying to figure this out with InferAvroSchema ->
> SplitJson(AvroReader, JsonRecordSetWriter).
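For intuition on why a JSONPath of $ would not split anything: $ matches the document root, so a reader evaluating it still sees the one top-level array rather than its individual records. A minimal stand-in evaluator, supporting only the two paths used in this illustration (this is not NiFi's JsonPathReader, just a toy model of the path semantics):

```python
def select(document, path):
    """Toy JSONPath evaluator supporting only "$" and "$[*]"."""
    if path == "$":
        # The root path matches the document itself: one result,
        # which here is the whole array.
        return [document]
    if path == "$[*]":
        # Matching each element of the top-level array yields one
        # result per record.
        return list(document)
    raise ValueError("unsupported path: " + path)

doc = [{"id": 1}, {"id": 2}]
whole = select(doc, "$")       # one match: the array itself
records = select(doc, "$[*]")  # two matches: the individual records
```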
JSON array chunking
I have arbitrary JSON arrays that I want to split into chunks. I've been (unsuccessfully) trying to figure this out with InferAvroSchema -> SplitJson(AvroReader, JsonRecordSetWriter).

Here's an example payload:

[{
  "id": "56740f4b-48de-0502-afdc-59a463b3f6dc",
  "account_id": "b0dad7e2-7bb9-4ca9-b9fd-134870656eb2",
  "contact_id": "a0ebd53a-77c5-e2ea-4787-59a463053b1b",
  "date_modified": 1503959931000,
  "deleted": 0
},
{
  "id": "1ac80e25-7f28-f5c6-bac0-59a4636ef31f",
  "account_id": "71d4904e-f8f1-4209-bff9-4d080057ea84",
  "contact_id": "e429bfe6-9c89-8b81-9ee6-59a463fc7fd8",
  "date_modified": 1503959873000,
  "deleted": 0
}]

Here's the schema that gets inferred (the AvroReader's Avro Record Name is "root"):

{
  "type": "array",
  "items": {
    "type": "record",
    "name": "root",
    "fields": [
      {
        "name": "id",
        "type": "string",
        "doc": "Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"
      },
      {
        "name": "account_id",
        "type": "string",
        "doc": "Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"
      },
      {
        "name": "contact_id",
        "type": "string",
        "doc": "Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"
      },
      {
        "name": "date_modified",
        "type": "long",
        "doc": "Type inferred from '1503959931000'"
      },
      {
        "name": "deleted",
        "type": "int",
        "doc": "Type inferred from '0'"
      }
    ]
  }
}

When I use ${inferred.avro.schema} for both the AvroReader and the JsonRecordSetWriter I get:

SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to create Record Writer for StandardFlowFileRecord[uuid=45d7a0d2-258a-4f40-b5f9-4886eb2c2a76,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1504118228480-325, container=default, section=325], offset=0, length=86462199],offset=0,name=accounts-contacts.json.avro,size=86462199]; routing to failure: org.apache.nifi.schema.access.SchemaNotFoundException: org.apache.avro.AvroRuntimeException: Not a record: {"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}.

The stack trace:

2017-08-30 19:42:21,692 ERROR [Timer-Driven Process Thread-9] o.a.nifi.processors.standard.SplitRecord SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to create Record Writer for StandardFlowFileRecord[uuid=a5f720cf-98a8-4c29-bd91-098c7f25448d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1504121074997-336, container=default, section=336], offset=1013917, length=454],offset=0,name=626851422080935,size=454]; routing to failure: org.apache.nifi.schema.access.SchemaNotFoundException: org.apache.avro.AvroRuntimeException: Not a record: {"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}
org.apache.nifi.schema.access.SchemaNotFoundException: org.apache.avro.AvroRuntimeException: Not a record: {"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}
    at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:55)
    at org.apache.nifi.serialization.SchemaRegistryService.getSchema(SchemaRegistryService.java:112)
    at sun.reflect.GeneratedMethodAccessor1466.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:89)
    at com.sun.proxy.$Proxy144.getSchema(Unknown Source)
    at
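The "Not a record" error is consistent with the schema text strategy parsing ${inferred.avro.schema} and finding a top-level array type where a record is required: the readers and writers expect a record schema describing one element, not the array wrapper that InferAvroSchema produced. A hedged sketch of the unwrapping implied by the eventual fix, reducing the inferred array schema to its items record before handing it to a reader/writer (function name is illustrative, not a NiFi API):

```python
import json

def unwrap_array_schema(schema_text: str) -> str:
    """If the Avro schema's top level is an array, return the schema of
    its items (the record); otherwise return the schema unchanged."""
    schema = json.loads(schema_text)
    if isinstance(schema, dict) and schema.get("type") == "array":
        return json.dumps(schema["items"])
    return schema_text

# The inferred schema, abbreviated: an array wrapping the "root" record.
inferred = '{"type": "array", "items": {"type": "record", "name": "root", "fields": []}}'
record_schema = unwrap_array_schema(inferred)
```

With the record schema as the top level, the reader treats each array element as one record, which is what SplitRecord needs.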