Matt, here is what I have tried. No error is thrown, but I can't reference
attribute ABCDE in the callback function. What am I misunderstanding? -Jim
class PyStreamCallback(StreamCallback):
    """Replace a flowfile's content with the JSON-encoded value of attribute ABCDE.

    The callback is handed only the content streams by ``session.write``; it
    never sees the flowfile itself, so the attributes it needs must be passed
    in through the constructor.
    """

    def __init__(self, attrs):
        # attrs: the flowfile's attribute map (e.g. flowfile.getAttributes()),
        # captured by the caller before session.write is invoked.
        self.attrs = attrs

    def process(self, inputStream, outputStream):
        # A StreamCallback has no getAttribute(); read the captured map instead.
        # .get() yields None for a missing key, mirroring FlowFile.getAttribute.
        payload = json.dumps(self.attrs.get('ABCDE'))
        # The Java OutputStream expects bytes, so encode the JSON text explicitly
        # (json.dumps output is ASCII by default, so utf-8 encoding is safe).
        outputStream.write(bytearray(payload.encode('utf-8')))
class UpdateAttributes(Processor):
    """Scripted processor that tags flowfiles and rewrites their content.

    For each flowfile in a batch of up to 20, it sets attribute ``ABCDE`` and
    then replaces the content with that attribute's value serialized as JSON
    (via PyStreamCallback). Each flowfile is then routed to ``success``.
    """

    def __init__(self):
        self.result = {}
        # Build the relationship in one expression. In the original, the
        # "success" literal was split across lines and name(...) was never closed.
        self.__rel_success = (
            Relationship.Builder()
            .name("success")
            .description("Success")
            .build()
        )

    # (I also define initialize, getRelationships, validate,
    # getPropertyDescriptor, getPropertyDescriptors, and onPropertyModified
    # here; they are omitted from this message.)

    def onTrigger(self, context, sessionFactory):
        """Process one batch of flowfiles in its own session.

        Commits on success. On any failure, rolls the session back and
        re-raises.
        """
        session = sessionFactory.createSession()
        try:
            # session.get(20) returns a (possibly empty) list, never None, so
            # there is nothing to None-check. An empty batch just commits.
            flowfiles = session.get(20)
            for flowfile in flowfiles:
                flowfile = session.putAttribute(flowfile, "ABCDE",
                                                "LARRY_CURLEY_MOE")
                # Read the attributes *after* putAttribute so ABCDE is present.
                # Pass the map itself; a PySet of it would hold only the names.
                flowfile = session.write(
                    flowfile, PyStreamCallback(flowfile.getAttributes()))
                session.transfer(flowfile, self.__rel_success)
            session.commit()
        except:
            # Bare except on purpose: undo everything (Java errors included),
            # then let the framework see the failure.
            session.rollback()
            raise
# Module-level instance, following the same pattern as the quoted
# test_update_attribute.py example. InvokeScriptedProcessor is expected to pick
# up the variable named `processor`.
# NOTE(review): confirm against the InvokeScriptedProcessor docs.
processor = UpdateAttributes()
On Tue, May 2, 2017 at 1:26 PM, Matt Burgess <[email protected]> wrote:
> If you want to use attributes inside the callback, I recommend building a
> dictionary from flowfile.getAttributes() and passing that into the
> PyStreamCallback constructor:
>
> class PyStreamCallback(StreamCallback):
> def __init__(self, attrs):
> self.attrs = attrs
>
>
> # ... later on ...
> session.write(flowfile, PyStreamCallback(PySet(flowfile.getAttributes())))
>
> Or something like that. I thought I'd seen an example somewhere but I
> can't find it.
>
> Regards,
> Matt
>
> On Tue, May 2, 2017 at 1:06 PM, Andy LoPresto <[email protected]>
> wrote:
>
>> I am not a Python expert, but if you set “self.result[“x”]” in one class,
>> can you reference it in a separate class? What is the exception you are
>> getting?
>>
>> Andy LoPresto
>> [email protected]
>> *[email protected] <[email protected]>*
>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
>>
>> On May 2, 2017, at 1:01 PM, James McMahon <[email protected]> wrote:
>>
>> Thanks for your reply Matt. I think this cuts to the heart of my failure
>> here. I have tried treating PyStreamCallback() as a second class in my
>> python file, like so:
>>
>> class PyStreamCallback(StreamCallback) :
>> def __init__(self):
>> pass
>> def process(self, inputStream, outputStream) :
>> outputStream.write(Unicode(json.dumps(self.result['thisThing'])))
>>
>> classUpdateAttributes(Processor) :
>> def __init__(self) :
>> self.result{}
>> self.__rel_success = Relationship......etc etc
>> .
>> .
>> .
>> def onTrigger(self, context, sessionFactory):
>> .
>> .
>> .
>> self.result['thisThing'] = flowfile.getAttribute("s3.key")
>> # this fails: flowfile = session.write(flowfile,PyStreamCallback())
>> # this fails too: flowfile = session.write(self,flowfile,PyStreamCallback())
>>
>> Am I mistaken to configure PyStreamCallback as a second
>> independent class? Should it be a defined method within class
>> UpdateAttributes() ?
>>
>> On Tue, May 2, 2017 at 12:45 PM, Matt Burgess <[email protected]>
>> wrote:
>>
>>> Jim, you still can/should use something like a PyStreamCallback().
>>> ExecuteScript is basically an onTrigger() body, so you can use the same
>>> approach inside your onTrigger() body in InvokeScriptedProcessor. Pass an
>>> instance of your PyStreamCallback into something like:
>>>
>>> flowfile = session.write(flowfile, PyStreamCallback())
>>>
>>> at some point before you transfer the flow file. If you need
>>> variables/data from outside the PyStreamCallback() inside, you can pass
>>> them into the constructor or (more dangerously) mess with the scope of the
>>> variable(s).
>>>
>>> Regards,
>>> Matt
>>>
>>>
>>> On Tue, May 2, 2017 at 12:39 PM, James McMahon <[email protected]>
>>> wrote:
>>>
>>>> This is an example found at
>>>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py
>>>> It shows how to set up InvokeScriptedProcessor for Python (Jython) to add
>>>> a simple attribute to a flowfile.
>>>> python (jython) to add a simple attribute to a flowfile.
>>>>
>>>> How would I integrate in this structure a call to PyStreamCallback()
>>>> that allows me to replace the *contents* of the flowfile? Typically
>>>> PyStreamCallback() to accomplish this is its own class, but here it looks
>>>> like I would need to make it a method in the UpdateAttributes() class. I've
>>>> been unable to get that to work.
>>>>
>>>> Thank you in advance for any help.
>>>>
>>>> #! /usr/bin/python
>>>> #
>>>> # Licensed to the Apache Software Foundation (ASF) under one
>>>> # or more contributor license agreements. See the NOTICE file
>>>> # distributed with this work for additional information
>>>> # regarding copyright ownership. The ASF licenses this file
>>>> # to you under the Apache License, Version 2.0 (the
>>>> # "License"); you may not use this file except in compliance
>>>> # with the License. You may obtain a copy of the License at
>>>> #
>>>> # http://www.apache.org/licenses/LICENSE-2.0
>>>> #
>>>> # Unless required by applicable law or agreed to in writing,
>>>> # software distributed under the License is distributed on an
>>>> # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>>> # KIND, either express or implied. See the License for the
>>>> # specific language governing permissions and limitations
>>>> # under the License.
>>>> #
>>>> import sys
>>>> import traceback
>>>> from org.apache.nifi.processor import Processor
>>>> from org.apache.nifi.processor import Relationship
>>>> from org.apache.nifi.components import PropertyDescriptor
>>>> from org.apache.nifi.processor.util import StandardValidators
>>>> class UpdateAttributes(Processor) :
>>>> __rel_success = Relationship.Builder().description("Success").name("success").build()
>>>> def __init__(self) :
>>>> pass
>>>> def initialize(self, context) :
>>>> pass
>>>> def getRelationships(self) :
>>>> return set([self.__rel_success])
>>>> def validate(self, context) :
>>>> pass
>>>> def getPropertyDescriptors(self) :
>>>> descriptor = PropertyDescriptor.Builder().name("for-attributes"
>>>> ).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
>>>> return [descriptor]
>>>> def onPropertyModified(self, descriptor, newValue, oldValue) :
>>>> pass
>>>> def onTrigger(self, context, sessionFactory) :
>>>> session = sessionFactory.createSession()
>>>> try :
>>>> # ensure there is work to do
>>>> flowfile = session.get()
>>>> if flowfile is None :
>>>> return
>>>> # extract some attribute values
>>>> fromPropertyValue = context.getProperty("for-attributes").getValue()
>>>> fromAttributeValue = flowfile.getAttribute("for-attributes")
>>>> # set an attribute
>>>> flowfile = session.putAttribute(flowfile, "from-property",
>>>> fromPropertyValue)
>>>> flowfile = session.putAttribute(flowfile, "from-attribute",
>>>> fromAttributeValue)
>>>> # transfer
>>>> session.transfer(flowfile, self.__rel_success)
>>>> session.commit()
>>>> except :
>>>> print sys.exc_info()[0]
>>>> print "Exception in TestReader:"
>>>> print '-' * 60
>>>> traceback.print_exc(file=sys.stdout)
>>>> print '-' * 60
>>>> session.rollback(true)
>>>> raise
>>>> processor = UpdateAttributes()
>>>>
>>>
>>>
>>
>>
>