The putAttribute method does not care whether the attribute already exists; it 
creates or overwrites it as necessary. I guess my example was not clear enough. 
The second parameter is the attribute name, so I provided one line that added a 
new attribute and one that updated an existing attribute.
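
To make the create-or-overwrite behavior concrete, here is a minimal sketch 
that runs outside NiFi. FakeFlowFile and FakeSession are hypothetical stand-ins 
I made up for illustration; they are not NiFi classes and only mimic the two 
calls discussed here. The point is that the same putAttribute call covers both 
cases, and that you keep the flowfile reference it returns.

```python
class FakeFlowFile(object):
    """Hypothetical stand-in for a NiFi FlowFile; it only holds attributes."""
    def __init__(self, attributes=None):
        self._attributes = dict(attributes or {})

    def getAttribute(self, name):
        # Returns None when the attribute is absent.
        return self._attributes.get(name)


class FakeSession(object):
    """Hypothetical stand-in for the session that ExecuteScript provides."""
    def putAttribute(self, flowfile, name, value):
        # Create-or-overwrite: the same call handles both cases.
        updated = dict(flowfile._attributes)
        updated[name] = value
        # Return a new object, so callers must keep the returned reference.
        return FakeFlowFile(updated)


session = FakeSession()
flowfile = FakeFlowFile({'message.REMARKS': 'original remarks'})

# The attribute does not exist yet, so it is created.
flowfile = session.putAttribute(flowfile, 'some_new_attribute_name', 'new value')
# The attribute already exists, so it is overwritten.
flowfile = session.putAttribute(flowfile, 'message.REMARKS', 'static replacement')
```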

If you just want to extract an attribute value, manipulate it, and then add the 
new value as a new attribute, you shouldn't need a stream callback at all. A 
stream callback is used to stream-process the content of the flowfile; 
attributes are assumed to be small enough to be held completely in memory 
without causing problems.

(My Python is rusty so forgive/ignore syntax errors):

flowfile = session.get()
if flowfile is not None:
    # [::-1] reverses the string; this would be .reverse() in Groovy
    reversed_remarks = flowfile.getAttribute('message.REMARKS')[::-1]
    flowfile = session.putAttribute(flowfile, 'this-attribute-did-not-exist-before', reversed_remarks)
    session.transfer(flowfile, REL_SUCCESS)
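
Applied to the COLOR parsing from the original question, a sketch might look 
like the following. Several things here are my assumptions: the attribute name 
parsedColor is borrowed from the commented-out attempt in the question, and 
parse_embedded_color and COLOR_PATTERN are names I made up. The pattern uses 
search(), so it picks the first COLOR=...// occurrence, which may differ from 
the original pattern if REMARKS contains more than one. The try/except guard is 
there only so that the parsing function can be exercised outside NiFi.

```python
import re

# Hypothetical simplification of the pattern in the question:
# capture whatever sits between "COLOR=" and the next "//".
COLOR_PATTERN = re.compile(r"COLOR=(?P<color>.*?)//", re.IGNORECASE | re.DOTALL)


def parse_embedded_color(remarks):
    """Return the embedded color, or '' if the input is empty or has no match."""
    if not remarks:
        return ''
    match = COLOR_PATTERN.search(remarks)
    return match.group('color') if match else ''


# ExecuteScript binds `session` and `REL_SUCCESS`; outside NiFi they are undefined.
try:
    nifi_session = session
except NameError:
    nifi_session = None

if nifi_session is not None:
    flowfile = nifi_session.get()
    if flowfile is not None:
        color = parse_embedded_color(flowfile.getAttribute('message.REMARKS'))
        # Attributes only: no session.write(), so the content is left untouched.
        flowfile = nifi_session.putAttribute(flowfile, 'parsedColor', color)
        nifi_session.transfer(flowfile, REL_SUCCESS)
```

No StreamCallback is involved, so nothing is written to the flowfile content.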


Andy LoPresto
[email protected]
[email protected]
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69

> On Jan 16, 2017, at 2:56 PM, James McMahon <[email protected]> wrote:
> 
> In the example you offer, Andy, it seems like you are getting an attribute 
> property and value that already exists. In my case I have created a value as 
> a dictionary entry, and need to create it in the existing flowFile. I don't 
> understand how I can do this using your example.
> 
> Let's say I call my PyStreamCallback function with flowFile. I parse from an 
> incoming existing attribute value 'ABC', and I save that value to dictionary 
> entry result['new_name_for_new_property']. How do I add that as a new 
> property and attribute? I apologize if I'm not seeing it offhand, but as best 
> I can tell your example seems to be focused on an existing property and 
> attribute. Can you offer a few more details to help me understand?
> 
> Thank you again for your help.
> 
> On Mon, Jan 16, 2017 at 5:10 PM, Andy LoPresto <[email protected]> wrote:
> Here is a Python example from one of our test scripts:
> 
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py#L63
> 
>> On Jan 16, 2017, at 2:07 PM, Andy LoPresto <[email protected]> wrote:
>> 
>> James,
>> 
>> You can use session.putAttribute() to do this. Anything you write to the 
>> output stream will go directly to flowfile content. I hope this makes sense.
>> 
>> (Groovy example)
>> 
>> flowFile = session.putAttribute(flowFile, 'some_new_attribute_name', "This is content with ${stringInterpolation.toString()} and math ${3 * 2}")
>> flowFile = session.putAttribute(flowFile, 'message.REMARKS', "Here I am replacing the prior attribute value with this static string.")
>> session.transfer(flowFile, REL_SUCCESS)
>> 
>>> On Jan 16, 2017, at 1:57 PM, James McMahon <[email protected]> wrote:
>>> 
>>> I am having great difficulty getting a stream callback in python code to 
>>> simply add a new metadata attribute property with value to a flowfile. I 
>>> execute my code within an ExecuteScript processor. My incoming flowfile has 
>>> a number of complex attributes that contain embedded data of high interest 
>>> to my users. An attribute entitled REMARKS is one such example, containing 
>>> critical information that I parse out using regex in python. While simple 
>>> cases may allow me to use UpdateAttribute to add and modify attributes, I 
>>> find that they do not allow me to perform all the complex regex I 
>>> anticipate for my requirements.
>>> 
>>> My code below successfully parses the existing attribute, but it saves the 
>>> value(s) as a new data payload of my flowfile – not as a new attribute. I 
>>> must save the parsed result as a new attribute in the flowfile, and must 
>>> leave my flowfile data payload unchanged. How can I do this? This seems 
>>> like such a fundamental feature of common interest, and so I have to 
>>> believe I am missing the obvious.
>>> 
>>> (My code below was developed based on an example originally offered by Matt 
>>> B. I want to give him credit for his examples and thank him for getting me 
>>> started).
>>> 
>>> Please pardon single quote characters formatted improperly in Word by my 
>>> code retyping efforts below.
>>> 
>>> import json
>>> import re
>>> import java.io
>>> import csv
>>> from org.apache.commons.io import IOUtils
>>> from java.nio.charset import StandardCharsets
>>> from org.apache.nifi.processor.io import StreamCallback
>>> 
>>> result = {} # define a dictionary
>>> def isNotEmpty(s):
>>>     return bool(s and s.strip())
>>> 
>>> def parseEmbeddedColor(s):
>>>     pattern = re.compile(r""".*COLOR\=
>>>                              (?P<m1>.*?)
>>>                              \/\/
>>>                              .*""", re.IGNORECASE|re.DOTALL|re.VERBOSE)
>>>     match = pattern.match(s)
>>>     if match is None:
>>>         return ''
>>>     thisMatch = match.group("m1")
>>>     if thisMatch: return thisMatch
>>>     else: return ''
>>> 
>>> class PyStreamCallback(StreamCallback):
>>>     def __init__(self):
>>>         pass
>>>     def process(self, inputStream, outputStream):
>>>         if isNotEmpty(flowFile.getAttribute('message.REMARKS')):
>>>             incoming_metadata_comment = flowFile.getAttribute('message.REMARKS')
>>>         else:
>>>             incoming_metadata_comment = ''
>>>         origColor = parseEmbeddedColor(incoming_metadata_comment)
>>>         if isNotEmpty(origColor):
>>>             result['origColor'] = origColor
>>>         else:
>>>             result['origColor'] = ''
>>>         outputStream.write(unicode(json.dumps(result['origColor'])))
>>> 
>>> flowFile = session.get()
>>> if (flowFile != None):
>>>     # the following line is all I can get to work currently. It is not what I need. It replaces the flowFile data payload...
>>>     flowFile = session.write(flowFile, PyStreamCallback())
>>> 
>>>     # I made an attempt to add the new parsed color value as an attribute here, but failed...
>>>     # flowFile = session.putAttribute(flowFile, "parsedColor", PyStreamCallback())
>>> 
>>>     session.transfer(flowFile, REL_SUCCESS)
>> 
> 
> 
