That logging is part of my debugging attempt, not the original script.
The script is pretty simple: it checks the compression_cd field (whose
name is defined in a property) and, depending on its value, hands the
content of the blob_contents field (name also defined in a property) to
one of two processing branches (not shown).

The failure with the new data format occurs when attempting to retrieve the
compression_cd field, with errors about failing to cast null to integer.
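
For reference, the cast error itself can be reproduced without NiFi or Avro.
The following is a minimal sketch in which a plain map stands in for the
record (an assumption made only for illustration); as far as I know,
GenericRecord.get(String) in Avro 1.8.1 likewise returns null when the name
is not in the schema or when the stored value is null.

```groovy
import org.codehaus.groovy.runtime.typehandling.GroovyCastException

// Stand-in for a record: a plain map with the new, lower-case field name.
def record = [compression_cd: 728]

// Looking up a name that is absent (here the old upper-case spelling) yields null.
def missing = record.get('COMPRESSION_CD')

String outcome
try {
    int code = missing      // assigning null to a primitive int
    outcome = 'assigned'
} catch (GroovyCastException e) {
    outcome = 'cast failed' // same exception type as in the bulletin
}

// Declaring the variable as Integer accepts null, so the null can be tested for.
Integer boxed = missing
```

So the exception says only that the lookup produced null; it does not say
whether the name was wrong or the value itself was null.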

I'm attempting to print all the available fields with the code you quoted.
#############
For the old data format it produced:
23:13:58 BST
WARNING
fbad38e4-5811-1f5d-5d24-92673e573781

ExecuteGroovyScript[id=fbad38e4-5811-1f5d-5d24-92673e573781] Field name :
PERSON_ID

23:13:58 BST
WARNING
fbad38e4-5811-1f5d-5d24-92673e573781

ExecuteGroovyScript[id=fbad38e4-5811-1f5d-5d24-92673e573781] Field name :
gender

23:13:58 BST
WARNING
fbad38e4-5811-1f5d-5d24-92673e573781

ExecuteGroovyScript[id=fbad38e4-5811-1f5d-5d24-92673e573781] Field name :
age

23:13:58 BST
WARNING
fbad38e4-5811-1f5d-5d24-92673e573781

ExecuteGroovyScript[id=fbad38e4-5811-1f5d-5d24-92673e573781] Got next record

23:13:58 BST
WARNING
fbad38e4-5811-1f5d-5d24-92673e573781

ExecuteGroovyScript[id=fbad38e4-5811-1f5d-5d24-92673e573781] Got
compression code
##############
For the new data format it produced:
##############
00:49:21 BST
WARNING
219801af-0198-1000-88b2-b2d88ce64a3a

ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Field name :
compression_cd

00:49:21 BST
WARNING
219801af-0198-1000-88b2-b2d88ce64a3a

ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Field name :
max_sequence_nbr

00:49:21 BST
WARNING
219801af-0198-1000-88b2-b2d88ce64a3a

ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Field name :
blob_contents

00:49:21 BST
WARNING
219801af-0198-1000-88b2-b2d88ce64a3a

ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Got next record

00:49:21 BST
ERROR
219801af-0198-1000-88b2-b2d88ce64a3a

ExecuteGroovyScript[id=219801af-0198-1000-88b2-b2d88ce64a3a] Error
appending new record to avro file:
org.codehaus.groovy.runtime.typehandling.GroovyCastException: Cannot cast
object 'null' with class 'null' to class 'int'. Try 'java.lang.Integer'
instead
##############
The strange thing is that the blob contents field name isn't displayed in
the output for the old format, yet I'm able to pass the content to the
subsequent processing steps. Also, blob_id isn't listed in the new format
output, but I'm able to retrieve it. compression_cd is listed, but I don't
seem to be able to fetch it, even if I hardcode the name.
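
One way to narrow this down is sketched below, under stated assumptions
rather than as a diagnosis: log every field name in a single message (if the
bulletin view only keeps the most recent messages, that would explain seeing
just the last three), and fetch the field through a lookup that ignores case
and fails loudly when the name is absent. getFieldIgnoreCase and the small
stand-in schema are hypothetical and not part of the original script.

```groovy
@Grab('org.apache.avro:avro:1.8.1')
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord

// Hypothetical helper: find a field by name, ignoring case, and throw a
// descriptive error instead of silently returning null for an unknown name.
Object getFieldIgnoreCase(GenericRecord record, String wanted) {
    Schema.Field field = record.schema.fields.find { it.name().equalsIgnoreCase(wanted) }
    if (field == null) {
        throw new IllegalArgumentException(
            "No field '${wanted}' in schema; available: ${record.schema.fields*.name()}")
    }
    return record.get(field.pos())
}

// Stand-in schema using the new lower-case names; both fields are nullable.
def schemaJson = '''
{"type":"record","name":"blob","fields":[
  {"name":"compression_cd","type":["null","int"],"default":null},
  {"name":"blob_contents","type":["null","string"],"default":null}
]}'''
Schema schema = new Schema.Parser().parse(schemaJson)
GenericRecord rec = new GenericData.Record(schema)
rec.put('compression_cd', 728)

// All field names in one string, suitable for a single log.warn call.
String allNames = schema.fields*.name().join(', ')

// Box the result so a null value does not trigger a cast to primitive int.
Integer code = getFieldIgnoreCase(rec, 'COMPRESSION_CD') as Integer

// A field that exists but was never set comes back as null, not as an error.
def contents = getFieldIgnoreCase(rec, 'BLOB_CONTENTS')
```

With this shape, a misspelled or wrongly cased property value shows up as an
explicit message listing the available names, while a genuinely null value
still has to be handled separately before comparing against 727 or 728.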

On Mon, Jul 21, 2025 at 12:57 AM Joe Witt <joe.w...@gmail.com> wrote:

> Richard,
>
> It isn't obvious to me what you're trying to achieve vs what is currently
> happening but I'd be interested to know what the output for this part of
> the code is
>
>             def fields = SCH.getFields()
>             if (fields.isEmpty() ) {
>                log.warn("No fields found")
>             } else {
>                fields.each { field ->
>                   def fieldName = field.name()
>                   log.warn("Field name : ${field.name()}")
>                }
>             }
>
> Is the compression_cd field there?
>
> Thanks
>
> On Sun, Jul 20, 2025 at 2:10 AM Richard Beare <richard.be...@gmail.com>
> wrote:
>
>> And I forgot to say - I'm using nifi 1.20.0
>>
>> On Sun, Jul 20, 2025 at 6:56 PM Richard Beare <richard.be...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I have a groovy script executed from an ExecuteGroovyScript processor
>>> that I wrote a few years ago. I copied the entire group and reconfigured
>>> the input to point at a new source database (postgres rather than mssql).
>>> The data is flowing through to the groovy processor OK, but the structure
>>> of records is a little different (mostly case in the field names, with a
>>> few different names). Most of the names were configured via properties to
>>> allow easy modification via the interface.
>>>
>>> The original workflow seems to work just fine. The new one is having
>>> problems retrieving fields, and I can't figure out why.
>>>
>>> The error I get is when retrieving the compression code, the field name
>>> of which is passed as a property.
>>> I'm able to explicitly retrieve the blob_id.
>>> The field printing I've included only prints the last 3 fields for
>>> both the old and new data.
>>>
>>> What am I missing here? Apologies if this is something obvious, I'm a
>>> bit out of NiFi practice.
>>>
>>> The old input records were in this form:
>>> [ {
>>>   "blobid" : 72001,
>>>   "EVENT_ID" : 7.91947467E8,
>>>   "VALID_FROM_DT_TM" : "2020-03-10T02:16:34Z",
>>>   "VALID_UNTIL_DT_TM" : "2100-12-31T00:00:00Z",
>>>   "BLOB_SEQ_NUM" : 1.0,
>>>   "BLOB_LENGTH" : 976.0,
>>>   "COMPRESSION_CD" : 728.0,
>>>   "UPDT_DT_TM" : "2020-03-10T02:16:34Z",
>>>   "BLOB_CONTENTS" : "=—\u000 -- binary stuff",
>>>   "blob_parts" : 1.0,
>>>   "ENCNTR_ID" : 37184618,
>>>   "PERSON_ID" : 9114238,
>>>   "gender" : 2,
>>>   "age" : 92
>>> }, {
>>>   "blobid" : 72002,
>>>   "EVENT_ID" : 7.91948699E8,
>>>   "VALID_FROM_DT_TM" : "2020-03-07T11:11:33Z",
>>>   "VALID_UNTIL_DT_TM" : "2100-12-31T00:00:00Z",
>>>   "BLOB_SEQ_NUM" : 1.0,
>>>   "BLOB_LENGTH" : 2304.0,
>>>   "COMPRESSION_CD" : 728.0,
>>>
>>> ...
>>> ]
>>>
>>> The new ones look like:
>>>
>>> [ {
>>>   "blob_id" : 1001,
>>>   "event_id" : 3188115,
>>>   "person_id" : 8430038,
>>>   "encntr_id" : 13352660,
>>>   "valid_from_dt_tm" : "2011-05-19T00:39:51Z",
>>>   "creation_dt_tm" : "2011-05-19T00:39:51Z",
>>>   "blob_seq_num" : 1,
>>>   "blob_length" : 2252,
>>>   "compression_cd" : 728,
>>>   "max_sequence_nbr" : 1,
>>>   "blob_contents" : "\u00 - binary stuff"
>>> }, {
>>>   "blob_id" : 1002,
>>>   "event_id" : 3188119,
>>>   "person_id" : 7241448,
>>>   "encntr_id" : 11645097,
>>>   "valid_from_dt_tm" : "2011-05-19T00:39:51Z",
>>>   "creation_dt_tm" : "2011-05-19T00:39:51Z",
>>>
>>> So there are some formatting and field ordering differences.
>>> The script only uses compression_cd and blob_contents.
>>>
>>> @Grab('org.apache.avro:avro:1.8.1')
>>> import org.apache.avro.*
>>> import org.apache.avro.file.*
>>> import org.apache.avro.generic.*
>>> import java.nio.ByteBuffer
>>> import DecompressOcf.DecompressBlob
>>>
>>> // from https://github.com/maxbback/avro_reader_writer/blob/master/avroProcessor.groovy
>>>
>>>
>>> // needs docfield and compressfield in the host processor
>>>
>>> def flowFile = session.get()
>>> if(!flowFile) return
>>>
>>> try {
>>>
>>>     flowFile = session.write(flowFile, {inStream, outStream ->
>>>         // Defining avro reader and writer
>>>         DataFileStream<GenericRecord> reader =
>>>             new DataFileStream<>(inStream, new GenericDatumReader<GenericRecord>())
>>>         DataFileWriter<GenericRecord> writer =
>>>             new DataFileWriter<>(new GenericDatumWriter<GenericRecord>())
>>>
>>>         // get avro schema
>>>         def schema = reader.schema
>>>         // in my case I am processing a address lookup table data with
>>>         // only one field the address field
>>>         // {"type":"record","name":"lookuptable","namespace":"any.data","fields":[{"name":"address","type":["null","string"]}]}
>>>
>>>         // Define which schema to be used for writing
>>>         // If you want to extend or change the output record format
>>>         // you define a new schema and specify that it shall be used for
>>>         // writing
>>>         writer.create(schema, outStream)
>>>         log.warn(String.format("Before while loop"))
>>>         // process record by record
>>>         while (reader.hasNext()) {
>>>             log.warn(String.format("Start of while loop: %s", compressfield))
>>>             GenericRecord currRecord = reader.next()
>>>             def SCH = currRecord.getSchema()
>>>
>>>             def fields = SCH.getFields()
>>>             if (fields.isEmpty() ) {
>>>                log.warn("No fields found")
>>>             } else {
>>>                fields.each { field ->
>>>                   def fieldName = field.name()
>>>                   log.warn("Field name : ${field.name()}")
>>>                }
>>>             }
>>>             log.warn("Got next record")
>>>             //int I = currRecord.get("event_id")
>>>             //log.warn(String.format(" blob_id direct %d ", I))
>>>
>>>             int CompressionCode = currRecord.get(compressfield as String)
>>>             log.warn(String.format("Got compression code"))
>>>             ByteBuffer sblob = currRecord.get(docfield as String)
>>>             if (CompressionCode == 728) {
>>>                // action normally here
>>>             } else if (CompressionCode == 727) {
>>>                 // this blob isn't compressed - strip the suffix
>>>               // action without decompression.
>>>             } else {
>>>                 log.error('Unknown compression code')
>>>             }
>>>             writer.append(currRecord)
>>>         }
>>>         // Create a new record
>>>         //   GenericRecord newRecord = new GenericData.Record(schema)
>>>         // populate the record with data
>>>         //   newRecord.put("address", new org.apache.avro.util.Utf8("My street"))
>>>         // Append a new record to avro file
>>>         //   writer.append(newRecord)
>>>         //writer.appendAllFrom(reader, false)
>>>         // do not forget to close the writer
>>>         writer.close()
>>>
>>>     } as StreamCallback)
>>>
>>>     session.transfer(flowFile, REL_SUCCESS)
>>> } catch(e) {
>>>     log.error('Error appending new record to avro file', e)
>>>     flowFile = session.penalize(flowFile)
>>>     session.transfer(flowFile, REL_FAILURE)
>>> }
>>>
>>
