Viking, I've put up a PR [1] to handle the issue; it turns out there was too much copy-paste from the ConvertAvroToORC code, and we should have been using only the NiFi Record API rather than Avro at all. That involved a few more changes than I originally anticipated, but it seems to be working now, both in the regression sense and for your nested-record issue. Please update the PR with any findings/comments/questions if you get a chance to verify. I'd also ask an Apache NiFi committer to review/merge at their earliest convenience :)
All, I'd like to get this into NiFi 1.8.0, as otherwise PutORC (which purports to be a large improvement over ConvertAvroToORC -> PutHDFS) would not handle nested records. The workaround would be to force the user to flatten the records both in NiFi and in the target Hive table(s).

Regards,
Matt

[1] https://github.com/apache/nifi/pull/3057

On Mon, Oct 8, 2018 at 2:45 PM Ei i <[email protected]> wrote:
>
> Hi Matt,
>
> I have created https://issues.apache.org/jira/browse/NIFI-5667 to handle this.
>
> I can also do verification once there is a fix available.
>
> Best regards
> /Viking
> ________________________________
> From: Matt Burgess <[email protected]>
> Sent: Monday, October 8, 2018 5:39 PM
> To: [email protected]
> Subject: Re: Hive3 PutOrc processors, error when using nested Avro Record types
>
> Viking,
>
> This is indeed a bug: the code (copied from ConvertAvroToORC and its
> util classes) still expects Avro objects, but PutORC uses NiFi Record
> objects. Please file a Jira; I will look into this immediately.
>
> Regards,
> Matt
>
> On Sat, Oct 6, 2018 at 10:17 AM Ei i <[email protected]> wrote:
> >
> > Hi,
> >
> > I have been testing out the new PutORC processor that was introduced in 1.7
> > to see if I can replace the ConvertAvroToORC processor I currently use.
> > When I sent in some of the complex Avro messages in my flow, I encountered
> > the following error (see the full stack trace further down):
> >
> > java.lang.IllegalArgumentException: Error converting object of type
> > org.apache.nifi.serialization.record.MapRecord to ORC type
> >
> > The older ConvertAvroToORC processor processed the flowfile without issues.
> > Also of note: the PutORC processor handles the flowfile fine if there is
> > no Avro data, only the schema present.
> > It seems to be related to nested "Record" types.
> >
> > Should I create a bug myself or just report it here on the mailing list?
> >
> > How to reproduce:
> >
> > Avro schema: bug.avsc
> >
> > {
> >   "name": "nifi_hive3_test",
> >   "namespace": "analytics.models.test",
> >   "type": "record",
> >   "fields": [
> >     {
> >       "name": "Serial",
> >       "type": {
> >         "name": "Serial",
> >         "namespace": "analytics.models.common.serial",
> >         "type": "record",
> >         "fields": [
> >           { "name": "Serial", "type": "long" }
> >         ]
> >       }
> >     }
> >   ]
> > }
> >
> > Small Python script to create an Avro file:
> >
> > import avro.schema
> > from avro.datafile import DataFileReader, DataFileWriter
> > from avro.io import DatumReader, DatumWriter
> >
> > schema = avro.schema.parse(open("bug.avsc", "rb").read())
> >
> > writer = DataFileWriter(open("bug.avro", "wb"), DatumWriter(), schema)
> > writer.append({'Serial': {'Serial': 11088000000001615}})
> > writer.close()
> >
> > # Print what's entered into the Avro file
> > reader1 = DataFileReader(open("bug.avro", "rb"), DatumReader())
> > for user in reader1:
> >     print(user)
> >
> > Then just load the Avro file using ListFile -> FetchFile.
> >
> > Full error message:
> >
> > 2018-10-06 15:54:10,201 ERROR [Timer-Driven Process Thread-8] org.apache.nifi.processors.orc.PutORC PutORC[id=8be207cb-b16e-3578-1765-1c9e0c0aa383] Failed to write due to java.lang.IllegalArgumentException: Error converting object of type org.apache.nifi.serialization.record.MapRecord to ORC type struct<serial:bigint>: java.lang.IllegalArgumentException: Error converting object of type org.apache.nifi.serialization.record.MapRecord to ORC type struct<serial:bigint>
> > java.lang.IllegalArgumentException: Error converting object of type org.apache.nifi.serialization.record.MapRecord to ORC type struct<serial:bigint>
> >     at org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.java:206)
> >     at org.apache.nifi.processors.orc.record.ORCHDFSRecordWriter.write(ORCHDFSRecordWriter.java:71)
> >     at org.apache.nifi.processors.orc.record.ORCHDFSRecordWriter.write(ORCHDFSRecordWriter.java:91)
> >     at org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord.lambda$null$0(AbstractPutHDFSRecord.java:324)
> >     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2218)
> >     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2186)
> >     at org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord.lambda$onTrigger$1(AbstractPutHDFSRecord.java:305)
> >     at java.security.AccessController.doPrivileged(Native Method)
> >     at javax.security.auth.Subject.doAs(Subject.java:360)
> >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1662)
> >     at org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord.onTrigger(AbstractPutHDFSRecord.java:272)
> >     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> >     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
> >     at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
> >     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >     at java.lang.Thread.run(Thread.java:748)
> >
> > Best regards
> > Viking
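[Editor's note] The failure mode discussed in this thread is that the converter assumed flat Avro objects and did not recurse into nested child records, so a nested Record (a NiFi MapRecord) could not be mapped to an ORC struct. The actual fix is in the Java class NiFiOrcUtils on the PR linked above; the sketch below is only an illustration of the idea, using Python dicts to stand in for nested records. All names here (convert_to_orc_object) are hypothetical, not NiFi API.

```python
# Illustrative sketch only -- NOT the actual NiFi fix. It models the core
# idea of the repair: dispatch on the value's type and recurse into nested
# records, instead of assuming every value is a flat scalar.

def convert_to_orc_object(value):
    """Recursively convert a record value into an ORC-style representation.

    Nested dicts (standing in for nested records) become structs, lists
    become ORC lists, and scalars pass through unchanged.
    """
    if isinstance(value, dict):
        # A nested record: convert each field recursively. Skipping this
        # recursion is what produced the IllegalArgumentException above.
        return {k: convert_to_orc_object(v) for k, v in value.items()}
    if isinstance(value, list):
        return [convert_to_orc_object(v) for v in value]
    return value

# The nested record from the bug report: struct<Serial:struct<Serial:bigint>>
record = {"Serial": {"Serial": 11088000000001615}}
print(convert_to_orc_object(record))
```

The point of the sketch is the type-dispatch plus recursion: each nested level is converted with the same function, so arbitrarily deep Record nesting works without flattening the schema.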
