James,

You can use session.putAttribute() to do this. Anything you write to the output 
stream will go directly to flowfile content. I hope this makes sense.

(Groovy example)

flowFile = session.putAttribute(flowFile, ’some_new_attribute_name’, "This is 
content with ${stringInterpolation.toString()} and math ${ 3 * 2}”)
flowFile = session.putAttribute(flowFile, ’message.REMARKS’, “Here I am 
replacing the prior attribute value with this static string. ")
session.transfer(flowFile, REL_SUCCESS)



Andy LoPresto
[email protected]
[email protected]
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69

> On Jan 16, 2017, at 1:57 PM, James McMahon <[email protected]> wrote:
> 
> I am having great difficulty getting a stream callback in python code to 
> simply add a new metadata attribute property with value to a flowfile. I 
> execute my code within an ExecuteScript processor. My incoming flowfile has a 
> number of complex attributes that contain embedded data of high interest to 
> my users. An attribute entitled REMARKS is one such example, containing 
> critical information that I parse out using regex in python. While simple 
> cases may allow me to use UpdateAttribute to add and modify attributes, I 
> find that they do not allow me to perform all the complex regex I anticipate 
> for my requirements.
> 
> My code below successfully parses the existing attribute, but it saves the 
> value(s) as a new data payload of my flowfile – not as a new attribute. I 
> must save the parsed result as a new attribute in the flowfile, and must 
> leave my flowfile data payload unchanged. How can I do this? This seems like 
> such a fundamental feature of common interest, and so I have to believe I am 
> missing the obvious.
> 
> (My code below was developed based on an example originally offered by Matt 
> B. I want to give him credit for his examples and thank him for getting me 
> started).
> 
> Please pardon single quote characters formatted improperly in Word by my code 
> retyping efforts below.
> 
> import json
> import re
> import java.io <http://java.io/>
> import csv
> from org.apache.commons.io <http://org.apache.commons.io/> import IOUtils
> from java.nio.charset import StandardCharsets
> from org.apache.nifi.processor.io <http://org.apache.nifi.processor.io/> 
> import StreamCallback
> 
> result = {} # define a dictionary
> def isNotEmpty(s):
>               return bool(s and s.strip());
> 
> def parseEmbeddedColor(s):
>               pattern = re.compile(r”””.*COLOR\=
>                                            (?P<m1>.*?)
>                                            \/\/
>                                            
> .*”””,re.IGNORECASE|re.DOTALL|re.VERBOSE)
> match = pattern.match(s)
> if match is None:
>               return ‘’
> thisMatch = match.group(“m1”)
> if thisMatch: return thisMatch
> else: return ‘’
> 
> class PyStreamCallback(StreamCallback) :
>               def __init__(self):
>                              pass
>               def process(self, inputStream, outputStream):
>                              if 
> isNotEmpty(flowFile.getAttribute(‘message.REMARKS’)):
>                                            incoming_metadata_comment = 
> flowFile.getAttribute(‘message.REMARKS’)
>                              else:
>                                            incoming_metadata_comment = ‘’
>                              origColor = 
> parseEmbeddedColor(incoming_metadata_content)
>                              if isNotEmpty(origColor):
>                                            result[‘origColor’] = origColor
>                              else:
>                                            result[‘origColor’] = ‘’
>                             
> outputStream.write(Unicode(json.dumps(result[‘origColor’])))
> 
> flowFile = session.get()
> if (flowFile != None) :
>               # the following line is all I can get to work currently. It is 
> not what I need. It replaces the flowFile data payload…
>               flowFile = session.write(flowFile, PyStreamCallback())
> 
>               # I made an attempt to add the new parsed color value as an 
> attribute here, but failed…
>               # flowFile = 
> session.putAttribute(flowFile,”parsedColor”,PyStreamCallback())
> 
>               session.transfer(flowFile,REL_SUCCESS)

Attachment: signature.asc
Description: Message signed with OpenPGP using GPGMail

Reply via email to