Hi, I have a Groovy script, written a few years ago, that runs in an ExecuteGroovyScript processor. I copied the entire process group and reconfigured the input to point at a new source database (PostgreSQL rather than MSSQL). The data flows through to the Groovy processor OK, but the structure of the records is a little different (mostly the case of the field names, plus a few renamed fields). Most of the field names were configured via properties to allow easy modification through the interface.
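For reference, the field names come into the script through the processor's dynamic properties (compressfield and docfield), which ExecuteGroovyScript binds as script variables; a quick way to confirm the configured values would be something like:

```groovy
// Log the property-driven field names before processing, to check that the
// configured values match the new lower-case schema exactly
log.warn("compressfield='${compressfield as String}', docfield='${docfield as String}'")
```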
The original workflow still works just fine. The new one has problems retrieving fields, and I can't figure out why. The error occurs when retrieving the compression code, whose field name is passed in as a property. I'm able to retrieve the blob_id explicitly. The field printing I've included only prints the last 3 fields, for both the old and the new data. What am I missing here? Apologies if this is something obvious; I'm a bit out of NiFi practice.

The old input records were in this form:

```json
[ {
  "blobid" : 72001,
  "EVENT_ID" : 7.91947467E8,
  "VALID_FROM_DT_TM" : "2020-03-10T02:16:34Z",
  "VALID_UNTIL_DT_TM" : "2100-12-31T00:00:00Z",
  "BLOB_SEQ_NUM" : 1.0,
  "BLOB_LENGTH" : 976.0,
  "COMPRESSION_CD" : 728.0,
  "UPDT_DT_TM" : "2020-03-10T02:16:34Z",
  "BLOB_CONTENTS" : "=—\u000 -- binary stuff",
  "blob_parts" : 1.0,
  "ENCNTR_ID" : 37184618,
  "PERSON_ID" : 9114238,
  "gender" : 2,
  "age" : 92
}, {
  "blobid" : 72002,
  "EVENT_ID" : 7.91948699E8,
  "VALID_FROM_DT_TM" : "2020-03-07T11:11:33Z",
  "VALID_UNTIL_DT_TM" : "2100-12-31T00:00:00Z",
  "BLOB_SEQ_NUM" : 1.0,
  "BLOB_LENGTH" : 2304.0,
  "COMPRESSION_CD" : 728.0,
  ...
]
```

The new ones look like:

```json
[ {
  "blob_id" : 1001,
  "event_id" : 3188115,
  "person_id" : 8430038,
  "encntr_id" : 13352660,
  "valid_from_dt_tm" : "2011-05-19T00:39:51Z",
  "creation_dt_tm" : "2011-05-19T00:39:51Z",
  "blob_seq_num" : 1,
  "blob_length" : 2252,
  "compression_cd" : 728,
  "max_sequence_nbr" : 1,
  "blob_contents" : "\u00 - binary stuff"
}, {
  "blob_id" : 1002,
  "event_id" : 3188119,
  "person_id" : 7241448,
  "encntr_id" : 11645097,
  "valid_from_dt_tm" : "2011-05-19T00:39:51Z",
  "creation_dt_tm" : "2011-05-19T00:39:51Z",
```

So there are some formatting differences (the old numeric values are doubles, the new ones integers) and the field ordering differs. The script only uses compression_cd and blob_contents:

```groovy
@Grab('org.apache.avro:avro:1.8.1')
import org.apache.avro.*
import org.apache.avro.file.*
import org.apache.avro.generic.*
import java.nio.ByteBuffer
import DecompressOcf.DecompressBlob

// Adapted from https://github.com/maxbback/avro_reader_writer/blob/master/avroProcessor.groovy
// Needs the docfield and compressfield dynamic properties on the host processor.

def flowFile = session.get()
if (!flowFile) return

try {
    flowFile = session.write(flowFile, { inStream, outStream ->
        // Define the Avro reader and writer
        DataFileStream<GenericRecord> reader = new DataFileStream<>(inStream, new GenericDatumReader<GenericRecord>())
        DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>())

        // Reuse the incoming Avro schema for the output; to extend or change
        // the output record format, define a new schema and use it here instead
        def schema = reader.schema
        writer.create(schema, outStream)

        log.warn("Before while loop")

        // Process record by record
        while (reader.hasNext()) {
            log.warn(String.format("Start of while loop: %s", compressfield))
            GenericRecord currRecord = reader.next()

            // Debug: list the field names the record's schema actually contains
            def fields = currRecord.getSchema().getFields()
            if (fields.isEmpty()) {
                log.warn("No fields found")
            } else {
                fields.each { field ->
                    log.warn("Field name : ${field.name()}")
                }
            }
            log.warn("Got next record")

            // Retrieving a field explicitly works:
            //int i = currRecord.get("event_id")
            //log.warn(String.format(" blob_id direct %d ", i))

            int CompressionCode = currRecord.get(compressfield as String)
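            // Diagnostic thought (untested): Avro field names are case-sensitive,
            // and GenericRecord.get(String) returns null for a name that isn't in
            // the schema; assigning null to a primitive int then throws a
            // GroovyCastException. A null-safe probe would be something like:
            //   def raw = currRecord.get(compressfield as String)
            //   log.warn("raw '${compressfield}' value: ${raw} (${raw?.getClass()?.name})")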
log.warn(String.format("Got compression code")) ByteBuffer sblob = currRecord.get(docfield as String) if (CompressionCode == 728) { // action normally here } else if (CompressionCode == 727) { // this blob isn't compressed - strip the suffix // action without decompression. } else { log.error('Unknown compression code') } writer.append(currRecord) } // Create a new record // GenericRecord newRecord = new GenericData.Record(schema) // populate the record with data // newRecord.put("address", new org.apache.avro.util.Utf8("My street")) // Append a new record to avro file // writer.append(newRecord) //writer.appendAllFrom(reader, false) // do not forget to close the writer writer.close() } as StreamCallback) session.transfer(flowFile, REL_SUCCESS) } catch(e) { log.error('Error appending new record to avro file', e) flowFile = session.penalize(flowFile) session.transfer(flowFile, REL_FAILURE) }