Shawn,

What version of NiFi are you using? I'm wondering if you are running into NIFI-4232 [1] or NIFI-4857 [2] or something like that. If isCompatibleDataType() is the offender, try putting your byte[] into a ByteBuffer.
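
A minimal sketch of the three shapes the content can take, written in plain Java so it runs without NiFi on the classpath. The class and method names (FlowFileBytes, wrap, box) are made up for illustration and are not part of NiFi. Whether the Avro writer accepts the ByteBuffer depends on the NiFi version and the issues referenced here, so this is something to try rather than a confirmed fix.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, for illustration only (not a NiFi API).
public class FlowFileBytes {

    // Wraps the raw content in a ByteBuffer without copying;
    // the buffer is backed by the same array.
    public static ByteBuffer wrap(byte[] content) {
        return ByteBuffer.wrap(content);
    }

    // Boxes each primitive byte into a Byte[], the object-wrapper
    // array form (what ArrayUtils.toObject also produces).
    public static Byte[] box(byte[] content) {
        Byte[] boxed = new Byte[content.length];
        for (int i = 0; i < content.length; i++) {
            boxed[i] = content[i];
        }
        return boxed;
    }

    public static void main(String[] args) {
        byte[] content = "hello".getBytes(StandardCharsets.UTF_8);

        // The JVM names of the two array types are the ones
        // that show up in the two error messages in this thread.
        System.out.println(content.getClass().getName());      // [B
        System.out.println(box(content).getClass().getName()); // [Ljava.lang.Byte;

        ByteBuffer buffer = wrap(content);
        System.out.println(buffer.remaining());                 // 5
    }
}
```

In the Groovy script from this thread, the equivalent change would presumably be 'flow_file':ByteBuffer.wrap(inputStream.getBytes()) with an import of java.nio.ByteBuffer; that line is untested against MergeRecord.
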

Regards,
Matt

[1] https://issues.apache.org/jira/browse/NIFI-4232
[2] https://issues.apache.org/jira/browse/NIFI-4857

On Mon, Sep 17, 2018 at 3:06 PM Shawn Weeks <[email protected]> wrote:
>
> So I've tried the Apache Commons ArrayUtils.toObject and now I get this which
> isn't much different.
>
> new MapRecord(recordSchema,[
>     'id':variables.uuid,
>     'file_name':variables."original_filename",
>     'file_path':variables.path,
>     'file_size':variables."file.size",
>     'flow_file':ArrayUtils.toObject(inputStream.getBytes())
> ]
>
> 2018-09-17 13:59:05,291 ERROR [Timer-Driven Process Thread-220]
> o.a.nifi.processors.standard.MergeRecord
> MergeRecord[id=b81d3d84-4ffc-110b-8276-64270d885e10] Failed to create merged
> FlowFile from 1 input FlowFiles; routing originals to failure:
> org.apache.nifi.serialization.record.util.IllegalTypeConversionException:
> Cannot convert value [Ljava.lang.Byte;@bd6d229 of type class
> [Ljava.lang.Byte; because no compatible types exist in the UNION for field
> flow_file
> org.apache.nifi.serialization.record.util.IllegalTypeConversionException:
> Cannot convert value [Ljava.lang.Byte;@bd6d229 of type class
> [Ljava.lang.Byte; because no compatible types exist in the UNION for field
> flow_file
>     at org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:708)
>     at org.apache.nifi.avro.AvroTypeUtil.convertToAvroObject(AvroTypeUtil.java:607)
>     at org.apache.nifi.avro.AvroTypeUtil.createAvroRecord(AvroTypeUtil.java:456)
>     at org.apache.nifi.avro.WriteAvroResultWithSchema.writeRecord(WriteAvroResultWithSchema.java:60)
>     at org.apache.nifi.serialization.AbstractRecordSetWriter.write(AbstractRecordSetWriter.java:59)
>     at org.apache.nifi.processors.standard.merge.RecordBin.offer(RecordBin.java:142)
>     at org.apache.nifi.processors.standard.merge.RecordBinManager.add(RecordBinManager.java:154)
>     at org.apache.nifi.processors.standard.MergeRecord.binFlowFile(MergeRecord.java:327)
>     at org.apache.nifi.processors.standard.MergeRecord.onTrigger(MergeRecord.java:294)
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1124)
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> ________________________________
> From: Mark Payne <[email protected]>
> Sent: Monday, September 17, 2018 1:54:57 PM
> To: [email protected]
> Subject: Re: Scripted Record Reader - Missing Something Obvious
>
> Shawn,
>
> I believe the issue that you're running into is that you're defining the
> 'flow_file' field of an Array of type Byte.
> Which means that it is expecting as its value an object of type Byte[] but
> you are passing it an object of type byte[].
> You'd have to create a Byte[] instead, using the object wrapper instead of
> the primitive byte array.
>
> Thanks
> -Mark
>
>
> On Sep 17, 2018, at 2:12 PM, Shawn Weeks <[email protected]> wrote:
>
> Let's say I'm trying to store the contents of a flow file in byte array
> column in Avro. I've defined a recordSchema in Groovy like this.
>
> def recordSchema = new SimpleRecordSchema(
>     [
>         new RecordField('id',RecordFieldType.STRING.dataType),
>         new RecordField('file_name',RecordFieldType.STRING.dataType),
>         new RecordField('file_path',RecordFieldType.STRING.dataType),
>         new RecordField('file_size',RecordFieldType.LONG.dataType),
>         new RecordField('flow_file',RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.dataType))
>     ]
> )
>
> And I'm attempting to populate nextRecord with this.
>
> new MapRecord(recordSchema,[
>     'id':variables.uuid,
>     'file_name':variables."original_filename",
>     'file_path':variables.path,
>     'file_size':variables."file.size",
>     'flow_file':inputStream.getBytes()
> ]
>
> And I get this error. I'm missing something about how to store the contents
> of the flowfile as a binary column Avro but I'm not sure what I'm missing.
>
> org.apache.nifi.serialization.record.util.IllegalTypeConversionException:
> Cannot convert value [B@6c4141cf of type class [B because no compatible types
> exist in the UNION for field flow_file
>     at org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:708)
>     at org.apache.nifi.avro.AvroTypeUtil.convertToAvroObject(AvroTypeUtil.java:607)
>     at org.apache.nifi.avro.AvroTypeUtil.createAvroRecord(AvroTypeUtil.java:456)
>     at org.apache.nifi.avro.WriteAvroResultWithSchema.writeRecord(WriteAvroResultWithSchema.java:60)
>     at org.apache.nifi.serialization.AbstractRecordSetWriter.write(AbstractRecordSetWriter.java:59)
>     at org.apache.nifi.processors.standard.merge.RecordBin.offer(RecordBin.java:142)
>     at org.apache.nifi.processors.standard.merge.RecordBinManager.add(RecordBinManager.java:154)
>     at org.apache.nifi.processors.standard.MergeRecord.binFlowFile(MergeRecord.java:327)
>     at org.apache.nifi.processors.standard.MergeRecord.onTrigger(MergeRecord.java:294)
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1124)
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
>
> Thanks
> Shawn Weeks
