Viking,

I've put up a PR [1] to handle the issue. It turns out there was too much
copy-paste from the ConvertAvroToORC code, and we should have been
using only the NiFi Record API rather than Avro at all. That involved
a few more changes than I originally anticipated, but it seems to be
working now, both from a regression standpoint and for your nested
record issue. Please update the PR with any
findings/comments/questions if you get a chance to verify. Also, I'd
ask an Apache NiFi committer to review/merge at their earliest
convenience :)
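For anyone curious about the failure mode, the converter handled primitive values but not record-valued fields, so it threw on nested records. A minimal Python sketch of the general pattern (purely illustrative, not the actual NiFi/ORC Java code):

```python
def convert_to_orc_object(value):
    """Illustrative converter: nested records must be converted
    field by field instead of being rejected as unknown types."""
    if isinstance(value, (bool, int, float, str)):
        return value  # primitives map directly to ORC types
    if isinstance(value, dict):  # a nested record
        # Recurse into each field; this is the case the bug was missing.
        return {k: convert_to_orc_object(v) for k, v in value.items()}
    if isinstance(value, list):  # an array field
        return [convert_to_orc_object(v) for v in value]
    raise ValueError("Error converting object of type %s" % type(value))
```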

All,

I'd like to get this into NiFi 1.8.0; otherwise PutORC (which
purports to be a large improvement over ConvertAvroToORC -> PutHDFS)
would not handle nested records, and the workaround would force the
user to flatten the records both in NiFi and in the target Hive
table(s).
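For reference, that workaround would mean flattening a nested record like the one in Viking's example into a single-level record. A minimal Python sketch of the idea (the underscore field-naming convention here is my assumption, not anything NiFi prescribes):

```python
def flatten(record, prefix="", sep="_"):
    """Recursively flatten nested records into a single-level dict,
    joining field names with `sep`, e.g.
    {'Serial': {'Serial': 1}} -> {'Serial_Serial': 1}."""
    flat = {}
    for key, value in record.items():
        name = prefix + sep + key if prefix else key
        if isinstance(value, dict):
            # Nested record: merge its flattened fields into the result.
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat

print(flatten({'Serial': {'Serial': 11088000000001615}}))
# -> {'Serial_Serial': 11088000000001615}
```

The target Hive table would then need matching flattened columns, which is exactly the extra burden the fix avoids.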

Regards,
Matt

[1] https://github.com/apache/nifi/pull/3057

On Mon, Oct 8, 2018 at 2:45 PM Ei i <[email protected]> wrote:
>
> Hi Matt,
>
> I have created https://issues.apache.org/jira/browse/NIFI-5667 to handle this.
>
> I can also do verification once there is a fix available.
>
>
> Best regards
> /Viking
> ________________________________
> From: Matt Burgess <[email protected]>
> Sent: Monday, October 8, 2018 5:39 PM
> To: [email protected]
> Subject: Re: Hive3 PutOrc processors, error when using nestled Avro Record 
> types
>
> Viking,
>
> This is indeed a bug: the code (copied from ConvertAvroToORC and its
> util classes) still expects Avro objects, but PutORC uses NiFi Record
> objects. Please file a Jira; I will look into this immediately.
>
> Regards,
> Matt
>
> On Sat, Oct 6, 2018 at 10:17 AM Ei i <[email protected]> wrote:
> >
> > Hi,
> >
> > I have been testing out the new PutOrc processor that was introduced in 1.7 
> > to see if I can replace the ConvertAvroToOrc processor I currently use.
> > When I sent in some of the complex Avro messages from my flow, I encountered 
> > the following error (see full stack trace further down):
> > java.lang.IllegalArgumentException: Error converting object of type 
> > org.apache.nifi.serialization.record.MapRecord to ORC type
> >
> > The older ConvertAvroToOrc processor processed the flowfile without issues.
> > Also worth noting: the PutOrc processor handles the flowfile fine if 
> > only the schema is present with no Avro data.
> > It seems to be related to nested "Record" types.
> >
> > Should I create a bug myself or just report it here on the mailing list?
> >
> > How to reproduce:
> >
> > Avro schema: bug.avsc
> > {
> >   "name": "nifi_hive3_test",
> >   "namespace": "analytics.models.test",
> >   "type": "record",
> >   "fields": [
> >        {
> >       "name": "Serial",
> >       "type":
> >       {
> >         "name": "Serial",
> >         "namespace": "analytics.models.common.serial",
> >         "type": "record",
> >         "fields": [
> >           {
> >             "name": "Serial",
> >             "type": "long"
> >           }
> >         ]
> >       }
> >     }
> >   ]
> > }
> >
> > Small Python 3 script to create an Avro file:
> > import avro.schema
> > from avro.datafile import DataFileReader, DataFileWriter
> > from avro.io import DatumReader, DatumWriter
> >
> > schema = avro.schema.parse(open("bug.avsc").read())
> >
> > writer = DataFileWriter(open("bug.avro", "wb"), DatumWriter(), schema)
> > writer.append({'Serial': {'Serial': 11088000000001615}})
> > writer.close()
> >
> > # Print what was written to the Avro file
> > reader = DataFileReader(open("bug.avro", "rb"), DatumReader())
> > for record in reader:
> >     print(record)
> >
> > Then just load the Avro file using ListFile -> FetchFile.
> >
> > Full error message:
> > 2018-10-06 15:54:10,201 ERROR [Timer-Driven Process Thread-8] 
> > org.apache.nifi.processors.orc.PutORC 
> > PutORC[id=8be207cb-b16e-3578-1765-1c9e0c0aa383] Failed to write due to 
> > java.lang.IllegalArgumentException: Error converting object of type 
> > org.apache.nifi.serialization.record.MapRecord to ORC type 
> > struct<serial:bigint>: java.lang.IllegalArgumentException: Error converting 
> > object of type org.apache.nifi.serialization.record.MapRecord to ORC type 
> > struct<serial:bigint>
> > java.lang.IllegalArgumentException: Error converting object of type 
> > org.apache.nifi.serialization.record.MapRecord to ORC type 
> > struct<serial:bigint>
> >         at 
> > org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.java:206)
> >         at 
> > org.apache.nifi.processors.orc.record.ORCHDFSRecordWriter.write(ORCHDFSRecordWriter.java:71)
> >         at 
> > org.apache.nifi.processors.orc.record.ORCHDFSRecordWriter.write(ORCHDFSRecordWriter.java:91)
> >         at 
> > org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord.lambda$null$0(AbstractPutHDFSRecord.java:324)
> >         at 
> > org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2218)
> >         at 
> > org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2186)
> >         at 
> > org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord.lambda$onTrigger$1(AbstractPutHDFSRecord.java:305)
> >         at java.security.AccessController.doPrivileged(Native Method)
> >         at javax.security.auth.Subject.doAs(Subject.java:360)
> >         at 
> > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1662)
> >         at 
> > org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord.onTrigger(AbstractPutHDFSRecord.java:272)
> >         at 
> > org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> >         at 
> > org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
> >         at 
> > org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
> >         at 
> > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
> >         at 
> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >         at 
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >         at 
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >         at 
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >         at 
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >         at java.lang.Thread.run(Thread.java:748)
> >
> >
> > Best regards
> > Viking
