James,

You can use session.putAttribute() to do this. Anything you write to the output stream goes directly to the flowfile content, which is why your script replaces the payload instead of adding an attribute. I hope this makes sense.
(Groovy example)
flowFile = session.putAttribute(flowFile, 'some_new_attribute_name', "This is content with ${stringInterpolation.toString()} and math ${3 * 2}")
flowFile = session.putAttribute(flowFile, 'message.REMARKS', "Here I am replacing the prior attribute value with this static string. ")
session.transfer(flowFile, REL_SUCCESS)
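
Since your script is in Python, here is a rough sketch of the same idea for ExecuteScript. It skips session.write() entirely, so the content is left alone, and passes the parsed string (not a callback object) to session.putAttribute(). The helper names parse_embedded_color and add_parsed_color are made up for illustration, and the regex is a simplified version of the one in your message (it takes the first COLOR=...// occurrence), so adjust it to your data.

```python
import re

# Simplified form of the pattern from the question: capture the text
# between "COLOR=" and the next "//". This is an assumption about the
# REMARKS layout, not something NiFi defines.
COLOR_PATTERN = re.compile(r"COLOR=(?P<m1>.*?)//", re.IGNORECASE | re.DOTALL)


def parse_embedded_color(remarks):
    """Return the embedded color, or '' when REMARKS is empty or has no match."""
    if not remarks:
        return ''
    match = COLOR_PATTERN.search(remarks)
    return match.group('m1') if match else ''


def add_parsed_color(session, flow_file, rel_success):
    """Add a 'parsedColor' attribute without touching the flowfile content."""
    remarks = flow_file.getAttribute('message.REMARKS')
    color = parse_embedded_color(remarks)
    # putAttribute takes a string value and returns the updated flowfile
    # reference; keep using the returned reference from here on.
    flow_file = session.putAttribute(flow_file, 'parsedColor', color)
    session.transfer(flow_file, rel_success)
    return flow_file


# Inside ExecuteScript, NiFi supplies `session` and `REL_SUCCESS`:
# flowFile = session.get()
# if flowFile is not None:
#     add_parsed_color(session, flowFile, REL_SUCCESS)
```

Because nothing is written to an output stream, no StreamCallback is needed for this case.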
Andy LoPresto
[email protected]
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
> On Jan 16, 2017, at 1:57 PM, James McMahon <[email protected]> wrote:
>
> I am having great difficulty getting a stream callback in python code to
> simply add a new metadata attribute property with value to a flowfile. I
> execute my code within an ExecuteScript processor. My incoming flowfile has a
> number of complex attributes that contain embedded data of high interest to
> my users. An attribute entitled REMARKS is one such example, containing
> critical information that I parse out using regex in python. While simple
> cases may allow me to use UpdateAttribute to add and modify attributes, I
> find that they do not allow me to perform all the complex regex I anticipate
> for my requirements.
>
> My code below successfully parses the existing attribute, but it saves the
> value(s) as a new data payload of my flowfile – not as a new attribute. I
> must save the parsed result as a new attribute in the flowfile, and must
> leave my flowfile data payload unchanged. How can I do this? This seems like
> such a fundamental feature of common interest, and so I have to believe I am
> missing the obvious.
>
> (My code below was developed based on an example originally offered by Matt
> B. I want to give him credit for his examples and thank him for getting me
> started).
>
> Please pardon single quote characters formatted improperly in Word by my code
> retyping efforts below.
>
> import json
> import re
> import java.io
> import csv
> from org.apache.commons.io import IOUtils
> from java.nio.charset import StandardCharsets
> from org.apache.nifi.processor.io import StreamCallback
>
> result = {}  # define a dictionary
> def isNotEmpty(s):
>     return bool(s and s.strip())
>
> def parseEmbeddedColor(s):
>     pattern = re.compile(r""".*COLOR\=
>         (?P<m1>.*?)
>         \/\/
>
>         .*""", re.IGNORECASE | re.DOTALL | re.VERBOSE)
>     match = pattern.match(s)
>     if match is None:
>         return ''
>     thisMatch = match.group("m1")
>     if thisMatch: return thisMatch
>     else: return ''
>
> class PyStreamCallback(StreamCallback):
>     def __init__(self):
>         pass
>     def process(self, inputStream, outputStream):
>         if isNotEmpty(flowFile.getAttribute('message.REMARKS')):
>             incoming_metadata_comment = flowFile.getAttribute('message.REMARKS')
>         else:
>             incoming_metadata_comment = ''
>         origColor = parseEmbeddedColor(incoming_metadata_comment)
>         if isNotEmpty(origColor):
>             result['origColor'] = origColor
>         else:
>             result['origColor'] = ''
>
>         outputStream.write(unicode(json.dumps(result['origColor'])))
>
> flowFile = session.get()
> if (flowFile != None):
>     # the following line is all I can get to work currently. It is not what I need. It replaces the flowFile data payload…
>     flowFile = session.write(flowFile, PyStreamCallback())
>
>     # I made an attempt to add the new parsed color value as an attribute here, but failed…
>     # flowFile = session.putAttribute(flowFile, "parsedColor", PyStreamCallback())
>
>     session.transfer(flowFile, REL_SUCCESS)
