Hi,

NiFi version 1.20

I have a Python script that splits an input JSON document into multiple pieces.
The original is available here:
<https://github.com/CogStack/CogStack-NiFi/blob/master/nifi/user-scripts/parse-anns-from-nlp-response-bulk.py>.

My problem is that the script only produces output for the first few
elements of the result, so I have added lots of log.info statements to the
process method to figure out what is going on.

These aren't behaving as I expect. I usually see only the "doc_idB" entries
in the log, occasionally (but not always) the "Starting process" entry, and
never "Got medcat info".

Is there something special about how these methods are run? I can load a
JSON flowfile into a regular Python session and the annotated_text_record
loop appears to work as expected, but I never see the same result inside NiFi.
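
A minimal sketch of that kind of offline check, assuming the input has the `result`/`medcat_info` layout the script reads. It replays the same loop on a JSON string, with a hypothetical list-backed logger (`ListLog`) standing in for NiFi's `log`, so the log lines can be compared one-for-one with what shows up in NiFi. The constant values are placeholders; the real script defines its own. Note that `"doc_idB: " + doc_id` raises a TypeError if an ID happens to be numeric, so the sketch wraps it in `str()`.

```python
import json

# Hypothetical placeholder values: the real script defines these elsewhere.
DOCUMENT_ID_FIELD_NAME = "document_id"
ORIGINAL_FIELDS_TO_INCLUDE = ["document_id"]
FIELD_META_PREFIX = "meta."


class ListLog(object):
    """Hypothetical stand-in for NiFi's `log`; stores messages in a list."""

    def __init__(self):
        self.messages = []

    def info(self, msg):
        self.messages.append(("INFO", msg))


def walk_records(input_text, log):
    """Replay the loop from process() on a JSON string, without NiFi."""
    json_data_records = json.loads(input_text)
    result = json_data_records["result"]
    json_data_records["medcat_info"]  # raises KeyError if missing, as in process()
    log.info("Got medcat info")
    log.info(str(len(result)))

    footers = []
    for annotated_text_record in result:
        annotations = annotated_text_record["annotations"]
        footer = annotated_text_record["footer"]
        doc_id = footer[DOCUMENT_ID_FIELD_NAME]
        # items() instead of iteritems() so this also runs on CPython 3
        new_footer = dict((FIELD_META_PREFIX + k, v)
                          for k, v in footer.items()
                          if k in ORIGINAL_FIELDS_TO_INCLUDE)
        # str() avoids a TypeError when an ID is numeric
        log.info("doc_idB: " + str(doc_id) +
                 " annotations: " + str(len(annotations)))
        footers.append(new_footer)
    return footers


sample = json.dumps({
    "medcat_info": {},
    "result": [
        {"footer": {"document_id": "a1"}, "annotations": [{"x": 1}]},
        {"footer": {"document_id": 2}, "annotations": []},
    ],
})
log = ListLog()
footers = walk_records(sample, log)
for level, msg in log.messages:
    print(level + " " + msg)
```

Running the same flowfile content through this and through NiFi shows at which record the two start to diverge.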

Am I missing something? Is there a better debug method?

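import json

from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback

# log is provided by ExecuteScript; DOCUMENT_ID_FIELD_NAME,
# ORIGINAL_FIELDS_TO_INCLUDE and FIELD_META_PREFIX are defined
# elsewhere in the full script.

class PyStreamCallback(StreamCallback):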
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        log.info("Starting process ")
        #bytes_arr = IOUtils.toByteArray(inputStream)
        #bytes_io = io.BytesIO(bytes_arr)
        input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        json_data_records = json.loads(input_text)
        #json_data_records = json.loads(bytes_io.read())
        log.info("loaded records")
        result = json_data_records["result"]
        medcat_info = json_data_records["medcat_info"]
        log.info("Got medcat info")
        log.info(str(len(result)))

        for annotated_text_record in result:
            footer = annotated_text_record["footer"]
            doc_id = footer[DOCUMENT_ID_FIELD_NAME]
            log.info("doc_idA: " + doc_id)

        for annotated_text_record in result:
            annotations = annotated_text_record["annotations"]
            footer = annotated_text_record["footer"]

            new_footer = {}

            assert DOCUMENT_ID_FIELD_NAME in footer.keys()
            doc_id = footer[DOCUMENT_ID_FIELD_NAME]

            for k,v in footer.iteritems():
                if k in ORIGINAL_FIELDS_TO_INCLUDE:
                    new_footer[FIELD_META_PREFIX + k] = v
            log.info("doc_idB: " + doc_id)

            for annotation in annotations:
                new_ann_record = {}
                annotation_data = annotation.values()

                # more stuff here creating the new flowfile
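
One way to make a failure inside process() visible is to catch the exception there and log the full traceback at ERROR level, since an exception partway through the loop could explain log lines that stop early. A sketch, assuming `log` behaves like NiFi's ComponentLog (an `error()` method taking a string); `run_logged`, `ListLog` and `needs_medcat_info` are hypothetical helpers, and only Python-level exceptions are caught.

```python
import traceback


class ListLog(object):
    """Hypothetical stand-in for NiFi's `log`, for running outside NiFi."""

    def __init__(self):
        self.messages = []

    def error(self, msg):
        self.messages.append(("ERROR", msg))


def run_logged(log, func, *args):
    """Call func(*args); on a Python exception, log the traceback and re-raise."""
    try:
        return func(*args)
    except Exception:
        # ERROR level, so the message is not filtered out the way INFO may be
        log.error("process() failed:\n" + traceback.format_exc())
        raise  # keep the caller's normal failure handling


def needs_medcat_info(data):
    # Mimics the medcat_info lookup in process()
    return data["medcat_info"]


log = ListLog()
try:
    run_logged(log, needs_medcat_info, {"result": []})
except KeyError:
    print(log.messages[0][1])
```

Inside NiFi, process() would then only call `run_logged(log, self._process, inputStream, outputStream)`, with the current body moved into a hypothetical `_process` method. Re-raising leaves the processor's usual failure behaviour in place instead of silently swallowing the error.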
