I am not a Python expert, but if you set self.result["x"] in one class, can you reference it from a separate class? What is the exception you are getting?
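
For what it's worth, here is a minimal sketch of the constructor approach Matt describes in the quoted thread below. In the posted snippet, PyStreamCallback.process() reads self.result, but inside that class self is the callback instance, which never sets result, so I would expect an AttributeError there. Separately, self.result{} is not valid Python (self.result = {} is probably what was meant), and Unicode is not a builtin name (Jython 2.x spells it unicode). The try/except stub is my own assumption, added only so the sketch also runs outside NiFi; the 'thisThing' key comes from the posted code.

```python
import json

try:
    # Only importable inside NiFi's Jython script engine.
    from org.apache.nifi.processor.io import StreamCallback
except ImportError:
    # Stand-in base class (an assumption for illustration, not NiFi code)
    # so the sketch can be exercised under plain Python.
    class StreamCallback(object):
        pass


class PyStreamCallback(StreamCallback):
    def __init__(self, result):
        # The processor hands its data in explicitly; a separate class
        # cannot see the processor's own self.result.
        self.result = result

    def process(self, inputStream, outputStream):
        # Replace the flowfile content with the JSON-encoded value.
        text = json.dumps(self.result['thisThing'])
        outputStream.write(bytearray(text.encode('utf-8')))


# Inside UpdateAttributes.onTrigger(), after filling self.result:
#     self.result['thisThing'] = flowfile.getAttribute("s3.key")
#     flowfile = session.write(flowfile, PyStreamCallback(self.result))
```

Passing only the value that is needed (rather than the whole dictionary) would work just as well; the point is that the callback gets its data through its constructor instead of reaching into the processor.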

Andy LoPresto
[email protected]
[email protected]
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69

> On May 2, 2017, at 1:01 PM, James McMahon <[email protected]> wrote:
>
> Thanks for your reply Matt. I think this cuts to the heart of my failure
> here. I have tried treating PyStreamCallback() as a second class in my python
> file, like so:
>
> class PyStreamCallback(StreamCallback) :
>     def __init__(self):
>         pass
>     def process(self, inputStream, outputStream) :
>         outputStream.write(Unicode(json.dumps(self.result['thisThing'])))
>
> classUpdateAttributes(Processor) :
>     def __init__(self) :
>         self.result{}
>         self.__rel_success = Relationship......etc etc
>     .
>     .
>     .
>     def onTrigger(self, context, sessionFactory):
>         .
>         .
>         .
>         self.result['thisThing'] = flowfile.getAttribute("s3.key")
>         # this fails: flowfile = session.write(flowfile,PyStreamCallback())
>         # this fails too: flowfile = session.write(self,flowfile,PyStreamCallback())
>
> Am I mistaken to configure PyStreamCallback as a second independent class?
> Should it be a defined method within class UpdateAttributes() ?
>
> On Tue, May 2, 2017 at 12:45 PM, Matt Burgess <[email protected]> wrote:
> Jim, you still can/should use something like a PyStreamCallback().
> ExecuteScript is basically an onTrigger() body, so you can use the same
> approach inside your onTrigger() body in InvokeScriptedProcessor. Pass an
> instance of your PyStreamCallback into something like:
>
> flowfile = session.write(flowfile, PyStreamCallback())
>
> at some point before you transfer the flow file. If you need variables/data
> from outside the PyStreamCallback() inside, you can pass them into the
> constructor or (more dangerously) mess with the scope of the variable(s).
>
> Regards,
> Matt
>
> On Tue, May 2, 2017 at 12:39 PM, James McMahon <[email protected]> wrote:
> This is an example found at
> <https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py>.
> It shows how to set up InvokeScriptedProcessor for python (jython) to add
> a simple attribute to a flowfile.
>
> How would I integrate in this structure a call to PyStreamCallback() that
> allows me to replace the contents of the flowfile? Typically
> PyStreamCallback() to accomplish this is its own class, but here it looks
> like I would need to make it a method in the UpdateAttributes() class. I've
> been unable to get that to work.
>
> Thank you in advance for any help.
>
> #! /usr/bin/python
> #
> # Licensed to the Apache Software Foundation (ASF) under one
> # or more contributor license agreements. See the NOTICE file
> # distributed with this work for additional information
> # regarding copyright ownership. The ASF licenses this file
> # to you under the Apache License, Version 2.0 (the
> # "License"); you may not use this file except in compliance
> # with the License. You may obtain a copy of the License at
> #
> # http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing,
> # software distributed under the License is distributed on an
> # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> # KIND, either express or implied. See the License for the
> # specific language governing permissions and limitations
> # under the License.
> #
> import sys
> import traceback
> from org.apache.nifi.processor import Processor
> from org.apache.nifi.processor import Relationship
> from org.apache.nifi.components import PropertyDescriptor
> from org.apache.nifi.processor.util import StandardValidators
>
> class UpdateAttributes(Processor) :
>     __rel_success = Relationship.Builder().description("Success").name("success").build()
>
>     def __init__(self) :
>         pass
>
>     def initialize(self, context) :
>         pass
>
>     def getRelationships(self) :
>         return set([self.__rel_success])
>
>     def validate(self, context) :
>         pass
>
>     def getPropertyDescriptors(self) :
>         descriptor = PropertyDescriptor.Builder().name("for-attributes").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
>         return [descriptor]
>
>     def onPropertyModified(self, descriptor, newValue, oldValue) :
>         pass
>
>     def onTrigger(self, context, sessionFactory) :
>         session = sessionFactory.createSession()
>         try :
>             # ensure there is work to do
>             flowfile = session.get()
>             if flowfile is None :
>                 return
>             # extract some attribute values
>             fromPropertyValue = context.getProperty("for-attributes").getValue()
>             fromAttributeValue = flowfile.getAttribute("for-attributes")
>             # set an attribute
>             flowfile = session.putAttribute(flowfile, "from-property", fromPropertyValue)
>             flowfile = session.putAttribute(flowfile, "from-attribute", fromAttributeValue)
>             # transfer
>             session.transfer(flowfile, self.__rel_success)
>             session.commit()
>         except :
>             print sys.exc_info()[0]
>             print "Exception in TestReader:"
>             print '-' * 60
>             traceback.print_exc(file=sys.stdout)
>             print '-' * 60
>             session.rollback(true)
>             raise
>
> processor = UpdateAttributes()
