The ByteBuffer check wasn't added until 1.7.0, so it may not be in HDF
3.1.2. However, if that is the place in the code that's causing
trouble, you may need to convert to Avro's Array class or Java's List
instead. Still just a hunch though, I haven't tried it :)

The other thing is that if you will be outputting anything but null in
that field (an empty string, e.g.), you don't need a nullable
RecordField, so you can pass a boolean 'false' param to the
RecordField constructor for that field, and that may get you around
the union issue.
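For what it's worth, here is a plain-Java sketch of the conversions in
question (no NiFi classes involved, so treat it as illustration only; the
class and method names are mine, not from the NiFi API). It shows boxing a
primitive byte[] into a Byte[], wrapping it in a ByteBuffer, and converting
to a List:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

public class ByteFieldSketch {

    // Box a primitive byte[] into a Byte[], which is what an
    // ARRAY(BYTE) record field expects as its value.
    static Byte[] box(byte[] raw) {
        Byte[] boxed = new Byte[raw.length];
        for (int i = 0; i < raw.length; i++) {
            boxed[i] = raw[i]; // autoboxing: byte -> Byte
        }
        return boxed;
    }

    public static void main(String[] args) {
        byte[] raw = {1, 2, 3};

        // Option 1: box into Byte[] (works on older versions)
        Byte[] boxed = box(raw);

        // Option 2: wrap in a ByteBuffer (the check added in 1.7.0)
        ByteBuffer buf = ByteBuffer.wrap(raw);

        // Option 3: convert to a Java List
        List<Byte> asList = Arrays.asList(boxed);

        System.out.println(Arrays.toString(boxed)); // [1, 2, 3]
        System.out.println(buf.remaining());        // 3
        System.out.println(asList.size());          // 3
    }
}
```

In a Groovy scripted reader you'd apply the same idea to the value you put
in the record map, e.g. wrap the result of inputStream.getBytes() before
handing it to MapRecord.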

Regards,
Matt

On Mon, Sep 17, 2018 at 3:19 PM Shawn Weeks <[email protected]> wrote:
>
> It's the Hortonworks variation on 1.5. HDF 3.1.2. I'll try the ByteBuffer
>
> ________________________________
> From: Matt Burgess <[email protected]>
> Sent: Monday, September 17, 2018 2:18:30 PM
> To: [email protected]
> Subject: Re: Scripted Record Reader - Missing Something Obvious
>
> Shawn,
>
> What version of NiFi are you using? I'm wondering if you are running
> into NIFI-4232 [1] or NIFI-4857 [2] or something like that. If
> isCompatibleDataType() is the offender, try putting your byte[] into a
> ByteBuffer.
>
> Regards,
> Matt
>
> [1] https://issues.apache.org/jira/browse/NIFI-4232
> [2] https://issues.apache.org/jira/browse/NIFI-4857
>
> On Mon, Sep 17, 2018 at 3:06 PM Shawn Weeks <[email protected]> wrote:
> >
> > So I've tried the Apache Commons ArrayUtils.toObject and now I get this 
> > which isn't much different.
> >
> >
> > new MapRecord(recordSchema,[
> > 'id':variables.uuid,
> > 'file_name':variables."original_filename",
> > 'file_path':variables.path,
> > 'file_size':variables."file.size",
> > 'flow_file':ArrayUtils.toObject(inputStream.getBytes())
> > ])
> >
> >
> > 2018-09-17 13:59:05,291 ERROR [Timer-Driven Process Thread-220] 
> > o.a.nifi.processors.standard.MergeRecord 
> > MergeRecord[id=b81d3d84-4ffc-110b-8276-64270d885e10] Failed to create 
> > merged FlowFile from 1 input FlowFiles; routing originals to failure: 
> > org.apache.nifi.serialization.record.util.IllegalTypeConversionException: 
> > Cannot convert value [Ljava.lang.Byte;@bd6d229 of type class 
> > [Ljava.lang.Byte; because no compatible types exist in the UNION for field 
> > flow_file
> > org.apache.nifi.serialization.record.util.IllegalTypeConversionException: 
> > Cannot convert value [Ljava.lang.Byte;@bd6d229 of type class 
> > [Ljava.lang.Byte; because no compatible types exist in the UNION for field 
> > flow_file
> >         at 
> > org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:708)
> >         at 
> > org.apache.nifi.avro.AvroTypeUtil.convertToAvroObject(AvroTypeUtil.java:607)
> >         at 
> > org.apache.nifi.avro.AvroTypeUtil.createAvroRecord(AvroTypeUtil.java:456)
> >         at 
> > org.apache.nifi.avro.WriteAvroResultWithSchema.writeRecord(WriteAvroResultWithSchema.java:60)
> >         at 
> > org.apache.nifi.serialization.AbstractRecordSetWriter.write(AbstractRecordSetWriter.java:59)
> >         at 
> > org.apache.nifi.processors.standard.merge.RecordBin.offer(RecordBin.java:142)
> >         at 
> > org.apache.nifi.processors.standard.merge.RecordBinManager.add(RecordBinManager.java:154)
> >         at 
> > org.apache.nifi.processors.standard.MergeRecord.binFlowFile(MergeRecord.java:327)
> >         at 
> > org.apache.nifi.processors.standard.MergeRecord.onTrigger(MergeRecord.java:294)
> >         at 
> > org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1124)
> >         at 
> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> >         at 
> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> >         at 
> > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
> >         at 
> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >         at 
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >         at 
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >         at 
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >         at 
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >         at java.lang.Thread.run(Thread.java:748)
> >
> > ________________________________
> > From: Mark Payne <[email protected]>
> > Sent: Monday, September 17, 2018 1:54:57 PM
> > To: [email protected]
> > Subject: Re: Scripted Record Reader - Missing Something Obvious
> >
> > Shawn,
> >
> > I believe the issue you're running into is that you're defining the
> > 'flow_file' field as an Array of type Byte, which means it expects as its
> > value an object of type Byte[], but you are passing it an object of type
> > byte[]. You'd have to create a Byte[] instead, using the object wrapper
> > rather than the primitive byte array.
> >
> > Thanks
> > -Mark
> >
> >
> > On Sep 17, 2018, at 2:12 PM, Shawn Weeks <[email protected]> wrote:
> >
> > Let's say I'm trying to store the contents of a flow file in a byte array
> > column in Avro. I've defined a recordSchema in Groovy like this.
> >
> > def recordSchema = new SimpleRecordSchema(
> > [
> > new RecordField('id',RecordFieldType.STRING.dataType),
> > new RecordField('file_name',RecordFieldType.STRING.dataType),
> > new RecordField('file_path',RecordFieldType.STRING.dataType),
> > new RecordField('file_size',RecordFieldType.LONG.dataType),
> > new RecordField('flow_file',RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.dataType))
> >  ]
> > )
> >
> > And I'm attempting to populate nextRecord with this.
> >
> > new MapRecord(recordSchema,[
> > 'id':variables.uuid,
> > 'file_name':variables."original_filename",
> > 'file_path':variables.path,
> > 'file_size':variables."file.size",
> > 'flow_file':inputStream.getBytes()
> > ])
> >
> >
> > And I get this error. I'm missing something about how to store the contents
> > of the flowfile as a binary column in Avro, but I'm not sure what.
> >
> > org.apache.nifi.serialization.record.util.IllegalTypeConversionException: 
> > Cannot convert value [B@6c4141cf of type class [B because no compatible 
> > types exist in the UNION for field flow_file
> >         at 
> > org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:708)
> >         at 
> > org.apache.nifi.avro.AvroTypeUtil.convertToAvroObject(AvroTypeUtil.java:607)
> >         at 
> > org.apache.nifi.avro.AvroTypeUtil.createAvroRecord(AvroTypeUtil.java:456)
> >         at 
> > org.apache.nifi.avro.WriteAvroResultWithSchema.writeRecord(WriteAvroResultWithSchema.java:60)
> >         at 
> > org.apache.nifi.serialization.AbstractRecordSetWriter.write(AbstractRecordSetWriter.java:59)
> >         at 
> > org.apache.nifi.processors.standard.merge.RecordBin.offer(RecordBin.java:142)
> >         at 
> > org.apache.nifi.processors.standard.merge.RecordBinManager.add(RecordBinManager.java:154)
> >         at 
> > org.apache.nifi.processors.standard.MergeRecord.binFlowFile(MergeRecord.java:327)
> >         at 
> > org.apache.nifi.processors.standard.MergeRecord.onTrigger(MergeRecord.java:294)
> >         at 
> > org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1124)
> >         at 
> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> >         at 
> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> >         at 
> > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
> >         at 
> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >         at 
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >         at 
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >         at 
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >         at 
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >         at java.lang.Thread.run(Thread.java:748)
> >
> >
> > Thanks
> > Shawn Weeks
> >
> >
