So I've tried Apache Commons' ArrayUtils.toObject, and now I get this error,
which isn't much different:
new MapRecord(recordSchema, [
    'id'        : variables.uuid,
    'file_name' : variables."original_filename",
    'file_path' : variables.path,
    'file_size' : variables."file.size",
    'flow_file' : ArrayUtils.toObject(inputStream.getBytes())
])
2018-09-17 13:59:05,291 ERROR [Timer-Driven Process Thread-220] o.a.nifi.processors.standard.MergeRecord MergeRecord[id=b81d3d84-4ffc-110b-8276-64270d885e10] Failed to create merged FlowFile from 1 input FlowFiles; routing originals to failure: org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [Ljava.lang.Byte;@bd6d229 of type class [Ljava.lang.Byte; because no compatible types exist in the UNION for field flow_file
org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [Ljava.lang.Byte;@bd6d229 of type class [Ljava.lang.Byte; because no compatible types exist in the UNION for field flow_file
    at org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:708)
    at org.apache.nifi.avro.AvroTypeUtil.convertToAvroObject(AvroTypeUtil.java:607)
    at org.apache.nifi.avro.AvroTypeUtil.createAvroRecord(AvroTypeUtil.java:456)
    at org.apache.nifi.avro.WriteAvroResultWithSchema.writeRecord(WriteAvroResultWithSchema.java:60)
    at org.apache.nifi.serialization.AbstractRecordSetWriter.write(AbstractRecordSetWriter.java:59)
    at org.apache.nifi.processors.standard.merge.RecordBin.offer(RecordBin.java:142)
    at org.apache.nifi.processors.standard.merge.RecordBinManager.add(RecordBinManager.java:154)
    at org.apache.nifi.processors.standard.MergeRecord.binFlowFile(MergeRecord.java:327)
    at org.apache.nifi.processors.standard.MergeRecord.onTrigger(MergeRecord.java:294)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1124)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
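For reference, the odd-looking type names in these error messages are JVM binary class names: [B is a primitive byte[] and [Ljava.lang.Byte; is a boxed Byte[]. The hex text after the @ is only the identity hash code and changes from run to run. The small standalone Java sketch below prints both names with Class.getName(); the class and method names are hypothetical, chosen for illustration.

```java
public class ArrayTypeNames {
    // Returns the JVM binary name of an array's class, as it appears in the log
    static String binaryName(Object array) {
        return array.getClass().getName();
    }

    public static void main(String[] args) {
        byte[] primitive = {1, 2, 3}; // primitive array, like Groovy's inputStream.getBytes()
        Byte[] boxed = {1, 2, 3};     // array of java.lang.Byte wrapper objects

        System.out.println(binaryName(primitive)); // [B
        System.out.println(binaryName(boxed));     // [Ljava.lang.Byte;
    }
}
```

So the first report below (class [B) was a byte[], and the follow-up above (class [Ljava.lang.Byte;) is a Byte[].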
________________________________
From: Mark Payne <[email protected]>
Sent: Monday, September 17, 2018 1:54:57 PM
To: [email protected]
Subject: Re: Scripted Record Reader - Missing Something Obvious
Shawn,
I believe the issue you're running into is that you're defining the
'flow_file' field as an Array of type Byte.
That means it expects a value of type Byte[], but you are passing it a
byte[].
You'd have to create a Byte[] instead, using the Byte object wrapper rather
than a primitive byte array.
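A minimal Java sketch of the conversion described here: each primitive byte is boxed into a java.lang.Byte. This is the same thing Apache Commons' ArrayUtils.toObject(byte[]) does; the class and method names below are hypothetical. Note that, per the follow-up at the top of this thread, passing a Byte[] still failed with the same kind of UNION conversion error in Shawn's flow, so this only illustrates the type change, not a confirmed fix.

```java
import java.util.Arrays;

public class BoxBytes {
    // Boxes each primitive byte into a java.lang.Byte (null in, null out)
    static Byte[] toObject(byte[] primitive) {
        if (primitive == null) {
            return null;
        }
        Byte[] boxed = new Byte[primitive.length];
        for (int i = 0; i < primitive.length; i++) {
            boxed[i] = primitive[i]; // autoboxing, compiled as Byte.valueOf(...)
        }
        return boxed;
    }

    public static void main(String[] args) {
        byte[] raw = {72, 105};
        Byte[] boxed = toObject(raw);
        System.out.println(Arrays.toString(boxed));     // [72, 105]
        System.out.println(boxed.getClass().getName()); // [Ljava.lang.Byte;
    }
}
```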
Thanks
-Mark
On Sep 17, 2018, at 2:12 PM, Shawn Weeks <[email protected]> wrote:
Let's say I'm trying to store the contents of a flow file in a byte array
column in Avro. I've defined a recordSchema in Groovy like this:
import org.apache.nifi.serialization.SimpleRecordSchema
import org.apache.nifi.serialization.record.RecordField
import org.apache.nifi.serialization.record.RecordFieldType

def recordSchema = new SimpleRecordSchema(
    [
        new RecordField('id', RecordFieldType.STRING.dataType),
        new RecordField('file_name', RecordFieldType.STRING.dataType),
        new RecordField('file_path', RecordFieldType.STRING.dataType),
        new RecordField('file_size', RecordFieldType.LONG.dataType),
        new RecordField('flow_file', RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.dataType))
    ]
)
And I'm attempting to populate nextRecord with this.
new MapRecord(recordSchema, [
    'id'        : variables.uuid,
    'file_name' : variables."original_filename",
    'file_path' : variables.path,
    'file_size' : variables."file.size",
    'flow_file' : inputStream.getBytes()
])
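Groovy's InputStream.getBytes() reads the stream to the end and returns a primitive byte[], which is why the error below reports class [B. For comparison, here is a plain Java sketch of the same read that works on Java 8 (the stack traces appear to come from a Java 8 runtime; on Java 9+ InputStream.readAllBytes() does this directly). The class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class ReadAllBytes {
    // Reads the stream to EOF into a primitive byte[], buffering 8 KiB at a time
    static byte[] readAll(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            // Wrap the checked exception so callers don't need a throws clause
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = readAll(new ByteArrayInputStream(new byte[]{10, 20, 30}));
        System.out.println(data.length);               // 3
        System.out.println(data.getClass().getName()); // [B
    }
}
```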
And I get this error. I'm missing something about how to store the contents of
the flow file as a binary column in Avro, but I'm not sure what.
org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [B@6c4141cf of type class [B because no compatible types exist in the UNION for field flow_file
    at org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:708)
    at org.apache.nifi.avro.AvroTypeUtil.convertToAvroObject(AvroTypeUtil.java:607)
    at org.apache.nifi.avro.AvroTypeUtil.createAvroRecord(AvroTypeUtil.java:456)
    at org.apache.nifi.avro.WriteAvroResultWithSchema.writeRecord(WriteAvroResultWithSchema.java:60)
    at org.apache.nifi.serialization.AbstractRecordSetWriter.write(AbstractRecordSetWriter.java:59)
    at org.apache.nifi.processors.standard.merge.RecordBin.offer(RecordBin.java:142)
    at org.apache.nifi.processors.standard.merge.RecordBinManager.add(RecordBinManager.java:154)
    at org.apache.nifi.processors.standard.MergeRecord.binFlowFile(MergeRecord.java:327)
    at org.apache.nifi.processors.standard.MergeRecord.onTrigger(MergeRecord.java:294)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1124)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Thanks
Shawn Weeks