Jim,
Not sure my previous email went through: did you try self.attrs['ABCDE']
rather than self.getAttribute('ABCDE') in your PyStreamCallback.process
method?
I tried the following and it seems to work fine (added a logger for my own
debugging purposes):
import json
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processor import Processor
from org.apache.nifi.processor import Relationship
from org.apache.nifi.components import PropertyDescriptor
from org.slf4j import Logger
from org.slf4j import LoggerFactory
class PyStreamCallback(StreamCallback):
    """Overwrite a flow file's content with the JSON encoding of attribute ABCDE.

    ``process`` is only handed the input and output streams, so the
    attribute values it needs are captured when the callback is built.
    """

    def __init__(self, attrs):
        # Attribute map for the flow file being written; the caller in this
        # script passes flowfile.getAttributes().
        self.attrs = attrs

    def process(self, inputStream, outputStream):
        # The existing content on inputStream is not read; the new content
        # is just the JSON-encoded value of the ABCDE attribute.
        encoded = json.dumps(self.attrs['ABCDE'])
        outputStream.write(unicode(encoded))
class UpdateAttributes(Processor):
    """Scripted processor that tags flow files and rewrites their content.

    On each trigger it pulls a batch of up to 20 flow files. For each one it:
    sets attribute ``ABCDE`` to ``LARRY_CURLEY_MOE``; replaces the content
    with the JSON encoding of that attribute (via PyStreamCallback); and
    routes it to ``success``. The whole batch is committed as one session.
    """

    def __init__(self):
        # Not read anywhere in this script; kept so existing references
        # to the attribute keep working.
        self.result = {}
        self.__rel_success = (
            Relationship.Builder().name("success").description("Success").build()
        )
        self.__logger = LoggerFactory.getLogger(self.getClass())

    def initialize(self, context):
        """No initialization is needed."""
        pass

    def getRelationships(self):
        """Expose the single ``success`` relationship."""
        return set([self.__rel_success])

    def validate(self, context):
        """No custom validation is performed."""
        pass

    def getPropertyDescriptors(self):
        """This processor defines no configurable properties."""
        return []

    def onPropertyModified(self, descriptor, newValue, oldValue):
        """Property changes are ignored (there are no properties)."""
        pass

    def onTrigger(self, context, sessionFactory):
        """Process one batch of up to 20 flow files in a single session.

        On success the session is committed once after the whole batch is
        transferred. On any exception the error is logged, the session is
        rolled back, and the exception is re-raised.
        """
        session = sessionFactory.createSession()
        try:
            flowfiles = session.get(20)
            for flowfile in flowfiles:
                # Defensive only: the batch is presumably never None-padded.
                # Skip instead of `return`, which would leave flow files
                # already transferred in this batch without a commit.
                if flowfile is None:
                    continue
                flowfile = session.putAttribute(flowfile, "ABCDE",
                                                "LARRY_CURLEY_MOE")
                # Pass the *updated* attribute map so the callback can read
                # ABCDE; the callback replaces the flow file's content.
                flowfile = session.write(
                    flowfile, PyStreamCallback(flowfile.getAttributes()))
                session.transfer(flowfile, self.__rel_success)
            session.commit()
        except Exception as e:
            self.__logger.error(str(e), e)
            session.rollback()
            raise
# Module-level instance of the scripted processor. The upstream NiFi example
# uses this same variable name; presumably it is the hook
# InvokeScriptedProcessor looks up, so keep the name `processor`.
processor = UpdateAttributes()
Regards,
Matt
On Fri, May 5, 2017 at 2:42 PM, James McMahon <[email protected]> wrote:
> Matt, here is what I have tried. No error is thrown, but I can't reference
> attribute ABCD in the callback function. What am I misunderstanding? -Jim
>
> class PyStreamCallback(StreamCallback) :
> def __init__(self, attrs):
> self.attrs = attrs
> def process(self,inputStream, outputStream):
> outputStream.write(Unicode(json.dumps(self.getAttribute('ABCDE')))
>
> class UpdateAttributes(Processor) :
>
> def __init__(self) :
> self.result = {}
> self.__rel_success = Relationship.Builder().name("success".description("Success").build()
>
> ( I def an initialize, getRelationships, validate,
> getPropertyDescriptor, getPropertyDescriptors, and onPropertyModified)
>
> def onTrigger(self, context, sessionFactory):
> session = sessionFactory.createSession()
> try :
> flowfiles = session.get(20)
> for flowfile in flowfiles :
> if flowfile is none :
> return
>
> flowfile = session.putAttribute(flowfile,
> "ABCDE", "LARRY_CURLEY_MOE")
>
> flowfile = session.write(flowfile,PyStreamCallback(PySet(flowfile.getAttributes())))
>
> session.transfer(flowfile, self.__rel_success)
> session.commit()
> except:
> session.rollback()
> raise
>
> processor = UpdateAttributes()
>
> On Tue, May 2, 2017 at 1:26 PM, Matt Burgess <[email protected]> wrote:
>
>> If you want to use attributes inside the callback, I recommend building a
>> dictionary from flowfile.getAttributes() and passing that into the
>> PyStreamCallback constructor:
>>
>> class PyStreamCallback(StreamCallback):
>> def __init__(self, attrs):
>> self.attrs = attrs
>>
>>
>> # ... later on ...
>> session.write(flowfile, PyStreamCallback(PySet(flowfile.getAttributes())))
>>
>> Or something like that. I thought I'd seen an example somewhere but I
>> can't find it.
>>
>> Regards,
>> Matt
>>
>> On Tue, May 2, 2017 at 1:06 PM, Andy LoPresto <[email protected]>
>> wrote:
>>
>>> I am not a Python expert, but if you set “self.result[“x”]” in one
>>> class, can you reference it in a separate class? What is the exception you
>>> are getting?
>>>
>>> Andy LoPresto
>>> [email protected]
>>> *[email protected] <[email protected]>*
>>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
>>>
>>> On May 2, 2017, at 1:01 PM, James McMahon <[email protected]> wrote:
>>>
>>> Thanks for your reply Matt. I think this cuts to the heart of my failure
>>> here. I have tried treating PyStreamCallback() as a second class in my
>>> python file, like so:
>>>
>>> class PyStreamCallback(StreamCallback) :
>>> def __init__(self):
>>> pass
>>> def process(self, inputStream, outputStream) :
>>> outputStream.write(Unicode(json.dumps(self.result['thisThing'])))
>>>
>>> classUpdateAttributes(Processor) :
>>> def __init__(self) :
>>> self.result{}
>>> self.__rel_success = Relationship......etc etc
>>> .
>>> .
>>> .
>>> def onTrigger(self, context, sessionFactory):
>>> .
>>> .
>>> .
>>> self.result['thisThing'] = flowfile.getAttribute("s3.key")
>>> # this fails: flowfile = session.write(flowfile,PyStreamCallback())
>>> # this fails too: flowfile = session.write(self,flowfile,PyStreamCallback())
>>>
>>> Am I mistaken to configure PyStreamCallback as a second
>>> independent class? Should it be a defined method within class
>>> UpdateAttributes() ?
>>>
>>> On Tue, May 2, 2017 at 12:45 PM, Matt Burgess <[email protected]>
>>> wrote:
>>>
>>>> Jim, you still can/should use something like a PyStreamCallback().
>>>> ExecuteScript is basically an onTrigger() body, so you can use the same
>>>> approach inside your onTrigger() body in InvokeScriptedProcessor. Pass an
>>>> instance of your PyStreamCallback into something like:
>>>>
>>>> flowfile = session.write(flowfile, PyStreamCallback())
>>>>
>>>> at some point before you transfer the flow file. If you need
>>>> variables/data from outside the PyStreamCallback() inside, you can pass
>>>> them into the constructor or (more dangerously) mess with the scope of the
>>>> variable(s).
>>>>
>>>> Regards,
>>>> Matt
>>>>
>>>>
>>>> On Tue, May 2, 2017 at 12:39 PM, James McMahon <[email protected]>
>>>> wrote:
>>>>
>>>>> This is an example found at
>>>>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py
>>>>> It shows how to set up InvokeScriptedProcessor for Python (Jython) to
>>>>> add a simple attribute to a flowfile.
>>>>>
>>>>> How would I integrate in this structure a call to PyStreamCallback()
>>>>> that allows me to replace the *contents* of the flowfile? Typically
>>>>> PyStreamCallback() to accomplish this is its own class, but here it looks
>>>>> like I would need to make it a method in the UpdateAttributes() class.
>>>>> I've
>>>>> been unable to get that to work.
>>>>>
>>>>> Thank you in advance for any help.
>>>>>
>>>>> #! /usr/bin/python
>>>>> #
>>>>> # Licensed to the Apache Software Foundation (ASF) under one
>>>>> # or more contributor license agreements. See the NOTICE file
>>>>> # distributed with this work for additional information
>>>>> # regarding copyright ownership. The ASF licenses this file
>>>>> # to you under the Apache License, Version 2.0 (the
>>>>> # "License"); you may not use this file except in compliance
>>>>> # with the License. You may obtain a copy of the License at
>>>>> #
>>>>> # http://www.apache.org/licenses/LICENSE-2.0
>>>>> #
>>>>> # Unless required by applicable law or agreed to in writing,
>>>>> # software distributed under the License is distributed on an
>>>>> # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>>>> # KIND, either express or implied. See the License for the
>>>>> # specific language governing permissions and limitations
>>>>> # under the License.
>>>>> #
>>>>> import sys
>>>>> import traceback
>>>>> from org.apache.nifi.processor import Processor
>>>>> from org.apache.nifi.processor import Relationship
>>>>> from org.apache.nifi.components import PropertyDescriptor
>>>>> from org.apache.nifi.processor.util import StandardValidators
>>>>> class UpdateAttributes(Processor) :
>>>>> __rel_success = Relationship.Builder().description("Success").name("success").build()
>>>>> def __init__(self) :
>>>>> pass
>>>>> def initialize(self, context) :
>>>>> pass
>>>>> def getRelationships(self) :
>>>>> return set([self.__rel_success])
>>>>> def validate(self, context) :
>>>>> pass
>>>>> def getPropertyDescriptors(self) :
>>>>> descriptor = PropertyDescriptor.Builder().name("for-attributes").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
>>>>> return [descriptor]
>>>>> def onPropertyModified(self, descriptor, newValue, oldValue) :
>>>>> pass
>>>>> def onTrigger(self, context, sessionFactory) :
>>>>> session = sessionFactory.createSession()
>>>>> try :
>>>>> # ensure there is work to do
>>>>> flowfile = session.get()
>>>>> if flowfile is None :
>>>>> return
>>>>> # extract some attribute values
>>>>> fromPropertyValue = context.getProperty("for-attributes").getValue()
>>>>> fromAttributeValue = flowfile.getAttribute("for-attributes")
>>>>> # set an attribute
>>>>> flowfile = session.putAttribute(flowfile, "from-property",
>>>>> fromPropertyValue)
>>>>> flowfile = session.putAttribute(flowfile, "from-attribute",
>>>>> fromAttributeValue)
>>>>> # transfer
>>>>> session.transfer(flowfile, self.__rel_success)
>>>>> session.commit()
>>>>> except :
>>>>> print sys.exc_info()[0]
>>>>> print "Exception in TestReader:"
>>>>> print '-' * 60
>>>>> traceback.print_exc(file=sys.stdout)
>>>>> print '-' * 60
>>>>> session.rollback(true)
>>>>> raise
>>>>> processor = UpdateAttributes()
>>>>>
>>>>
>>>>
>>>
>>>
>>
>