I am not a Python expert, but if you set “self.result[“x”]” in one class, can 
you reference it in a separate class? What is the exception you are getting?

Andy LoPresto
[email protected]
[email protected]
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69

> On May 2, 2017, at 1:01 PM, James McMahon <[email protected]> wrote:
> 
> Thanks for your reply Matt. I think this cuts to the heart of my failure 
> here. I have tried treating PyStreamCallback() as a second class in my python 
> file, like so:
> 
> class PyStreamCallback(StreamCallback) :
>      def __init__(self):
>           pass
>      def process(self, inputStream, outputStream) :
>           outputStream.write(Unicode(json.dumps(self.result['thisThing'])))
> 
> classUpdateAttributes(Processor) :
>      def __init__(self) :
>           self.result{}
>           self.__rel_success = Relationship......etc etc
> .
> .
> .
>      def onTrigger(self, context, sessionFactory):
> .
> .
> .
>           self.result['thisThing'] = flowfile.getAttribute("s3.key")
>           # this fails:    flowfile = 
> session.write(flowfile,PyStreamCallback())
>           # this fails too:  flowfile = 
> session.write(self,flowfile,PyStreamCallback())
> 
> Am I mistaken to configure PyStreamCallback as a second independent class? 
> Should it be a defined method within class UpdateAttributes() ?
> 
> On Tue, May 2, 2017 at 12:45 PM, Matt Burgess <[email protected] 
> <mailto:[email protected]>> wrote:
> Jim, you still can/should use something like a PyStreamCallback().  
> ExecuteScript is basically an onTrigger() body, so you can use the same 
> approach inside your onTrigger() body in InvokeScriptedProcessor.  Pass an 
> instance of your PyStreamCallback into something like:
> 
> flowfile = session.write(flowfile, PyStreamCallback())
> 
> at some point before you transfer the flow file. If you need variables/data 
> from outside the PyStreamCallback() inside, you can pass them into the 
> constructor or (more dangerously) mess with the scope of the variable(s).
> 
> Regards,
> Matt
> 
> 
> On Tue, May 2, 2017 at 12:39 PM, James McMahon <[email protected] 
> <mailto:[email protected]>> wrote:
> This is an example found at  
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py
>  
> <https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py>
>   . It shows how to set up InvokeScriptedProcessor for python (jython) to add 
> a simple attribute to a flowfile.
> 
> How would I integrate in this structure a call to PyStreamCallback() that 
> allows me to replace the contents of the flowfile? Typically 
> PyStreamCallback() to accomplish this is its own class, but here it looks 
> like I would need to make it a method in the UpdateAttributes() class. I've 
> been unable to get that to work.
> 
> Thank you in advance for any help.
> 
> #! /usr/bin/python
> #
> # Licensed to the Apache Software Foundation (ASF) under one
> # or more contributor license agreements. See the NOTICE file
> # distributed with this work for additional information
> # regarding copyright ownership. The ASF licenses this file
> # to you under the Apache License, Version 2.0 (the
> # "License"); you may not use this file except in compliance
> # with the License. You may obtain a copy of the License at
> #
> # http://www.apache.org/licenses/LICENSE-2.0 
> <http://www.apache.org/licenses/LICENSE-2.0>
> #
> # Unless required by applicable law or agreed to in writing,
> # software distributed under the License is distributed on an
> # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> # KIND, either express or implied. See the License for the
> # specific language governing permissions and limitations
> # under the License.
> #
> import sys
> import traceback
> from org.apache.nifi.processor import Processor
> from org.apache.nifi.processor import Relationship
> from org.apache.nifi.components import PropertyDescriptor
> from org.apache.nifi.processor.util import StandardValidators
> class UpdateAttributes(Processor) :
> __rel_success = 
> Relationship.Builder().description("Success").name("success").build()
> def __init__(self) :
> pass
> def initialize(self, context) :
> pass
> def getRelationships(self) :
> return set([self.__rel_success])
> def validate(self, context) :
> pass
> def getPropertyDescriptors(self) :
> descriptor = 
> PropertyDescriptor.Builder().name("for-attributes").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
> return [descriptor]
> def onPropertyModified(self, descriptor, newValue, oldValue) :
> pass
> def onTrigger(self, context, sessionFactory) :
> session = sessionFactory.createSession()
> try :
> # ensure there is work to do
> flowfile = session.get()
> if flowfile is None :
> return
> # extract some attribute values
> fromPropertyValue = context.getProperty("for-attributes").getValue()
> fromAttributeValue = flowfile.getAttribute("for-attributes")
> # set an attribute
> flowfile = session.putAttribute(flowfile, "from-property", fromPropertyValue)
> flowfile = session.putAttribute(flowfile, "from-attribute", 
> fromAttributeValue)
> # transfer
> session.transfer(flowfile, self.__rel_success)
> session.commit()
> except :
> print sys.exc_info()[0]
> print "Exception in TestReader:"
> print '-' * 60
> traceback.print_exc(file=sys.stdout)
> print '-' * 60
> session.rollback(true)
> raise
> processor = UpdateAttributes()
> 
> 

Attachment: signature.asc
Description: Message signed with OpenPGP using GPGMail

Reply via email to