I am having great difficulty getting a stream callback in Python code to
simply add a new attribute, with a value, to a flowfile. I execute my code
within an ExecuteScript processor. My incoming flowfile has a number of
complex attributes that contain embedded data of high interest to my users.
An attribute named REMARKS is one such example: it contains critical
information that I parse out using regex in Python. While simple cases may
allow me to use UpdateAttribute to add and modify attributes, I find that it
does not let me perform all the complex regex my requirements call for.
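For illustration, the REMARKS parsing can be isolated into a plain Python function and checked outside NiFi before it goes into ExecuteScript. This is a minimal sketch, assuming the color appears as `COLOR=<value>//` inside REMARKS (as the regex in my script implies); the function name `parse_embedded_color` and the sample text are hypothetical.

```python
import re

# Assumed REMARKS layout: "... COLOR=<value>// ..." (inferred from the regex
# in the script below; adjust if the real field layout differs).
COLOR_PATTERN = re.compile(r"COLOR=(?P<m1>.*?)//", re.IGNORECASE | re.DOTALL)


def parse_embedded_color(remarks):
    """Return the text between the first 'COLOR=' and the next '//', or ''."""
    if not remarks:
        return ""
    match = COLOR_PATTERN.search(remarks)
    # Strip surrounding spaces so "COLOR= RED //" yields "RED".
    return match.group("m1").strip() if match else ""
```

For example, `parse_embedded_color("VESSEL SIGHTED. color=Dark Grey// HEADING NORTH")` returns `"Dark Grey"`, and a REMARKS value with no `COLOR=` field returns `""`. Unlike the greedy `.*COLOR` prefix in my script, `search` picks the first occurrence when several are present.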
My code below successfully parses the existing attribute, but it saves the
value(s) as the new data payload (content) of my flowfile, not as a new
attribute. I must save the parsed result as a new attribute on the flowfile
and leave the flowfile's data payload unchanged. How can I do this? This
seems like such a fundamental feature of common interest that I have to
believe I am missing something obvious.
(My code below was developed from an example originally offered by Matt B.
I want to give him credit for his examples and thank him for getting me
started.)
Please pardon any single-quote characters that Word reformatted improperly
when I retyped the code below.
import json
import re
from org.apache.nifi.processor.io import StreamCallback

result = {}  # define a dictionary

def isNotEmpty(s):
    return bool(s and s.strip())

def parseEmbeddedColor(s):
    # VERBOSE mode ignores the line breaks and spaces inside the pattern.
    pattern = re.compile(r""".*COLOR\=
        (?P<m1>.*?)
        \/\/
        .*""", re.IGNORECASE | re.DOTALL | re.VERBOSE)
    match = pattern.match(s)
    if match is None:
        return ''
    thisMatch = match.group("m1")
    if thisMatch:
        return thisMatch
    else:
        return ''
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        if isNotEmpty(flowFile.getAttribute('message.REMARKS')):
            incoming_metadata_comment = flowFile.getAttribute('message.REMARKS')
        else:
            incoming_metadata_comment = ''
        origColor = parseEmbeddedColor(incoming_metadata_comment)
        if isNotEmpty(origColor):
            result['origColor'] = origColor
        else:
            result['origColor'] = ''
        # Whatever is written to outputStream becomes the flowfile's new content.
        outputStream.write(bytearray(json.dumps(result['origColor']).encode('utf-8')))
flowFile = session.get()
if flowFile is not None:
    # The following line is all I can get to work currently. It is not what
    # I need: it replaces the flowFile data payload...
    flowFile = session.write(flowFile, PyStreamCallback())
    # I made an attempt to add the new parsed color value as an attribute
    # here, but failed...
    # flowFile = session.putAttribute(flowFile, "parsedColor", PyStreamCallback())
    session.transfer(flowFile, REL_SUCCESS)
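One way to get the behavior I'm after, sketched under assumptions: because the value comes from an attribute rather than from the content, no StreamCallback (and no `session.write`) is needed at all. The script reads `message.REMARKS`, runs the regex, and calls `session.putAttribute` with a string value, so the content is never opened. It relies on ExecuteScript binding `session` and `REL_SUCCESS` as script variables; the helper names, the `COLOR=<value>//` layout, and the `parsedColor` attribute name are my own assumptions.

```python
import re

# Assumed REMARKS layout: "... COLOR=<value>// ..."
COLOR_PATTERN = re.compile(r"COLOR=(?P<m1>.*?)//", re.IGNORECASE | re.DOTALL)


def parse_embedded_color(remarks):
    """Return the text after the first 'COLOR=' up to the next '//', or ''."""
    if not remarks:
        return ""
    match = COLOR_PATTERN.search(remarks)
    return match.group("m1") if match else ""


def add_color_attribute(session, rel_success):
    """Copy the parsed color into a new attribute; the content is untouched."""
    flow_file = session.get()
    if flow_file is None:
        return
    # getAttribute returns None (Java null) when the attribute is missing.
    remarks = flow_file.getAttribute("message.REMARKS")
    color = parse_embedded_color(remarks)
    # putAttribute takes a string value and returns an updated FlowFile
    # reference; keep using the returned one.
    flow_file = session.putAttribute(flow_file, "parsedColor", color)
    session.transfer(flow_file, rel_success)


# ExecuteScript binds `session` and `REL_SUCCESS` for the script. The NameError
# guard lets the same file be loaded and tested outside NiFi.
try:
    _session, _rel_success = session, REL_SUCCESS
except NameError:
    _session = None

if _session is not None:
    add_color_attribute(_session, _rel_success)
```

The key difference from my attempt is that `session.write` with a StreamCallback always replaces the content with whatever the callback writes, while `putAttribute` only changes attributes. If a later step really does need to look at the content, `session.read` with an InputStreamCallback reads it without replacing it.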