In your PyStreamCallback.process() method, you are calling
self.getAttribute(), which I would expect to fail because "self" is a
PyStreamCallback, not a FlowFile. Instead you have access to the attributes
in self.attrs, so try replacing self.getAttribute('ABCDE') with
self.attrs['ABCDE']

Regards,
Matt

On Fri, May 5, 2017 at 2:42 PM, James McMahon <[email protected]> wrote:

> Matt, here is what I have tried. No error is thrown, but I can't reference
> attribute ABCD in the callback function. What am I misunderstanding? -Jim
>
> class PyStreamCallback(StreamCallback) :
>     def __init__(self, attrs):
>         self.attrs = attrs
>     def process(self,inputStream, outputStream):
>         outputStream.write(Unicode(json.dumps(self.getAttribute('ABCDE')))
>
> class UpdateAttributes(Processor) :
>
>     def __init__(self) :
>         self.result = {}
>         self.__rel_success = Relationship.Builder().name("success".description("Success").build()
>
>     ( I def an initialize, getRelationships, validate,
>     getPropertyDescriptor, getPropertyDescriptors, and onPropertyModified)
>
>     def onTrigger(self, context, sessionFactory):
>         session = sessionFactory.createSession()
>         try :
>             flowfiles = session.get(20)
>             for flowfile in flowfiles :
>                 if flowfile is none :
>                     return
>
>                 flowfile = session.putAttribute(flowfile, "ABCDE", "LARRY_CURLEY_MOE")
>
>                 flowfile = session.write(flowfile,PyStreamCallback(PySet(flowfile.getAttributes())))
>
>                 session.transfer(flowfile, self.__rel_success)
>             session.commit()
>         except:
>             session.rollback()
>             raise
>
> processor = UpdateAttributes()
>
> On Tue, May 2, 2017 at 1:26 PM, Matt Burgess <[email protected]> wrote:
>
>> If you want to use attributes inside the callback, I recommend building a
>> dictionary from flowfile.getAttributes() and passing that into the
>> PyStreamCallback constructor:
>>
>> class PyStreamCallback(StreamCallback):
>>     def __init__(self, attrs):
>>         self.attrs = attrs
>>
>>
>> # ... later on ...
>> session.write(flowfile, PyStreamCallback(PySet(flowfile.getAttributes())))
>>
>> Or something like that. I thought I'd seen an example somewhere but I
>> can't find it.
>>
>> Regards,
>> Matt
>>
>> On Tue, May 2, 2017 at 1:06 PM, Andy LoPresto <[email protected]>
>> wrote:
>>
>>> I am not a Python expert, but if you set “self.result[“x”]” in one
>>> class, can you reference it in a separate class? What is the exception you
>>> are getting?
>>>
>>> Andy LoPresto
>>> [email protected]
>>> *[email protected] <[email protected]>*
>>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
>>>
>>> On May 2, 2017, at 1:01 PM, James McMahon <[email protected]> wrote:
>>>
>>> Thanks for your reply Matt. I think this cuts to the heart of my failure
>>> here. I have tried treating PyStreamCallback() as a second class in my
>>> python file, like so:
>>>
>>> class PyStreamCallback(StreamCallback) :
>>>     def __init__(self):
>>>         pass
>>>     def process(self, inputStream, outputStream) :
>>>         outputStream.write(Unicode(json.dumps(self.result['thisThing'])))
>>>
>>> class UpdateAttributes(Processor) :
>>>     def __init__(self) :
>>>         self.result{}
>>>         self.__rel_success = Relationship......etc etc
>>>     .
>>>     .
>>>     .
>>>     def onTrigger(self, context, sessionFactory):
>>>         .
>>>         .
>>>         .
>>>         self.result['thisThing'] = flowfile.getAttribute("s3.key")
>>>         # this fails: flowfile = session.write(flowfile,PyStreamCallback())
>>>         # this fails too: flowfile = session.write(self,flowfile,PyStreamCallback())
>>>
>>> Am I mistaken to configure PyStreamCallback as a second
>>> independent class? Should it be a defined method within class
>>> UpdateAttributes() ?
>>>
>>> On Tue, May 2, 2017 at 12:45 PM, Matt Burgess <[email protected]>
>>> wrote:
>>>
>>>> Jim, you still can/should use something like a PyStreamCallback().
>>>> ExecuteScript is basically an onTrigger() body, so you can use the same
>>>> approach inside your onTrigger() body in InvokeScriptedProcessor.
>>>> Pass an instance of your PyStreamCallback into something like:
>>>>
>>>> flowfile = session.write(flowfile, PyStreamCallback())
>>>>
>>>> at some point before you transfer the flow file. If you need
>>>> variables/data from outside the PyStreamCallback() inside, you can pass
>>>> them into the constructor or (more dangerously) mess with the scope of the
>>>> variable(s).
>>>>
>>>> Regards,
>>>> Matt
>>>>
>>>>
>>>> On Tue, May 2, 2017 at 12:39 PM, James McMahon <[email protected]>
>>>> wrote:
>>>>
>>>>> This is an example found at
>>>>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py .
>>>>> It shows how to set up InvokeScriptedProcessor for
>>>>> python (jython) to add a simple attribute to a flowfile.
>>>>>
>>>>> How would I integrate in this structure a call to PyStreamCallback()
>>>>> that allows me to replace the *contents* of the flowfile? Typically
>>>>> PyStreamCallback() to accomplish this is its own class, but here it looks
>>>>> like I would need to make it a method in the UpdateAttributes() class.
>>>>> I've been unable to get that to work.
>>>>>
>>>>> Thank you in advance for any help.
>>>>>
>>>>> #! /usr/bin/python
>>>>> #
>>>>> # Licensed to the Apache Software Foundation (ASF) under one
>>>>> # or more contributor license agreements. See the NOTICE file
>>>>> # distributed with this work for additional information
>>>>> # regarding copyright ownership. The ASF licenses this file
>>>>> # to you under the Apache License, Version 2.0 (the
>>>>> # "License"); you may not use this file except in compliance
>>>>> # with the License.
>>>>> # You may obtain a copy of the License at
>>>>> #
>>>>> # http://www.apache.org/licenses/LICENSE-2.0
>>>>> #
>>>>> # Unless required by applicable law or agreed to in writing,
>>>>> # software distributed under the License is distributed on an
>>>>> # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>>>> # KIND, either express or implied. See the License for the
>>>>> # specific language governing permissions and limitations
>>>>> # under the License.
>>>>> #
>>>>> import sys
>>>>> import traceback
>>>>> from org.apache.nifi.processor import Processor
>>>>> from org.apache.nifi.processor import Relationship
>>>>> from org.apache.nifi.components import PropertyDescriptor
>>>>> from org.apache.nifi.processor.util import StandardValidators
>>>>> class UpdateAttributes(Processor) :
>>>>>     __rel_success = Relationship.Builder().description("Success").name("success").build()
>>>>>     def __init__(self) :
>>>>>         pass
>>>>>     def initialize(self, context) :
>>>>>         pass
>>>>>     def getRelationships(self) :
>>>>>         return set([self.__rel_success])
>>>>>     def validate(self, context) :
>>>>>         pass
>>>>>     def getPropertyDescriptors(self) :
>>>>>         descriptor = PropertyDescriptor.Builder().name("for-attributes").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
>>>>>         return [descriptor]
>>>>>     def onPropertyModified(self, descriptor, newValue, oldValue) :
>>>>>         pass
>>>>>     def onTrigger(self, context, sessionFactory) :
>>>>>         session = sessionFactory.createSession()
>>>>>         try :
>>>>>             # ensure there is work to do
>>>>>             flowfile = session.get()
>>>>>             if flowfile is None :
>>>>>                 return
>>>>>             # extract some attribute values
>>>>>             fromPropertyValue = context.getProperty("for-attributes").getValue()
>>>>>             fromAttributeValue = flowfile.getAttribute("for-attributes")
>>>>>             # set an attribute
>>>>>             flowfile = session.putAttribute(flowfile, "from-property", fromPropertyValue)
>>>>>             flowfile = session.putAttribute(flowfile, "from-attribute", fromAttributeValue)
>>>>>             # transfer
>>>>>             session.transfer(flowfile, self.__rel_success)
>>>>>             session.commit()
>>>>>         except :
>>>>>             print sys.exc_info()[0]
>>>>>             print "Exception in TestReader:"
>>>>>             print '-' * 60
>>>>>             traceback.print_exc(file=sys.stdout)
>>>>>             print '-' * 60
>>>>>             session.rollback(true)
>>>>>             raise
>>>>> processor = UpdateAttributes()
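
As a rough illustration of the fix described at the top of this thread (read the attribute from self.attrs instead of calling self.getAttribute()), here is a minimal sketch that can run outside NiFi. Everything NiFi-specific is an assumption made only to keep it self-contained: the StreamCallback class below is a stand-in for org.apache.nifi.processor.io.StreamCallback, the BytesIO objects stand in for the streams NiFi would pass to process(), and the plain dict stands in for the map returned by flowfile.getAttributes().

```python
import io
import json


class StreamCallback(object):
    """Stand-in base class (assumption). Inside NiFi's Jython engine you would
    import StreamCallback from org.apache.nifi.processor.io instead."""


class PyStreamCallback(StreamCallback):
    def __init__(self, attrs):
        # Copy the attributes into a plain dict so they can be looked up by key.
        self.attrs = dict(attrs)

    def process(self, inputStream, outputStream):
        # "self" is the callback, not the flow file, so the attribute is read
        # from the dict captured in the constructor.
        payload = json.dumps(self.attrs.get('ABCDE'))
        outputStream.write(bytearray(payload.encode('utf-8')))


# Hypothetical stand-in for flowfile.getAttributes().
attributes = {'ABCDE': 'LARRY_CURLEY_MOE'}

out = io.BytesIO()
PyStreamCallback(attributes).process(io.BytesIO(b'original content'), out)
result = out.getvalue()
```

Inside onTrigger(), the same callback would be handed to session.write(flowfile, PyStreamCallback(...)) after session.putAttribute() and before session.transfer(), as in the quoted code. Whether dict() accepts the Java map directly under Jython is worth checking in your environment.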
