Hi,
I have a Groovy script, run from an ExecuteGroovyScript processor, that
I wrote a few years ago. I copied the entire group and reconfigured the
input to point at a new source database (PostgreSQL rather than MSSQL). The
data is flowing through to the Groovy processor OK, but the structure of the
records is a little different (mostly the case of the field names, plus a few
different names). Most of the field names are configured via properties to
allow easy modification via the interface.

The original workflow seems to work just fine. The new one is having
problems retrieving fields, and I can't figure out why.

The error occurs when retrieving the compression code, whose field name is
passed in as a property.
I'm able to retrieve the blob_id explicitly.
The field-name logging I've included prints only the last 3 fields, for
both the old and the new data.

What am I missing here? Apologies if this is something obvious, I'm a bit
out of NiFi practice.

The old input records were in this form:
[ {
  "blobid" : 72001,
  "EVENT_ID" : 7.91947467E8,
  "VALID_FROM_DT_TM" : "2020-03-10T02:16:34Z",
  "VALID_UNTIL_DT_TM" : "2100-12-31T00:00:00Z",
  "BLOB_SEQ_NUM" : 1.0,
  "BLOB_LENGTH" : 976.0,
  "COMPRESSION_CD" : 728.0,
  "UPDT_DT_TM" : "2020-03-10T02:16:34Z",
  "BLOB_CONTENTS" : "=—\u000 -- binary stuff",
  "blob_parts" : 1.0,
  "ENCNTR_ID" : 37184618,
  "PERSON_ID" : 9114238,
  "gender" : 2,
  "age" : 92
}, {
  "blobid" : 72002,
  "EVENT_ID" : 7.91948699E8,
  "VALID_FROM_DT_TM" : "2020-03-07T11:11:33Z",
  "VALID_UNTIL_DT_TM" : "2100-12-31T00:00:00Z",
  "BLOB_SEQ_NUM" : 1.0,
  "BLOB_LENGTH" : 2304.0,
  "COMPRESSION_CD" : 728.0,

...
]

The new ones look like:

[ {
  "blob_id" : 1001,
  "event_id" : 3188115,
  "person_id" : 8430038,
  "encntr_id" : 13352660,
  "valid_from_dt_tm" : "2011-05-19T00:39:51Z",
  "creation_dt_tm" : "2011-05-19T00:39:51Z",
  "blob_seq_num" : 1,
  "blob_length" : 2252,
  "compression_cd" : 728,
  "max_sequence_nbr" : 1,
  "blob_contents" : "\u00 - binary stuff"
}, {
  "blob_id" : 1002,
  "event_id" : 3188119,
  "person_id" : 7241448,
  "encntr_id" : 11645097,
  "valid_from_dt_tm" : "2011-05-19T00:39:51Z",
  "creation_dt_tm" : "2011-05-19T00:39:51Z",

So there are some differences in formatting and in field ordering.
The script only uses compression_cd and blob_contents.
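
Since the two sources differ mostly in the case of the field names, one thing that could be checked is whether the configured property value matches a schema field name exactly or only when case is ignored. A minimal sketch of such a check follows; `resolveFieldName` is a hypothetical helper that is not part of the script, and the name lists are taken from the sample records above.

```groovy
// Hypothetical helper (not in the original script): returns the schema field
// name that matches `wanted`, preferring an exact match and falling back to a
// case-insensitive one. Returns null when nothing matches.
String resolveFieldName(Collection<String> schemaFieldNames, String wanted) {
    if (wanted == null) {
        return null
    }
    String target = wanted.trim()   // guard against stray whitespace in a property value
    if (schemaFieldNames.contains(target)) {
        return target
    }
    return schemaFieldNames.find { it.equalsIgnoreCase(target) }
}

// Field names copied from the old and new sample records.
def oldNames = ['blobid', 'COMPRESSION_CD', 'BLOB_CONTENTS']
def newNames = ['blob_id', 'compression_cd', 'blob_contents']

println resolveFieldName(oldNames, 'COMPRESSION_CD')
println resolveFieldName(newNames, 'COMPRESSION_CD')
println resolveFieldName(newNames, 'no_such_field')
```

Inside the script, the list of names could be built from the Avro schema with `reader.schema.fields*.name()`, and a null result logged before any attempt to read the field.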

@Grab('org.apache.avro:avro:1.8.1')
import org.apache.avro.*
import org.apache.avro.file.*
import org.apache.avro.generic.*
import java.nio.ByteBuffer
import DecompressOcf.DecompressBlob

// from https://github.com/maxbback/avro_reader_writer/blob/master/avroProcessor.groovy


// needs docfield and compressfield in the host processor

def flowFile = session.get()
if(!flowFile) return

try {

    flowFile = session.write(flowFile, {inStream, outStream ->
        // Defining avro reader and writer
        DataFileStream<GenericRecord> reader = new DataFileStream<>(inStream, new GenericDatumReader<GenericRecord>())
        DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>())

        // get avro schema
        def schema = reader.schema
        // in my case I am processing an address lookup table with only one field, the address field
        // {"type":"record","name":"lookuptable","namespace":"any.data","fields":[{"name":"address","type":["null","string"]}]}

        // Define which schema to be used for writing
        // If you want to extend or change the output record format
        // you define a new schema and specify that it shall be used for writing
        writer.create(schema, outStream)
        log.warn(String.format("Before while loop"))
        // process record by record
        while (reader.hasNext()) {
            log.warn(String.format("Start of while loop: %s", compressfield))
            GenericRecord currRecord = reader.next()
            def SCH = currRecord.getSchema()

            def fields = SCH.getFields()
            if (fields.isEmpty() ) {
               log.warn("No fields found")
            } else {
               fields.each { field ->
                  def fieldName = field.name()
                  log.warn("Field name : ${field.name()}")
               }
            }
            log.warn("Got next record")
            //int I = currRecord.get("event_id")
            //log.warn(String.format(" blob_id direct %d ", I))

            int CompressionCode = currRecord.get(compressfield as String)
            log.warn(String.format("Got compression code"))
            ByteBuffer sblob = currRecord.get(docfield as String)
            if (CompressionCode == 728) {
               // action normally here
            } else if (CompressionCode == 727) {
                // this blob isn't compressed - strip the suffix
              // action without decompression.
            } else {
                log.error('Unknown compression code')
            }
            writer.append(currRecord)
        }
        // Create a new record
        //   GenericRecord newRecord = new GenericData.Record(schema)
        // populate the record with data
        //   newRecord.put("address", new org.apache.avro.util.Utf8("My street"))
        // Append a new record to avro file
        //   writer.append(newRecord)
        //writer.appendAllFrom(reader, false)
        // do not forget to close the writer
        writer.close()

    } as StreamCallback)

    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error('Error appending new record to avro file', e)
    flowFile = session.penalize(flowFile)
    session.transfer(flowFile, REL_FAILURE)
}
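
The sample records show the compression code as a floating-point value in the old data (728.0) and as an integer in the new data (728), and a lookup by a name that is not in the schema may yield null rather than a number, depending on the Avro version. Assigning null to a primitive `int` fails in Groovy, so a tolerant conversion can make the failing case visible in the log. The following is a minimal sketch under those assumptions; `toCompressionCode` is a hypothetical helper and not part of the script above.

```groovy
// Hypothetical helper (not in the original script): converts whatever the
// record lookup returned into an Integer, or null when no usable value exists.
Integer toCompressionCode(Object raw) {
    if (raw == null) {
        return null                        // field missing, or null in the record
    }
    if (raw instanceof Number) {
        return ((Number) raw).intValue()   // covers 728 (integer) and 728.0 (floating point)
    }
    return null                            // unexpected type, e.g. a string
}

println toCompressionCode(728)
println toCompressionCode(728.0d)
println toCompressionCode(null)
```

In the record loop this could be used as `Integer code = toCompressionCode(currRecord.get(compressfield as String))`, followed by a log message that includes the value of `compressfield` whenever `code` is null.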
