Re: JSON array chunking

2017-09-01 Thread Neil Derraugh
Hi Bryan,

Thanks for the tip. It was very helpful and got me to a working flow.  I
wanted essentially your flow, but with schema inference and no schema
registry.  The flow takes an arbitrary JSON array (no predefined schema,
other than being an array) and chunks it out.  I attached the final result
for anyone who's curious.

Neil
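
For readers who want to see the idea of "schema inference with no schema registry" outside NiFi, here is a minimal Python sketch. It is not how InferAvroSchema works internally; the function names (avro_type, infer_record_schema) and the rule of looking only at the first array element are assumptions made for illustration. It produces a record schema (not an array schema) from a JSON array of flat objects.

```python
import json
from typing import Any, Dict

# 32-bit signed range: values outside it are reported as Avro "long".
INT_MIN, INT_MAX = -2**31, 2**31 - 1


def avro_type(value: Any) -> str:
    """Map a JSON scalar to an Avro primitive type name (illustrative only)."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "int" if INT_MIN <= value <= INT_MAX else "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    raise ValueError(f"unsupported value: {value!r}")


def infer_record_schema(payload: str, record_name: str = "root") -> Dict[str, Any]:
    """Infer a record schema from the first element of a JSON array of objects."""
    records = json.loads(payload)
    if not isinstance(records, list) or not records or not isinstance(records[0], dict):
        raise ValueError("expected a non-empty JSON array of objects")
    fields = [{"name": k, "type": avro_type(v)} for k, v in records[0].items()]
    return {"type": "record", "name": record_name, "fields": fields}
```

Applied to the sample payload in this thread, this would report "string" for the id fields, "long" for date_modified and "int" for deleted, matching the types in the inferred schema quoted below.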


Re: JSON array chunking

2017-08-31 Thread Bryan Bende
Neil,

I'm a little confused as to what format your initial data is in... You
showed an example payload as JSON, but then mentioned using an
AvroReader, so it wasn't clear to me if your starting point is JSON or
Avro.

Assuming it is JSON, I put together a template that shows how to split
your sample data:

https://gist.github.com/bbende/f73d06c0d35ed1aeb2603a8f87276ed7

I used the second schema you have (the one where the top-level element
is a record) and then SplitRecord with a JsonTreeReader and
JsonRecordSetWriter.

The sample data I sent in was your example data, and it produced two
flow files coming out of SplitRecord, one for each element of the
array.
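
The splitting behaviour described above (one output per array element) can be sketched in plain Python for anyone who wants to check expectations outside NiFi. This is only an illustration and not how SplitRecord is implemented; split_records and its records_per_split parameter are names invented for this sketch.

```python
import json
from typing import Iterator


def split_records(payload: str, records_per_split: int = 1) -> Iterator[str]:
    """Yield JSON arrays holding at most records_per_split elements each."""
    records = json.loads(payload)
    if not isinstance(records, list):
        raise ValueError("expected a top-level JSON array")
    if records_per_split < 1:
        raise ValueError("records_per_split must be at least 1")
    # Walk the array in fixed-size steps; the last chunk may be shorter.
    for start in range(0, len(records), records_per_split):
        yield json.dumps(records[start:start + records_per_split])
```

With the two-element sample payload and records_per_split=1, this yields two chunks, which corresponds to the two flow files mentioned above.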

Let us know if this is not what you are trying to do.

Thanks,

Bryan



Re: JSON array chunking

2017-08-30 Thread Neil Derraugh
I should have mentioned I tried starting with a JsonPathReader before the
AvroReader.  I had a property I was calling root with a value of $.  I can
post details about that too if it would be helpful.


JSON array chunking

2017-08-30 Thread Neil Derraugh
I have arbitrary JSON arrays that I want to split into chunks.  I've been
(unsuccessfully) trying to figure this out with InferAvroSchema ->
SplitRecord(AvroReader, JsonRecordSetWriter).

Here's an example payload:
[
  {
    "id": "56740f4b-48de-0502-afdc-59a463b3f6dc",
    "account_id": "b0dad7e2-7bb9-4ca9-b9fd-134870656eb2",
    "contact_id": "a0ebd53a-77c5-e2ea-4787-59a463053b1b",
    "date_modified": 1503959931000,
    "deleted": 0
  },
  {
    "id": "1ac80e25-7f28-f5c6-bac0-59a4636ef31f",
    "account_id": "71d4904e-f8f1-4209-bff9-4d080057ea84",
    "contact_id": "e429bfe6-9c89-8b81-9ee6-59a463fc7fd8",
    "date_modified": 1503959873000,
    "deleted": 0
  }
]

Here's the schema that gets inferred (the AvroReader's Avro Record Name is
"root"):
{
  "type": "array",
  "items": {
    "type": "record",
    "name": "root",
    "fields": [
      {
        "name": "id",
        "type": "string",
        "doc": "Type inferred from '\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"
      },
      {
        "name": "account_id",
        "type": "string",
        "doc": "Type inferred from '\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"
      },
      {
        "name": "contact_id",
        "type": "string",
        "doc": "Type inferred from '\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"
      },
      {
        "name": "date_modified",
        "type": "long",
        "doc": "Type inferred from '1503959931000'"
      },
      {
        "name": "deleted",
        "type": "int",
        "doc": "Type inferred from '0'"
      }
    ]
  }
}

When I use ${inferred.avro.schema} for both the AvroReader and the
JsonRecordSetWriter I get:
SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to create
Record Writer for
StandardFlowFileRecord[uuid=45d7a0d2-258a-4f40-b5f9-4886eb2c2a76,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1504118228480-325,
container=default, section=325], offset=0,
length=86462199],offset=0,name=accounts-contacts.json.avro,size=86462199];
routing to failure: org.apache.nifi.schema.access.SchemaNotFoundException:
org.apache.avro.AvroRuntimeException: Not a record:
{"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type
inferred from
'\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type
inferred from
'\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type
inferred from
'\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type
inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type
inferred from '0'"}]}}.
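
The "Not a record" part of this message points at the top-level type of the schema text: it is an array, while a record appears to be expected. As a minimal sketch under that assumption, the following Python (run outside NiFi; record_schema_from is a hypothetical helper, not a NiFi or Avro API) unwraps an array schema down to the record schema of its items.

```python
import json
from typing import Any, Dict


def record_schema_from(schema_text: str) -> Dict[str, Any]:
    """Return the record schema, unwrapping any enclosing array schemas."""
    schema = json.loads(schema_text)
    # An Avro array schema keeps the element schema under "items".
    while isinstance(schema, dict) and schema.get("type") == "array":
        schema = schema["items"]
    if not (isinstance(schema, dict) and schema.get("type") == "record"):
        raise ValueError("no record schema found")
    return schema
```

Given the inferred schema above, this returns the inner object whose "type" is "record" and whose "name" is "root", which is the shape a record-oriented reader or writer would need.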

The stack trace:
2017-08-30 19:42:21,692 ERROR [Timer-Driven Process Thread-9]
o.a.nifi.processors.standard.SplitRecord
SplitRecord[id=b3453515-caaa-1e1f-8bb6-26dec275a0d5] Failed to create
Record Writer for
StandardFlowFileRecord[uuid=a5f720cf-98a8-4c29-bd91-098c7f25448d,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1504121074997-336,
container=default, section=336], offset=1013917,
length=454],offset=0,name=626851422080935,size=454]; routing to failure:
org.apache.nifi.schema.access.SchemaNotFoundException:
org.apache.avro.AvroRuntimeException: Not a record:
{"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type
inferred from
'\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type
inferred from
'\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type
inferred from
'\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type
inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type
inferred from '0'"}]}}
org.apache.nifi.schema.access.SchemaNotFoundException:
org.apache.avro.AvroRuntimeException: Not a record:
{"type":"array","items":{"type":"record","name":"root","fields":[{"name":"id","type":"string","doc":"Type
inferred from
'\"56740f4b-48de-0502-afdc-59a463b3f6dc\"'"},{"name":"account_id","type":"string","doc":"Type
inferred from
'\"b0dad7e2-7bb9-4ca9-b9fd-134870656eb2\"'"},{"name":"contact_id","type":"string","doc":"Type
inferred from
'\"a0ebd53a-77c5-e2ea-4787-59a463053b1b\"'"},{"name":"date_modified","type":"long","doc":"Type
inferred from '1503959931000'"},{"name":"deleted","type":"int","doc":"Type
inferred from '0'"}]}}
at org.apache.nifi.schema.access.AvroSchemaTextStrategy.getSchema(AvroSchemaTextStrategy.java:55)
at org.apache.nifi.serialization.SchemaRegistryService.getSchema(SchemaRegistryService.java:112)
at sun.reflect.GeneratedMethodAccessor1466.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:89)
at com.sun.proxy.$Proxy144.getSchema(Unknown Source)
at