This makes perfect sense. Thank you very much for your assistance, Andy.

On Mon, Jan 16, 2017 at 6:03 PM, Andy LoPresto <[email protected]> wrote:

> The putAttribute method does not care if the attribute already exists — it
> will create or overwrite it as necessary. I guess my example was not clear
> enough. The second parameter is the attribute name, so I provided one line
> that added a new attribute, and one that updated an existing attribute.
>
> If you just want to extract an attribute value, manipulate it, and then
> add the new value as a new attribute, you shouldn’t need a stream callback
> at all (this is used to stream process the content of the flowfile —
> attributes are assumed to be small enough to be held completely in memory
> without causing problems).
>
> (My Python is rusty so forgive/ignore syntax errors):
>
> reversed_remarks = flowfile.getAttribute('message.REMARKS')[::-1] # This would be .reverse() in Groovy
> flowfile = session.putAttribute(flowfile, 'this-attribute-did-not-exist-before', reversed_remarks)
> session.transfer(flowfile, REL_SUCCESS)
>
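A minimal sketch of the approach described above, applied to the color-parsing case from the original question: read the attribute, parse it with a regex, and store the result with putAttribute, with no stream callback involved. FakeFlowFile, FakeSession, and the sample REMARKS value are hypothetical stand-ins so the snippet runs outside NiFi; inside ExecuteScript the processor supplies the real session and REL_SUCCESS. The pattern is a simplified variant of the one in the question (it takes the first COLOR= occurrence), not a drop-in replacement.

```python
import re


class FakeFlowFile(object):
    """Hypothetical stand-in for a NiFi flowfile (attributes plus content)."""

    def __init__(self, attributes, content=b""):
        self._attributes = dict(attributes)
        self.content = content

    def getAttribute(self, name):
        return self._attributes.get(name)


class FakeSession(object):
    """Hypothetical stand-in modelling only putAttribute and transfer."""

    def __init__(self):
        self.transferred = []

    def putAttribute(self, flowfile, name, value):
        # Create or overwrite the attribute; the content is left untouched.
        attributes = dict(flowfile._attributes)
        attributes[name] = value
        return FakeFlowFile(attributes, flowfile.content)

    def transfer(self, flowfile, relationship):
        self.transferred.append((flowfile, relationship))


REL_SUCCESS = "success"  # placeholder; NiFi binds the real relationship

COLOR_PATTERN = re.compile(r"COLOR=(?P<color>.*?)//", re.IGNORECASE | re.DOTALL)


def parse_embedded_color(remarks):
    """Return the text between 'COLOR=' and the next '//', or '' if absent."""
    match = COLOR_PATTERN.search(remarks or "")
    return match.group("color") if match else ""


def add_parsed_color(session, flowfile):
    """Store the parsed color as the 'parsedColor' attribute and transfer."""
    color = parse_embedded_color(flowfile.getAttribute("message.REMARKS"))
    flowfile = session.putAttribute(flowfile, "parsedColor", color)
    session.transfer(flowfile, REL_SUCCESS)
    return flowfile


session = FakeSession()
incoming = FakeFlowFile({"message.REMARKS": "SIZE=10//COLOR=red//SHAPE=round"}, b"payload")
updated = add_parsed_color(session, incoming)
print(updated.getAttribute("parsedColor"))  # red
```

In a real ExecuteScript script only the body of add_parsed_color would be needed, operating on the flowfile returned by session.get().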
>
> Andy LoPresto
> [email protected]
> *[email protected] <[email protected]>*
> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>
> On Jan 16, 2017, at 2:56 PM, James McMahon <[email protected]> wrote:
>
> In the example you offer Andy, it seems like you are getting an attribute
> property and value that already exists. In my case I have created a value
> as a dictionary entry, and need to create it in the existing flowFile. I
> don't understand how I can do this using your example.
>
> Let's say I call my PyStreamCallback function with flowFile. I parse from
> an incoming existing attribute value 'ABC', and I save that value to
> dictionary entry result['new_name_for_new_property']. How do I add that
> as a new property and attribute? I apologize if I'm not seeing it offhand,
> but best I can tell your example seems to be focused on an existing
> property and attribute. Can you offer a few more details to help me
> understand?
>
> Thank you again for your help.
>
> On Mon, Jan 16, 2017 at 5:10 PM, Andy LoPresto <[email protected]>
> wrote:
>
>> Here is a Python example from one of our test scripts:
>>
>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py#L63
>>
>> Andy LoPresto
>> [email protected]
>> *[email protected] <[email protected]>*
>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>>
>> On Jan 16, 2017, at 2:07 PM, Andy LoPresto <[email protected]> wrote:
>>
>> James,
>>
>> You can use session.putAttribute() to do this. Anything you write to the
>> output stream will go directly to flowfile content. I hope this makes
>> sense.
>>
>> (Groovy example)
>>
>> flowFile = session.putAttribute(flowFile, 'some_new_attribute_name', "This is content with ${stringInterpolation.toString()} and math ${3 * 2}")
>>
>> flowFile = session.putAttribute(flowFile, 'message.REMARKS', "Here I am replacing the prior attribute value with this static string.")
>>
>> session.transfer(flowFile, REL_SUCCESS)
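For readers working in Python rather than Groovy, the same two calls might look roughly like the sketch below. StubFlowFile and StubSession are hypothetical stand-ins so the snippet runs on its own; they model only the one behaviour being illustrated, namely that putAttribute hands back an updated flowfile reference, which is why the result is reassigned each time. The attribute values are illustrative.

```python
class StubFlowFile(object):
    """Hypothetical stand-in holding only an attribute map."""

    def __init__(self, attributes):
        self.attributes = dict(attributes)


class StubSession(object):
    """Hypothetical stand-in; the real session is provided by ExecuteScript."""

    def putAttribute(self, flowfile, name, value):
        # Return a new object rather than mutating the one passed in.
        updated = dict(flowfile.attributes)
        updated[name] = value
        return StubFlowFile(updated)


session = StubSession()
original = StubFlowFile({"message.REMARKS": "old remarks"})

# Adds an attribute that did not exist before.
flowfile = session.putAttribute(original, "some_new_attribute_name",
                                "content with math %d" % (3 * 2))

# Overwrites an attribute that already exists.
flowfile = session.putAttribute(flowfile, "message.REMARKS",
                                "a static replacement string")

print(sorted(flowfile.attributes))
```

Both calls have the same shape; whether the attribute is created or replaced depends only on whether the name is already present.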
>>
>>
>>
>>
>> Andy LoPresto
>> [email protected]
>> *[email protected] <[email protected]>*
>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>>
>> On Jan 16, 2017, at 1:57 PM, James McMahon <[email protected]> wrote:
>>
>> I am having great difficulty getting a stream callback in python code to
>> simply add a new metadata attribute property with value to a flowfile. I
>> execute my code within an ExecuteScript processor. My incoming flowfile has
>> a number of complex attributes that contain embedded data of high interest
>> to my users. An attribute entitled REMARKS is one such example, containing
>> critical information that I parse out using regex in python. While simple
>> cases may allow me to use UpdateAttribute to add and modify attributes, I
>> find that they do not allow me to perform all the complex regex I
>> anticipate for my requirements.
>>
>>
>> My code below successfully parses the existing attribute, but it saves
>> the value(s) as a new data payload of my flowfile – not as a new attribute.
>> I must save the parsed result as a new attribute in the flowfile, and must
>> leave my flowfile data payload unchanged. How can I do this? This seems
>> like such a fundamental feature of common interest, and so I have to
>> believe I am missing the obvious.
>>
>>
>> (My code below was developed based on an example originally offered by
>> Matt B. I want to give him credit for his examples and thank him for
>> getting me started).
>>
>>
>> Please pardon single quote characters formatted improperly in Word by my
>> code retyping efforts below.
>>
>>
>> import json
>> import re
>> import java.io
>> import csv
>> from org.apache.commons.io import IOUtils
>> from java.nio.charset import StandardCharsets
>> from org.apache.nifi.processor.io import StreamCallback
>>
>> result = {} # define a dictionary
>>
>> def isNotEmpty(s):
>>     return bool(s and s.strip())
>>
>> def parseEmbeddedColor(s):
>>     pattern = re.compile(r""".*COLOR\=
>>                              (?P<m1>.*?)
>>                              \/\/
>>                              .*""", re.IGNORECASE|re.DOTALL|re.VERBOSE)
>>     match = pattern.match(s)
>>     if match is None:
>>         return ''
>>     thisMatch = match.group("m1")
>>     if thisMatch: return thisMatch
>>     else: return ''
>>
>> class PyStreamCallback(StreamCallback):
>>     def __init__(self):
>>         pass
>>
>>     def process(self, inputStream, outputStream):
>>         if isNotEmpty(flowFile.getAttribute('message.REMARKS')):
>>             incoming_metadata_comment = flowFile.getAttribute('message.REMARKS')
>>         else:
>>             incoming_metadata_comment = ''
>>         origColor = parseEmbeddedColor(incoming_metadata_comment)
>>         if isNotEmpty(origColor):
>>             result['origColor'] = origColor
>>         else:
>>             result['origColor'] = ''
>>         outputStream.write(unicode(json.dumps(result['origColor'])))
>>
>> flowFile = session.get()
>> if (flowFile != None):
>>     # the following line is all I can get to work currently. It is not what I need. It replaces the flowFile data payload…
>>     flowFile = session.write(flowFile, PyStreamCallback())
>>
>>     # I made an attempt to add the new parsed color value as an attribute here, but failed…
>>     # flowFile = session.putAttribute(flowFile, "parsedColor", PyStreamCallback())
>>
>>     session.transfer(flowFile, REL_SUCCESS)
>>
>>
>>
>>
>
>
