Nope, I sure can't. I thought about making a global dictionary results = {},
but I hate to make things global, and I still couldn't get the callback to
recognize self.result[]. So I then tried to pass result in to the callback,
but it would not recognize self.result['mykey']. It threw an error because
it did not recognize result. I will recreate that error and send it to you,
Andy.

I have not yet tried what Matt shows below: passing in the dictionary as
attrs. I plan to try that next and will report back.

On Tue, May 2, 2017 at 1:06 PM, Andy LoPresto <[email protected]> wrote:

> I am not a Python expert, but if you set “self.result[“x”]” in one class,
> can you reference it in a separate class? What is the exception you are
> getting?
>
> Andy LoPresto
> [email protected]
> *[email protected] <[email protected]>*
> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
>
> On May 2, 2017, at 1:01 PM, James McMahon <[email protected]> wrote:
>
> Thanks for your reply Matt. I think this cuts to the heart of my failure
> here. I have tried treating PyStreamCallback() as a second class in my
> python file, like so:
>
> class PyStreamCallback(StreamCallback) :
>     def __init__(self):
>         pass
>     def process(self, inputStream, outputStream) :
>         outputStream.write(Unicode(json.dumps(self.result['thisThing'])))
>
> class UpdateAttributes(Processor) :
>     def __init__(self) :
>         self.result{}
>         self.__rel_success = Relationship......etc etc
>         .
>         .
>         .
>     def onTrigger(self, context, sessionFactory):
>         .
>         .
>         .
>         self.result['thisThing'] = flowfile.getAttribute("s3.key")
>         # this fails: flowfile = session.write(flowfile, PyStreamCallback())
>         # this fails too: flowfile = session.write(self,flowfile, PyStreamCallback())
>
> Am I mistaken to configure PyStreamCallback as a second independent class?
> Should it be a defined method within class UpdateAttributes() ?
>
> On Tue, May 2, 2017 at 12:45 PM, Matt Burgess <[email protected]>
> wrote:
>
>> Jim, you still can/should use something like a PyStreamCallback().
>> ExecuteScript is basically an onTrigger() body, so you can use the same
>> approach inside your onTrigger() body in InvokeScriptedProcessor. Pass an
>> instance of your PyStreamCallback into something like:
>>
>> flowfile = session.write(flowfile, PyStreamCallback())
>>
>> at some point before you transfer the flow file. If you need
>> variables/data from outside the PyStreamCallback() inside, you can pass
>> them into the constructor or (more dangerously) mess with the scope of the
>> variable(s).
>>
>> Regards,
>> Matt
>>
>>
>> On Tue, May 2, 2017 at 12:39 PM, James McMahon <[email protected]>
>> wrote:
>>
>>> This is an example found at
>>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py
>>> . It shows how to set up InvokeScriptedProcessor for python (jython) to add
>>> a simple attribute to a flowfile.
>>>
>>> How would I integrate in this structure a call to PyStreamCallback()
>>> that allows me to replace the *contents* of the flowfile? Typically
>>> PyStreamCallback() to accomplish this is its own class, but here it looks
>>> like I would need to make it a method in the UpdateAttributes() class. I've
>>> been unable to get that to work.
>>>
>>> Thank you in advance for any help.
>>>
>>> #! /usr/bin/python
>>> #
>>> # Licensed to the Apache Software Foundation (ASF) under one
>>> # or more contributor license agreements. See the NOTICE file
>>> # distributed with this work for additional information
>>> # regarding copyright ownership. The ASF licenses this file
>>> # to you under the Apache License, Version 2.0 (the
>>> # "License"); you may not use this file except in compliance
>>> # with the License. You may obtain a copy of the License at
>>> #
>>> # http://www.apache.org/licenses/LICENSE-2.0
>>> #
>>> # Unless required by applicable law or agreed to in writing,
>>> # software distributed under the License is distributed on an
>>> # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>> # KIND, either express or implied. See the License for the
>>> # specific language governing permissions and limitations
>>> # under the License.
>>> #
>>> import sys
>>> import traceback
>>> from org.apache.nifi.processor import Processor
>>> from org.apache.nifi.processor import Relationship
>>> from org.apache.nifi.components import PropertyDescriptor
>>> from org.apache.nifi.processor.util import StandardValidators
>>> class UpdateAttributes(Processor) :
>>>     __rel_success = Relationship.Builder().description("Success").name("success").build()
>>>     def __init__(self) :
>>>         pass
>>>     def initialize(self, context) :
>>>         pass
>>>     def getRelationships(self) :
>>>         return set([self.__rel_success])
>>>     def validate(self, context) :
>>>         pass
>>>     def getPropertyDescriptors(self) :
>>>         descriptor = PropertyDescriptor.Builder().name("for-attributes").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
>>>         return [descriptor]
>>>     def onPropertyModified(self, descriptor, newValue, oldValue) :
>>>         pass
>>>     def onTrigger(self, context, sessionFactory) :
>>>         session = sessionFactory.createSession()
>>>         try :
>>>             # ensure there is work to do
>>>             flowfile = session.get()
>>>             if flowfile is None :
>>>                 return
>>>             # extract some attribute values
>>>             fromPropertyValue = context.getProperty("for-attributes").getValue()
>>>             fromAttributeValue = flowfile.getAttribute("for-attributes")
>>>             # set an attribute
>>>             flowfile = session.putAttribute(flowfile, "from-property", fromPropertyValue)
>>>             flowfile = session.putAttribute(flowfile, "from-attribute", fromAttributeValue)
>>>             # transfer
>>>             session.transfer(flowfile, self.__rel_success)
>>>             session.commit()
>>>         except :
>>>             print sys.exc_info()[0]
>>>             print "Exception in TestReader:"
>>>             print '-' * 60
>>>             traceback.print_exc(file=sys.stdout)
>>>             print '-' * 60
>>>             session.rollback(true)
>>>             raise
>>> processor = UpdateAttributes()
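
For reference, here is a minimal sketch of the constructor approach Matt describes in the quoted thread. It is an illustration under stated assumptions, not a tested NiFi script. The StreamCallback base class and the write_with_callback() helper are hypothetical stand-ins so the snippet runs outside NiFi. In a real Jython script the base class would come from org.apache.nifi.processor.io, and the call would look something like session.write(flowfile, PyStreamCallback(self.result)). The key 'thisThing' and the sample value are placeholders.

```python
import io
import json


class StreamCallback(object):
    """Stand-in (assumption) for org.apache.nifi.processor.io.StreamCallback."""

    def process(self, inputStream, outputStream):
        raise NotImplementedError


class PyStreamCallback(StreamCallback):
    def __init__(self, result):
        # Store the dictionary handed in by the caller; the callback has its
        # own "self", so it cannot see the processor's self.result otherwise.
        self.result = result

    def process(self, inputStream, outputStream):
        # Replace the content with the JSON form of one dictionary entry.
        text = json.dumps(self.result['thisThing'])
        outputStream.write(bytearray(text.encode('utf-8')))


def write_with_callback(callback):
    """Hypothetical stand-in for session.write(flowfile, callback)."""
    inputStream = io.BytesIO(b'old content')
    outputStream = io.BytesIO()
    callback.process(inputStream, outputStream)
    return outputStream.getvalue()


# Inside onTrigger() this dictionary would be self.result.
result = {'thisThing': 'some/s3/key.txt'}
new_content = write_with_callback(PyStreamCallback(result))
```

The point of the sketch is only the scoping: the callback is a separate object, so it sees the dictionary because the constructor stored a reference to it, not because it shares anything with the processor instance.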
