The putAttribute method does not care if the attribute already exists — it will create or overwrite it as necessary. I guess my example was not clear enough. The second parameter is the attribute name, so I provided one line that added a new attribute, and one that updated an existing attribute.
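For the dictionary case raised in the thread, the same create-or-overwrite behaviour means you can loop over the parsed values and call putAttribute once per entry. The sketch below is only an illustration under stated assumptions: StubFlowFile, StubSession and add_attributes are hypothetical names made up here so the snippet runs outside NiFi. Inside ExecuteScript you would drop the two stub classes and use the session variable the processor binds for you, then transfer the flowfile as usual.

```python
class StubFlowFile(object):
    """Hypothetical stand-in for a NiFi FlowFile: just a bag of attributes."""

    def __init__(self, attributes=None):
        self._attributes = dict(attributes or {})

    def getAttribute(self, name):
        # Returns None when the attribute is missing.
        return self._attributes.get(name)


class StubSession(object):
    """Hypothetical stand-in for the session; only putAttribute is modelled."""

    def putAttribute(self, flowfile, name, value):
        # Create the attribute if it is missing, overwrite it otherwise,
        # and hand back a new flowfile reference.
        updated = dict(flowfile._attributes)
        updated[name] = value
        return StubFlowFile(updated)


def add_attributes(session, flowfile, result):
    """Copy every entry of the result dictionary onto the flowfile (hypothetical helper)."""
    for name, value in result.items():
        # Always keep the reference returned by putAttribute.
        flowfile = session.putAttribute(flowfile, name, str(value))
    return flowfile


session = StubSession()
flowfile = StubFlowFile({'message.REMARKS': 'old remarks'})

# One key that does not exist yet and one that does.
result = {'new_name_for_new_property': 'ABC', 'message.REMARKS': 'replaced'}
flowfile = add_attributes(session, flowfile, result)
```

The only part that carries over to a real script is the loop: reassign flowfile on every call, because the reference you passed in should not be reused afterwards.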
If you just want to extract an attribute value, manipulate it, and then add the new value as a new attribute, you shouldn't need a stream callback at all. A stream callback is for stream-processing the content of the flowfile; attributes are assumed to be small enough to be held completely in memory without causing problems.

(My Python is rusty, so forgive/ignore syntax errors):

reversed_remarks = flowfile.getAttribute('message.REMARKS')[::-1]  # This would be .reverse() in Groovy
flowfile = session.putAttribute(flowfile, 'this-attribute-did-not-exist-before', reversed_remarks)
session.transfer(flowfile, REL_SUCCESS)

Andy LoPresto
[email protected]
[email protected]
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69

> On Jan 16, 2017, at 2:56 PM, James McMahon <[email protected]> wrote:
>
> In the example you offer, Andy, it seems like you are getting an attribute
> property and value that already exists. In my case I have created a value as
> a dictionary entry, and need to create it in the existing flowFile. I don't
> understand how I can do this using your example.
>
> Let's say I call my PyStreamCallback function with flowFile. I parse from an
> incoming existing attribute value 'ABC', and I save that value to dictionary
> entry result['new_name_for_new_property']. How do I add that as a new
> property and attribute? I apologize if I'm not seeing it offhand, but best I
> can tell your example seems to be focused on an existing property and
> attribute. Can you offer a few more details to help me understand?
>
> Thank you again for your help.
>
> On Mon, Jan 16, 2017 at 5:10 PM, Andy LoPresto <[email protected]> wrote:
> Here is a Python example from one of our test scripts:
>
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py#L63
>
> Andy LoPresto
> [email protected]
> [email protected]
> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
>
>> On Jan 16, 2017, at 2:07 PM, Andy LoPresto <[email protected]> wrote:
>>
>> James,
>>
>> You can use session.putAttribute() to do this. Anything you write to the
>> output stream will go directly to flowfile content. I hope this makes sense.
>>
>> (Groovy example)
>>
>> flowFile = session.putAttribute(flowFile, 'some_new_attribute_name', "This is content with ${stringInterpolation.toString()} and math ${ 3 * 2}")
>> flowFile = session.putAttribute(flowFile, 'message.REMARKS', "Here I am replacing the prior attribute value with this static string. ")
>> session.transfer(flowFile, REL_SUCCESS)
>>
>> Andy LoPresto
>> [email protected]
>> [email protected]
>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
>>
>>> On Jan 16, 2017, at 1:57 PM, James McMahon <[email protected]> wrote:
>>>
>>> I am having great difficulty getting a stream callback in python code to
>>> simply add a new metadata attribute property with value to a flowfile. I
>>> execute my code within an ExecuteScript processor. My incoming flowfile has
>>> a number of complex attributes that contain embedded data of high interest
>>> to my users.
>>> An attribute entitled REMARKS is one such example, containing
>>> critical information that I parse out using regex in python. While simple
>>> cases may allow me to use UpdateAttribute to add and modify attributes, I
>>> find that they do not allow me to perform all the complex regex I
>>> anticipate for my requirements.
>>>
>>> My code below successfully parses the existing attribute, but it saves the
>>> value(s) as a new data payload of my flowfile – not as a new attribute. I
>>> must save the parsed result as a new attribute in the flowfile, and must
>>> leave my flowfile data payload unchanged. How can I do this? This seems
>>> like such a fundamental feature of common interest, and so I have to
>>> believe I am missing the obvious.
>>>
>>> (My code below was developed based on an example originally offered by Matt
>>> B. I want to give him credit for his examples and thank him for getting me
>>> started).
>>>
>>> Please pardon single quote characters formatted improperly in Word by my
>>> code retyping efforts below.
>>>
>>> import json
>>> import re
>>> import java.io
>>> import csv
>>> from org.apache.commons.io import IOUtils
>>> from java.nio.charset import StandardCharsets
>>> from org.apache.nifi.processor.io import StreamCallback
>>>
>>> result = {} # define a dictionary
>>> def isNotEmpty(s):
>>>     return bool(s and s.strip());
>>>
>>> def parseEmbeddedColor(s):
>>>     pattern = re.compile(r""".*COLOR\=
>>>         (?P<m1>.*?)
>>>         \/\/
>>>
>>>         .*""",re.IGNORECASE|re.DOTALL|re.VERBOSE)
>>>     match = pattern.match(s)
>>>     if match is None:
>>>         return ''
>>>     thisMatch = match.group("m1")
>>>     if thisMatch: return thisMatch
>>>     else: return ''
>>>
>>> class PyStreamCallback(StreamCallback) :
>>>     def __init__(self):
>>>         pass
>>>     def process(self, inputStream, outputStream):
>>>         if isNotEmpty(flowFile.getAttribute('message.REMARKS')):
>>>             incoming_metadata_comment = flowFile.getAttribute('message.REMARKS')
>>>         else:
>>>             incoming_metadata_comment = ''
>>>         origColor = parseEmbeddedColor(incoming_metadata_comment)
>>>         if isNotEmpty(origColor):
>>>             result['origColor'] = origColor
>>>         else:
>>>             result['origColor'] = ''
>>>
>>>         outputStream.write(unicode(json.dumps(result['origColor'])))
>>>
>>> flowFile = session.get()
>>> if (flowFile != None) :
>>>     # the following line is all I can get to work currently. It
>>>     # is not what I need. It replaces the flowFile data payload…
>>>     flowFile = session.write(flowFile, PyStreamCallback())
>>>
>>>     # I made an attempt to add the new parsed color value as an
>>>     # attribute here, but failed…
>>>     # flowFile = session.putAttribute(flowFile,"parsedColor",PyStreamCallback())
>>>
>>>     session.transfer(flowFile,REL_SUCCESS)
