In your PyStreamCallback.process() method, you are calling
self.getAttribute(), which I would expect to fail because "self" is a
PyStreamCallback, not a FlowFile. Instead you have access to the attributes
in self.attrs, so try replacing self.getAttribute('ABCDE') with
self.attrs['ABCDE']

Regards,
Matt

On Fri, May 5, 2017 at 2:42 PM, James McMahon <[email protected]> wrote:

> Matt, here is what I have tried. No error is thrown, but I can't reference
> attribute ABCD in the callback function. What am I misunderstanding?  -Jim
>
> class PyStreamCallback(StreamCallback) :
>      def __init__(self, attrs):
>           self.attrs = attrs
>      def process(self,inputStream, outputStream):
>           outputStream.write(Unicode(json.dumps(self.getAttribute('ABC
> DE')))
>
> class UpdateAttributes(Processor) :
>
>      def __init__(self) :
>           self.result = {}
>           self.__rel_success = Relationship.Builder().name("s
> uccess".description("Success").build()
>
>      ( I def an initialize, getRelationships, validate,
> getPropertyDescriptor, getPropertyDescriptors, and onPropertyModified)
>
>      def onTrigger(self, context, sessionFactory):
>           session = sessionFactory.createSession()
>           try :
>                flowfiles = session.get(20)
>                     for flowfile in flowfiles :
>                          if flowfile is none :
>                               return
>
>                          flowfile = session.putAttribute(flowfile,
> "ABCDE", "LARRY_CURLEY_MOE")
>
>                          flowfile = session.write(flowfile,PyStrea
> mCallback(PySet(flowfile.getAttributes())))
>
>                          session.transfer(flowfile, self.__rel_success)
>                  session.commit()
>           except:
>                 session.rollback()
>                 raise
>
> processor = UpdateAttributes()
>
> On Tue, May 2, 2017 at 1:26 PM, Matt Burgess <[email protected]> wrote:
>
>> If you want to use attributes inside the callback, I recommend building a
>> dictionary from flowfile.getAttributes() and passing that into the
>> PyStreamCallback constructor:
>>
>> class PyStreamCallback(StreamCallback):
>>     def __init__(self, attrs):
>>         self.attrs = attrs
>>
>>
>> # ... later on ...
>> session.write(flowfile, PyStreamCallback(PySet(flowfil
>> e.getAttributes())))
>>
>>  Or something like that.  I thought I'd seen an example somewhere but I
>> can't find it.
>>
>> Regards,
>> Matt
>>
>> On Tue, May 2, 2017 at 1:06 PM, Andy LoPresto <[email protected]>
>> wrote:
>>
>>> I am not a Python expert, but if you set “self.result[“x”]” in one
>>> class, can you reference it in a separate class? What is the exception you
>>> are getting?
>>>
>>> Andy LoPresto
>>> [email protected]
>>> *[email protected] <[email protected]>*
>>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>>>
>>> On May 2, 2017, at 1:01 PM, James McMahon <[email protected]> wrote:
>>>
>>> Thanks for your reply Matt. I think this cuts to the heart of my failure
>>> here. I have tried treating PyStreamCallback() as a second class in my
>>> python file, like so:
>>>
>>> class PyStreamCallback(StreamCallback) :
>>>      def __init__(self):
>>>           pass
>>>      def process(self, inputStream, outputStream) :
>>>           outputStream.write(Unicode(json.dumps(self.result['thisThing
>>> '])))
>>>
>>> classUpdateAttributes(Processor) :
>>>      def __init__(self) :
>>>           self.result{}
>>>           self.__rel_success = Relationship......etc etc
>>> .
>>> .
>>> .
>>>      def onTrigger(self, context, sessionFactory):
>>> .
>>> .
>>> .
>>>           self.result['thisThing'] = flowfile.getAttribute("s3.key")
>>>           # this fails:    flowfile = session.write(flowfile,PyStrea
>>> mCallback())
>>>           # this fails too:  flowfile = session.write(self,flowfile,Py
>>> StreamCallback())
>>>
>>> Am I mistaken to configure PyStreamCallback as a second
>>> independent class? Should it be a defined method within class
>>> UpdateAttributes() ?
>>>
>>> On Tue, May 2, 2017 at 12:45 PM, Matt Burgess <[email protected]>
>>> wrote:
>>>
>>>> Jim, you still can/should use something like a PyStreamCallback().
>>>> ExecuteScript is basically an onTrigger() body, so you can use the same
>>>> approach inside your onTrigger() body in InvokeScriptedProcessor.  Pass an
>>>> instance of your PyStreamCallback into something like:
>>>>
>>>> flowfile = session.write(flowfile, PyStreamCallback())
>>>>
>>>> at some point before you transfer the flow file. If you need
>>>> variables/data from outside the PyStreamCallback() inside, you can pass
>>>> them into the constructor or (more dangerously) mess with the scope of the
>>>> variable(s).
>>>>
>>>> Regards,
>>>> Matt
>>>>
>>>>
>>>> On Tue, May 2, 2017 at 12:39 PM, James McMahon <[email protected]>
>>>> wrote:
>>>>
>>>>> This is an example found at  https://github.com/apache/
>>>>> nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi
>>>>> -scripting-processors/src/test/resources/jython/test_update_
>>>>> attribute.py  . It shows how to set up InvokeScriptedProcessor for
>>>>> python (jython) to add a simple attribute to a flowfile.
>>>>>
>>>>> How would I integrate in this structure a call to PyStreamCallback()
>>>>> that allows me to replace the *contents* of the flowfile? Typically
>>>>> PyStreamCallback() to accomplish this is its own class, but here it looks
>>>>> like I would need to make it a method in the UpdateAttributes() class. 
>>>>> I've
>>>>> been unable to get that to work.
>>>>>
>>>>> Thank you in advance for any help.
>>>>>
>>>>> #! /usr/bin/python
>>>>> #
>>>>> # Licensed to the Apache Software Foundation (ASF) under one
>>>>> # or more contributor license agreements. See the NOTICE file
>>>>> # distributed with this work for additional information
>>>>> # regarding copyright ownership. The ASF licenses this file
>>>>> # to you under the Apache License, Version 2.0 (the
>>>>> # "License"); you may not use this file except in compliance
>>>>> # with the License. You may obtain a copy of the License at
>>>>> #
>>>>> # http://www.apache.org/licenses/LICENSE-2.0
>>>>> #
>>>>> # Unless required by applicable law or agreed to in writing,
>>>>> # software distributed under the License is distributed on an
>>>>> # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>>>> # KIND, either express or implied. See the License for the
>>>>> # specific language governing permissions and limitations
>>>>> # under the License.
>>>>> #
>>>>> import sys
>>>>> import traceback
>>>>> from org.apache.nifi.processor import Processor
>>>>> from org.apache.nifi.processor import Relationship
>>>>> from org.apache.nifi.components import PropertyDescriptor
>>>>> from org.apache.nifi.processor.util import StandardValidators
>>>>> class UpdateAttributes(Processor) :
>>>>> __rel_success = Relationship.Builder().description("Success").name("
>>>>> success").build()
>>>>> def __init__(self) :
>>>>> pass
>>>>> def initialize(self, context) :
>>>>> pass
>>>>> def getRelationships(self) :
>>>>> return set([self.__rel_success])
>>>>> def validate(self, context) :
>>>>> pass
>>>>> def getPropertyDescriptors(self) :
>>>>> descriptor = PropertyDescriptor.Builder().name("for-attributes"
>>>>> ).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
>>>>> return [descriptor]
>>>>> def onPropertyModified(self, descriptor, newValue, oldValue) :
>>>>> pass
>>>>> def onTrigger(self, context, sessionFactory) :
>>>>> session = sessionFactory.createSession()
>>>>> try :
>>>>> # ensure there is work to do
>>>>> flowfile = session.get()
>>>>> if flowfile is None :
>>>>> return
>>>>> # extract some attribute values
>>>>> fromPropertyValue = context.getProperty("for-attributes").getValue()
>>>>> fromAttributeValue = flowfile.getAttribute("for-attributes")
>>>>> # set an attribute
>>>>> flowfile = session.putAttribute(flowfile, "from-property",
>>>>> fromPropertyValue)
>>>>> flowfile = session.putAttribute(flowfile, "from-attribute",
>>>>> fromAttributeValue)
>>>>> # transfer
>>>>> session.transfer(flowfile, self.__rel_success)
>>>>> session.commit()
>>>>> except :
>>>>> print sys.exc_info()[0]
>>>>> print "Exception in TestReader:"
>>>>> print '-' * 60
>>>>> traceback.print_exc(file=sys.stdout)
>>>>> print '-' * 60
>>>>> session.rollback(true)
>>>>> raise
>>>>> processor = UpdateAttributes()
>>>>>
>>>>
>>>>
>>>
>>>
>>
>

Reply via email to