This makes perfect sense. Thank you very much for your assistance, Andy.

On Mon, Jan 16, 2017 at 6:03 PM, Andy LoPresto <[email protected]> wrote:
> The putAttribute method does not care if the attribute already exists; it
> will create or overwrite it as necessary. I guess my example was not clear
> enough. The second parameter is the attribute name, so I provided one line
> that added a new attribute, and one that updated an existing attribute.
>
> If you just want to extract an attribute value, manipulate it, and then
> add the new value as a new attribute, you shouldn't need a stream callback
> at all (this is used to stream process the content of the flowfile;
> attributes are assumed to be small enough to be held completely in memory
> without causing problems).
>
> (My Python is rusty so forgive/ignore syntax errors):
>
> # This would be .reverse() in Groovy
> reversed_remarks = flowfile.getAttribute('message.REMARKS')[::-1]
> flowfile = session.putAttribute(flowfile, 'this-attribute-did-not-exist-before', reversed_remarks)
> session.transfer(flowfile, REL_SUCCESS)
>
>
> Andy LoPresto
> [email protected]
> *[email protected] <[email protected]>*
> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
>
> On Jan 16, 2017, at 2:56 PM, James McMahon <[email protected]> wrote:
>
> In the example you offer, Andy, it seems like you are getting an attribute
> property and value that already exist. In my case I have created a value
> as a dictionary entry, and need to create it in the existing flowFile. I
> don't understand how I can do this using your example.
>
> Let's say I call my PyStreamCallback function with flowFile. I parse from
> an incoming existing attribute value 'ABC', and I save that value to
> dictionary entry result['new_name_for_new_property']. How do I add that
> as a new property and attribute? I apologize if I'm not seeing it offhand,
> but best I can tell your example seems to be focused on an existing
> property and attribute. Can you offer a few more details to help me
> understand?
>
> Thank you again for your help.
>
> On Mon, Jan 16, 2017 at 5:10 PM, Andy LoPresto <[email protected]>
> wrote:
>
>> Here is a Python example from one of our test scripts:
>>
>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py#L63
>>
>> Andy LoPresto
>> [email protected]
>> *[email protected] <[email protected]>*
>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
>>
>> On Jan 16, 2017, at 2:07 PM, Andy LoPresto <[email protected]> wrote:
>>
>> James,
>>
>> You can use session.putAttribute() to do this. Anything you write to the
>> output stream will go directly to flowfile content. I hope this makes
>> sense.
>>
>> (Groovy example)
>>
>> flowFile = session.putAttribute(flowFile, 'some_new_attribute_name', "This is content with ${stringInterpolation.toString()} and math ${ 3 * 2}")
>>
>> flowFile = session.putAttribute(flowFile, 'message.REMARKS', "Here I am replacing the prior attribute value with this static string. ")
>>
>> session.transfer(flowFile, REL_SUCCESS)
>>
>>
>> Andy LoPresto
>> [email protected]
>> *[email protected] <[email protected]>*
>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
>>
>> On Jan 16, 2017, at 1:57 PM, James McMahon <[email protected]> wrote:
>>
>> I am having great difficulty getting a stream callback in Python code to
>> simply add a new metadata attribute property with value to a flowfile. I
>> execute my code within an ExecuteScript processor. My incoming flowfile has
>> a number of complex attributes that contain embedded data of high interest
>> to my users. An attribute entitled REMARKS is one such example, containing
>> critical information that I parse out using regex in Python. While simple
>> cases may allow me to use UpdateAttribute to add and modify attributes, I
>> find that it does not allow me to perform all the complex regex I
>> anticipate for my requirements.
>>
>>
>> My code below successfully parses the existing attribute, but it saves
>> the value(s) as a new data payload of my flowfile, not as a new attribute.
>> I must save the parsed result as a new attribute in the flowfile, and must
>> leave my flowfile data payload unchanged. How can I do this? This seems
>> like such a fundamental feature of common interest, and so I have to
>> believe I am missing the obvious.
>>
>>
>> (My code below was developed based on an example originally offered by
>> Matt B. I want to give him credit for his examples and thank him for
>> getting me started).
>>
>>
>> Please pardon single quote characters formatted improperly in Word by my
>> code retyping efforts below.
>>
>>
>> import json
>> import re
>> import java.io
>> import csv
>> from org.apache.commons.io import IOUtils
>> from java.nio.charset import StandardCharsets
>> from org.apache.nifi.processor.io import StreamCallback
>>
>> result = {}  # define a dictionary
>>
>> def isNotEmpty(s):
>>     return bool(s and s.strip())
>>
>> def parseEmbeddedColor(s):
>>     pattern = re.compile(r""".*COLOR\=
>>         (?P<m1>.*?)
>>         \/\/
>>         .*""", re.IGNORECASE | re.DOTALL | re.VERBOSE)
>>     match = pattern.match(s)
>>     if match is None:
>>         return ''
>>     thisMatch = match.group("m1")
>>     if thisMatch: return thisMatch
>>     else: return ''
>>
>> class PyStreamCallback(StreamCallback):
>>     def __init__(self):
>>         pass
>>     def process(self, inputStream, outputStream):
>>         if isNotEmpty(flowFile.getAttribute('message.REMARKS')):
>>             incoming_metadata_comment = flowFile.getAttribute('message.REMARKS')
>>         else:
>>             incoming_metadata_comment = ''
>>         origColor = parseEmbeddedColor(incoming_metadata_comment)
>>         if isNotEmpty(origColor):
>>             result['origColor'] = origColor
>>         else:
>>             result['origColor'] = ''
>>         outputStream.write(unicode(json.dumps(result['origColor'])))
>>
>> flowFile = session.get()
>> if (flowFile != None):
>>     # the following line is all I can get to work currently. It
>>     # is not what I need. It replaces the flowFile data payload…
>>     flowFile = session.write(flowFile, PyStreamCallback())
>>
>>     # I made an attempt to add the new parsed color value as an
>>     # attribute here, but failed…
>>     # flowFile = session.putAttribute(flowFile, "parsedColor", PyStreamCallback())
>>
>>     session.transfer(flowFile, REL_SUCCESS)
>>
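
Pulling the thread's answer together: since attributes are held in memory, the parsing can happen outside any StreamCallback and the result can be stored with session.putAttribute(). The following is only a sketch of that idea and has not been run inside NiFi. It reuses the regex, the 'message.REMARKS' attribute, and the 'origColor' key from the original post. The helper names parse_embedded_color, add_attributes, and process_flow_file are made up for illustration, and `session` and the success relationship are assumed to be whatever ExecuteScript binds.

```python
import re

# Same pattern as in the original post: capture the text between
# "COLOR=" and the next "//" (case-insensitive, may span lines).
COLOR_PATTERN = re.compile(r""".*COLOR=
    (?P<m1>.*?)
    //
    .*""", re.IGNORECASE | re.DOTALL | re.VERBOSE)


def parse_embedded_color(remarks):
    """Return the embedded color value, or '' if none is found."""
    if not remarks:
        return ''
    match = COLOR_PATTERN.match(remarks)
    return match.group('m1') if match else ''


def add_attributes(session, flow_file, new_attributes):
    """Write each dictionary entry to the flowfile as an attribute.

    putAttribute returns the updated flowfile reference, so the
    reference is re-assigned on every call.
    """
    for name, value in new_attributes.items():
        flow_file = session.putAttribute(flow_file, name, value)
    return flow_file


def process_flow_file(session, flow_file, success):
    """Parse message.REMARKS and store the result as a new attribute.

    session.write() is never called, so this code does not touch the
    flowfile content.
    """
    remarks = flow_file.getAttribute('message.REMARKS')
    result = {'origColor': parse_embedded_color(remarks)}
    flow_file = add_attributes(session, flow_file, result)
    session.transfer(flow_file, success)
    return flow_file
```

In an ExecuteScript body this would presumably be driven by `flowFile = session.get()` followed, when `flowFile` is not None, by `process_flow_file(session, flowFile, REL_SUCCESS)`. Keeping the parsing in a plain function also means it can be checked outside NiFi with ordinary strings.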
