I have arbitrary JSON arrays that I want to split into chunks. I've been
(unsuccessfully) trying to do this with InferAvroSchema -> SplitRecord
(using an AvroReader and a JsonRecordSetWriter).
Here's an example payload:
[{
  "id": "56740f4b-48de-0502-afdc-59a463b3f6dc",
  "account_id": "b0dad7e2-7bb9-4ca9-b9fd-134870656eb2",
  "contact_id": "a0ebd53a-77c5-e2ea-4787-59a463053b1b",
  "date_modified": 1503959931000,
  "deleted": 0
},
{
  "id": "1ac80e25-7f28-f5c6-bac0-59a4636ef31f",
  "account_id": "71d4904e-f8f1-4209-bff9-4d080057ea84",
  "contact_id": "e429bfe6-9c89-8b81-9ee6-59a463fc7fd8",
  "date_modified": 1503959873000,
  "deleted": 0
}]
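To make the goal concrete, the behavior I'm after amounts to this (a standalone Python sketch of the intent, not part of my flow; `split_json_array` is just an illustrative name):

```python
import json

def split_json_array(text, records_per_split):
    """Split one JSON array into several smaller JSON arrays."""
    records = json.loads(text)
    return [json.dumps(records[i:i + records_per_split])
            for i in range(0, len(records), records_per_split)]

payload = '[{"id": "a"}, {"id": "b"}, {"id": "c"}]'
chunks = split_json_array(payload, 2)
# chunks -> ['[{"id": "a"}, {"id": "b"}]', '[{"id": "c"}]']
```

In other words, each output chunk should itself be a valid JSON array of at most N of the original records.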
Here's the schema that gets inferred (the AvroReader's Avro Record Name is
"root"):
{
  "type": "array",
  "items": {
    "type": "record",
    "name": "root",
    "fields": [
      {
        "name": "id",
        "type": "string",
        "doc": "Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"
      },
      {
        "name": "account_id",
        "type": "string",
        "doc": "Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"
      },
      {
        "name": "contact_id",
        "type": "string",
        "doc": "Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"
      },
      {
        "name": "date_modified",
        "type": "long",
        "doc": "Type inferred from '1503959931000'"
      },
      {
        "name": "deleted",
        "type": "int",
        "doc": "Type inferred from '0'"
      }
    ]
  }
}
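Note that the top level of that inferred schema is an array; the record type sits nested under "items". A quick plain-Python look at the structure (schema abbreviated, docs omitted), in case it matters:

```python
import json

# Abbreviated version of the inferred schema above.
inferred = json.loads("""
{
  "type": "array",
  "items": {
    "type": "record",
    "name": "root",
    "fields": [
      {"name": "id", "type": "string"},
      {"name": "deleted", "type": "int"}
    ]
  }
}
""")

print(inferred["type"])           # array  -- the top-level type
print(inferred["items"]["type"])  # record -- the per-element type, one level down
```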
When I use ${inferred.avro.schema} as the schema text for both the
AvroReader and the JsonRecordSetWriter I get:
SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to create Record Writer for StandardFlowFileRecord[uuid=45d7a0d2-258a-4f40-b5f9-4886eb2c2a76,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1504118228480-325, container=default, section=325], offset=0, length=86462199],offset=0,name=accounts-contacts.json.avro,size=86462199]; routing to failure: org.apache.nifi.schema.access.SchemaNotFoundException: org.apache.avro.AvroRuntimeException: Not a record: {"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}.
The stack trace:
2017-08-30 19:42:21,692 ERROR [Timer-Driven Process Thread-9] o.a.nifi.processors.standard.SplitRecord SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to create Record Writer for StandardFlowFileRecord[uuid=a5f720cf-98a8-4c29-bd91-098c7f25448d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1504121074997-336, container=default, section=336], offset=1013917, length=454],offset=0,name=626851422080935,size=454]; routing to failure: org.apache.nifi.schema.access.SchemaNotFoundException: org.apache.avro.AvroRuntimeException: Not a record: {"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}
org.apache.nifi.schema.access.SchemaNotFoundException: org.apache.avro.AvroRuntimeException: Not a record: {"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}
    at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:55)
    at org.apache.nifi.serialization.SchemaRegistryService.getSchema(SchemaRegistryService.java:112)
    at sun.reflect.GeneratedMethodAccessor1466.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:89)
    at com.sun.proxy.$Proxy144.getSchema(Unknown Source)
    at org.apache.nifi.processors.standard.SplitRecord.onTrigger(SplitRecord.java:138)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroRuntimeException: Not a record: {"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}
    at org.apache.avro.Schema.getFields(Schema.java:220)
    at org.apache.nifi.avro.AvroTypeUtil.createSchema(AvroTypeUtil.java:218)
    at org.apache.nifi.avro.AvroTypeUtil.createSchema(AvroTypeUtil.java:202)
    at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:53)
    ... 19 common frames omitted
That looks like it's coming from the writer, and seems to be complaining
that the root element is an array rather than a record. So I hardcoded the
schema in the JsonRecordSetWriter to just the nested record, like this:
{
  "type": "record",
  "name": "root",
  "fields": [
    {
      "name": "id",
      "type": "string",
      "doc": "Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"
    },
    {
      "name": "account_id",
      "type": "string",
      "doc": "Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"
    },
    {
      "name": "contact_id",
      "type": "string",
      "doc": "Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"
    },
    {
      "name": "date_modified",
      "type": "long",
      "doc": "Type inferred from '1503959931000'"
    },
    {
      "name": "deleted",
      "type": "int",
      "doc": "Type inferred from '0'"
    }
  ]
}
That gave me:
SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to split StandardFlowFileRecord[uuid=e2098e1b-f2f8-4ca8-926f-6d0e3643ce45,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1504118228480-325, container=default, section=325], offset=0, length=86462199],offset=0,name=accounts-contacts.json.avro,size=86462199]: org.apache.nifi.processor.exception.ProcessException: Failed to parse incoming data.
The stack trace:
2017-08-30 19:31:21,690 ERROR [Timer-Driven Process Thread-3] o.a.nifi.processors.standard.SplitRecord SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to split StandardFlowFileRecord[uuid=41f02871-e5e8-496e-a671-bc3200c4bf8e,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1504121074874-335, container=default, section=335], offset=1028220, length=454],offset=0,name=626191417134736,size=454]: org.apache.nifi.processor.exception.ProcessException: Failed to parse incoming data
org.apache.nifi.processor.exception.ProcessException: Failed to parse incoming data
    at org.apache.nifi.processors.standard.SplitRecord$1.process(SplitRecord.java:187)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2136)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2106)
    at org.apache.nifi.processors.standard.SplitRecord.onTrigger(SplitRecord.java:149)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.nifi.schema.access.SchemaNotFoundException: org.apache.avro.AvroRuntimeException: Not a record: {"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}
    at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:55)
    at org.apache.nifi.serialization.SchemaRegistryService.getSchema(SchemaRegistryService.java:112)
    at org.apache.nifi.avro.AvroReader.createRecordReader(AvroReader.java:92)
    at sun.reflect.GeneratedMethodAccessor1467.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:89)
    at com.sun.proxy.$Proxy158.createRecordReader(Unknown Source)
    at org.apache.nifi.processors.standard.SplitRecord$1.process(SplitRecord.java:152)
    ... 15 common frames omitted
Caused by: org.apache.avro.AvroRuntimeException: Not a record: {"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type inferred from '0'"}]}}
    at org.apache.avro.Schema.getFields(Schema.java:220)
    at org.apache.nifi.avro.AvroTypeUtil.createSchema(AvroTypeUtil.java:218)
    at org.apache.nifi.avro.AvroTypeUtil.createSchema(AvroTypeUtil.java:202)
    at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:53)
    ... 23 common frames omitted
Can somebody point out what I'm doing wrong, or suggest an alternative
approach to chunking arbitrary JSON arrays?
Thanks,
Neil