Here is a Python example from one of our test scripts: https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py#L63 <https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py#L63>
Andy LoPresto [email protected] [email protected] PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69 > On Jan 16, 2017, at 2:07 PM, Andy LoPresto <[email protected]> wrote: > > James, > > You can use session.putAttribute() to do this. Anything you write to the > output stream will go directly to flowfile content. I hope this makes sense. > > (Groovy example) > > flowFile = session.putAttribute(flowFile, ’some_new_attribute_name’, "This is > content with ${stringInterpolation.toString()} and math ${ 3 * 2}”) > flowFile = session.putAttribute(flowFile, ’message.REMARKS’, “Here I am > replacing the prior attribute value with this static string. ") > session.transfer(flowFile, REL_SUCCESS) > > > > Andy LoPresto > [email protected] <mailto:[email protected]> > [email protected] <mailto:[email protected]> > PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69 > >> On Jan 16, 2017, at 1:57 PM, James McMahon <[email protected] >> <mailto:[email protected]>> wrote: >> >> I am having great difficulty getting a stream callback in python code to >> simply add a new metadata attribute property with value to a flowfile. I >> execute my code within an ExecuteScript processor. My incoming flowfile has >> a number of complex attributes that contain embedded data of high interest >> to my users. An attribute entitled REMARKS is one such example, containing >> critical information that I parse out using regex in python. While simple >> cases may allow me to use UpdateAttribute to add and modify attributes, I >> find that they do not allow me to perform all the complex regex I anticipate >> for my requirements. >> >> My code below successfully parses the existing attribute, but it saves the >> value(s) as a new data payload of my flowfile – not as a new attribute. I >> must save the parsed result as a new attribute in the flowfile, and must >> leave my flowfile data payload unchanged. How can I do this? This seems like >> such a fundamental feature of common interest, and so I have to believe I am >> missing the obvious. >> >> (My code below was developed based on an example originally offered by Matt >> B. I want to give him credit for his examples and thank him for getting me >> started). >> >> Please pardon single quote characters formatted improperly in Word by my >> code retyping efforts below. >> >> import json >> import re >> import java.io <http://java.io/> >> import csv >> from org.apache.commons.io <http://org.apache.commons.io/> import IOUtils >> from java.nio.charset import StandardCharsets >> from org.apache.nifi.processor.io <http://org.apache.nifi.processor.io/> >> import StreamCallback >> >> result = {} # define a dictionary >> def isNotEmpty(s): >> return bool(s and s.strip()); >> >> def parseEmbeddedColor(s): >> pattern = re.compile(r”””.*COLOR\= >> (?P<m1>.*?) >> \/\/ >> >> .*”””,re.IGNORECASE|re.DOTALL|re.VERBOSE) >> match = pattern.match(s) >> if match is None: >> return ‘’ >> thisMatch = match.group(“m1”) >> if thisMatch: return thisMatch >> else: return ‘’ >> >> class PyStreamCallback(StreamCallback) : >> def __init__(self): >> pass >> def process(self, inputStream, outputStream): >> if >> isNotEmpty(flowFile.getAttribute(‘message.REMARKS’)): >> incoming_metadata_comment = >> flowFile.getAttribute(‘message.REMARKS’) >> else: >> incoming_metadata_comment = ‘’ >> origColor = >> parseEmbeddedColor(incoming_metadata_content) >> if isNotEmpty(origColor): >> result[‘origColor’] = origColor >> else: >> result[‘origColor’] = ‘’ >> >> outputStream.write(Unicode(json.dumps(result[‘origColor’]))) >> >> flowFile = session.get() >> if (flowFile != None) : >> # the following line is all I can get to work currently. It is >> not what I need. It replaces the flowFile data payload… >> flowFile = session.write(flowFile, PyStreamCallback()) >> >> # I made an attempt to add the new parsed color value as an >> attribute here, but failed… >> # flowFile = >> session.putAttribute(flowFile,”parsedColor”,PyStreamCallback()) >> >> session.transfer(flowFile,REL_SUCCESS) >
signature.asc
Description: Message signed with OpenPGP using GPGMail
