I've not yet tried that, but I will today and will report back on my results. If it worked for you, I am optimistic I can get it to work on my end too. Thank you very much, Matt. I have a few other workflow improvements to make for customers this morning, and then I will get back to you on this. -Jim
On Tue, May 9, 2017 at 3:00 PM, Matt Burgess <[email protected]> wrote: > Jim, > > Not sure my previous email went through, did you try self.attrs['ABCDE'] > rather than self.getAttribute('ABCDE') in your PyStreamCallback.process > method? > > I tried the following and it seems to work fine (added a logger for my own > debugging purposes): > > import json > from org.apache.nifi.processor.io import StreamCallback > from org.apache.nifi.processor import Processor > from org.apache.nifi.processor import Relationship > from org.apache.nifi.components import PropertyDescriptor > from org.slf4j import Logger > from org.slf4j import LoggerFactory > > class PyStreamCallback(StreamCallback) : > def __init__(self, attrs): > self.attrs = attrs > def process(self,inputStream, outputStream): > outputStream.write(unicode(json.dumps(self.attrs['ABCDE']))) > > class UpdateAttributes(Processor) : > def __init__(self) : > self.result = {} > self.__rel_success = Relationship.Builder().name(" > success").description("Success").build() > self.__logger = LoggerFactory.getLogger(self.getClass()) > > def initialize(self, context) : > pass > > def getRelationships(self) : > return set([self.__rel_success]) > > def validate(self, context) : > pass > > def getPropertyDescriptors(self) : > return [] > > def onPropertyModified(self, descriptor, newValue, oldValue) : > pass > > def onTrigger(self, context, sessionFactory): > session = sessionFactory.createSession() > try : > flowfiles = session.get(20) > for flowfile in flowfiles : > if flowfile is None : > return > > flowfile = session.putAttribute(flowfile, "ABCDE", > "LARRY_CURLEY_MOE") > flowfile = session.write(flowfile,PyStreamCallback(flowfile. 
> getAttributes())) > session.transfer(flowfile, self.__rel_success) > session.commit() > except Exception, e: > self.__logger.error(str(e), e) > session.rollback() > raise > > processor = UpdateAttributes() > > > Regards, > Matt > > On Fri, May 5, 2017 at 2:42 PM, James McMahon <[email protected]> > wrote: > >> Matt, here is what I have tried. No error is thrown, but I can't >> reference attribute ABCD in the callback function. What am I >> misunderstanding? -Jim >> >> class PyStreamCallback(StreamCallback) : >> def __init__(self, attrs): >> self.attrs = attrs >> def process(self,inputStream, outputStream): >> outputStream.write(Unicode(json.dumps(self.getAttribute('ABC >> DE'))) >> >> class UpdateAttributes(Processor) : >> >> def __init__(self) : >> self.result = {} >> self.__rel_success = Relationship.Builder().name("s >> uccess".description("Success").build() >> >> ( I def an initialize, getRelationships, validate, >> getPropertyDescriptor, getPropertyDescriptors, and onPropertyModified) >> >> def onTrigger(self, context, sessionFactory): >> session = sessionFactory.createSession() >> try : >> flowfiles = session.get(20) >> for flowfile in flowfiles : >> if flowfile is none : >> return >> >> flowfile = session.putAttribute(flowfile, >> "ABCDE", "LARRY_CURLEY_MOE") >> >> flowfile = session.write(flowfile,PyStrea >> mCallback(PySet(flowfile.getAttributes()))) >> >> session.transfer(flowfile, self.__rel_success) >> session.commit() >> except: >> session.rollback() >> raise >> >> processor = UpdateAttributes() >> >> On Tue, May 2, 2017 at 1:26 PM, Matt Burgess <[email protected]> >> wrote: >> >>> If you want to use attributes inside the callback, I recommend building >>> a dictionary from flowfile.getAttributes() and passing that into the >>> PyStreamCallback constructor: >>> >>> class PyStreamCallback(StreamCallback): >>> def __init__(self, attrs): >>> self.attrs = attrs >>> >>> >>> # ... later on ... 
>>> session.write(flowfile, PyStreamCallback(PySet(flowfil >>> e.getAttributes()))) >>> >>> Or something like that. I thought I'd seen an example somewhere but I >>> can't find it. >>> >>> Regards, >>> Matt >>> >>> On Tue, May 2, 2017 at 1:06 PM, Andy LoPresto <[email protected]> >>> wrote: >>> >>>> I am not a Python expert, but if you set “self.result[“x”]” in one >>>> class, can you reference it in a separate class? What is the exception you >>>> are getting? >>>> >>>> Andy LoPresto >>>> [email protected] >>>> *[email protected] <[email protected]>* >>>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69 >>>> >>>> On May 2, 2017, at 1:01 PM, James McMahon <[email protected]> wrote: >>>> >>>> Thanks for your reply Matt. I think this cuts to the heart of my >>>> failure here. I have tried treating PyStreamCallback() as a second class in >>>> my python file, like so: >>>> >>>> class PyStreamCallback(StreamCallback) : >>>> def __init__(self): >>>> pass >>>> def process(self, inputStream, outputStream) : >>>> outputStream.write(Unicode(json.dumps(self.result['thisThing >>>> ']))) >>>> >>>> classUpdateAttributes(Processor) : >>>> def __init__(self) : >>>> self.result{} >>>> self.__rel_success = Relationship......etc etc >>>> . >>>> . >>>> . >>>> def onTrigger(self, context, sessionFactory): >>>> . >>>> . >>>> . >>>> self.result['thisThing'] = flowfile.getAttribute("s3.key") >>>> # this fails: flowfile = session.write(flowfile,PyStrea >>>> mCallback()) >>>> # this fails too: flowfile = session.write(self,flowfile,Py >>>> StreamCallback()) >>>> >>>> Am I mistaken to configure PyStreamCallback as a second >>>> independent class? Should it be a defined method within class >>>> UpdateAttributes() ? >>>> >>>> On Tue, May 2, 2017 at 12:45 PM, Matt Burgess <[email protected]> >>>> wrote: >>>> >>>>> Jim, you still can/should use something like a PyStreamCallback(). 
>>>>> ExecuteScript is basically an onTrigger() body, so you can use the same >>>>> approach inside your onTrigger() body in InvokeScriptedProcessor. Pass an >>>>> instance of your PyStreamCallback into something like: >>>>> >>>>> flowfile = session.write(flowfile, PyStreamCallback()) >>>>> >>>>> at some point before you transfer the flow file. If you need >>>>> variables/data from outside the PyStreamCallback() inside, you can pass >>>>> them into the constructor or (more dangerously) mess with the scope of the >>>>> variable(s). >>>>> >>>>> Regards, >>>>> Matt >>>>> >>>>> >>>>> On Tue, May 2, 2017 at 12:39 PM, James McMahon <[email protected]> >>>>> wrote: >>>>> >>>>>> This is an example found at https://github.com/apache/ >>>>>> nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi >>>>>> -scripting-processors/src/test/resources/jython/test_update_ >>>>>> attribute.py . It shows how to set up InvokeScriptedProcessor for >>>>>> python (jython) to add a simple attribute to a flowfile. >>>>>> >>>>>> How would I integrate in this structure a call to PyStreamCallback() >>>>>> that allows me to replace the *contents* of the flowfile? Typically >>>>>> PyStreamCallback() to accomplish this is its own class, but here it looks >>>>>> like I would need to make it a method in the UpdateAttributes() class. >>>>>> I've >>>>>> been unable to get that to work. >>>>>> >>>>>> Thank you in advance for any help. >>>>>> >>>>>> #! /usr/bin/python >>>>>> # >>>>>> # Licensed to the Apache Software Foundation (ASF) under one >>>>>> # or more contributor license agreements. See the NOTICE file >>>>>> # distributed with this work for additional information >>>>>> # regarding copyright ownership. The ASF licenses this file >>>>>> # to you under the Apache License, Version 2.0 (the >>>>>> # "License"); you may not use this file except in compliance >>>>>> # with the License. 
You may obtain a copy of the License at >>>>>> # >>>>>> # http://www.apache.org/licenses/LICENSE-2.0 >>>>>> # >>>>>> # Unless required by applicable law or agreed to in writing, >>>>>> # software distributed under the License is distributed on an >>>>>> # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY >>>>>> # KIND, either express or implied. See the License for the >>>>>> # specific language governing permissions and limitations >>>>>> # under the License. >>>>>> # >>>>>> import sys >>>>>> import traceback >>>>>> from org.apache.nifi.processor import Processor >>>>>> from org.apache.nifi.processor import Relationship >>>>>> from org.apache.nifi.components import PropertyDescriptor >>>>>> from org.apache.nifi.processor.util import StandardValidators >>>>>> class UpdateAttributes(Processor) : >>>>>> __rel_success = Relationship.Builder().description("Success").name(" >>>>>> success").build() >>>>>> def __init__(self) : >>>>>> pass >>>>>> def initialize(self, context) : >>>>>> pass >>>>>> def getRelationships(self) : >>>>>> return set([self.__rel_success]) >>>>>> def validate(self, context) : >>>>>> pass >>>>>> def getPropertyDescriptors(self) : >>>>>> descriptor = PropertyDescriptor.Builder().name("for-attributes" >>>>>> ).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build() >>>>>> return [descriptor] >>>>>> def onPropertyModified(self, descriptor, newValue, oldValue) : >>>>>> pass >>>>>> def onTrigger(self, context, sessionFactory) : >>>>>> session = sessionFactory.createSession() >>>>>> try : >>>>>> # ensure there is work to do >>>>>> flowfile = session.get() >>>>>> if flowfile is None : >>>>>> return >>>>>> # extract some attribute values >>>>>> fromPropertyValue = context.getProperty("for-attributes").getValue() >>>>>> fromAttributeValue = flowfile.getAttribute("for-attributes") >>>>>> # set an attribute >>>>>> flowfile = session.putAttribute(flowfile, "from-property", >>>>>> fromPropertyValue) >>>>>> flowfile = session.putAttribute(flowfile, 
"from-attribute", >>>>>> fromAttributeValue) >>>>>> # transfer >>>>>> session.transfer(flowfile, self.__rel_success) >>>>>> session.commit() >>>>>> except : >>>>>> print sys.exc_info()[0] >>>>>> print "Exception in TestReader:" >>>>>> print '-' * 60 >>>>>> traceback.print_exc(file=sys.stdout) >>>>>> print '-' * 60 >>>>>> session.rollback(true) >>>>>> raise >>>>>> processor = UpdateAttributes() >>>>>> >>>>> >>>>> >>>> >>>> >>> >> >
