Thanks for your reply, Matt. I think this cuts to the heart of my failure
here. I have tried defining PyStreamCallback as a second class in my
Python file, like so:

class PyStreamCallback(StreamCallback) :
     def __init__(self):
          pass
     def process(self, inputStream, outputStream) :
          outputStream.write(unicode(json.dumps(self.result['thisThing'])))

class UpdateAttributes(Processor) :
     def __init__(self) :
          self.result = {}
          self.__rel_success = Relationship......etc etc
.
.
.
     def onTrigger(self, context, sessionFactory):
.
.
.
          self.result['thisThing'] = flowfile.getAttribute("s3.key")
          # this fails:      flowfile = session.write(flowfile, PyStreamCallback())
          # this fails too:  flowfile = session.write(self, flowfile, PyStreamCallback())

Am I mistaken to define PyStreamCallback as a second, independent class?
Should it instead be defined as a method within the UpdateAttributes class?
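
For reference, here is a minimal sketch of the constructor approach Matt
describes in the quoted reply below. It keeps PyStreamCallback as its own
class and hands it the value it needs, since the callback's "self" is not
the processor's "self" and so cannot see self.result. The fallback
StreamCallback stub and the FakeOutputStream class are my own assumptions,
added only so the sketch can be exercised outside NiFi. I have not run
this inside InvokeScriptedProcessor.

```python
import json

try:
    # Only importable inside NiFi's Jython engine.
    from org.apache.nifi.processor.io import StreamCallback
except ImportError:
    # Stand-in base class for running outside NiFi (assumption, not NiFi API).
    class StreamCallback(object):
        pass


class PyStreamCallback(StreamCallback):
    """Replaces the flowfile content with a JSON rendering of one value."""

    def __init__(self, value):
        # The data arrives through the constructor, because this class
        # has no access to the processor instance or its attributes.
        self.value = value

    def process(self, inputStream, outputStream):
        text = json.dumps(self.value)
        # Write bytes, not a str/unicode object, to the output stream.
        outputStream.write(bytearray(text.encode('utf-8')))


class FakeOutputStream(object):
    """Hypothetical test double that records whatever is written to it."""

    def __init__(self):
        self.data = bytearray()

    def write(self, chunk):
        self.data.extend(chunk)


if __name__ == "__main__":
    out = FakeOutputStream()
    PyStreamCallback("some/s3/key").process(None, out)
    print(out.data.decode('utf-8'))
```

Inside onTrigger(), the call would then presumably look like
flowfile = session.write(flowfile, PyStreamCallback(flowfile.getAttribute("s3.key")))
placed before session.transfer(). Note that session.write is called on the
session object, so "self" is not passed as an argument.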

On Tue, May 2, 2017 at 12:45 PM, Matt Burgess <[email protected]> wrote:

> Jim, you still can/should use something like a PyStreamCallback().
> ExecuteScript is basically an onTrigger() body, so you can use the same
> approach inside your onTrigger() body in InvokeScriptedProcessor.  Pass an
> instance of your PyStreamCallback into something like:
>
> flowfile = session.write(flowfile, PyStreamCallback())
>
> at some point before you transfer the flow file. If you need
> variables/data from outside the PyStreamCallback() inside, you can pass
> them into the constructor or (more dangerously) mess with the scope of the
> variable(s).
>
> Regards,
> Matt
>
>
> On Tue, May 2, 2017 at 12:39 PM, James McMahon <[email protected]>
> wrote:
>
>> This is an example found at:
>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py
>> It shows how to set up InvokeScriptedProcessor for Python (Jython) to add
>> a simple attribute to a flowfile.
>>
>> How would I integrate into this structure a call to PyStreamCallback() that
>> allows me to replace the *contents* of the flowfile? Typically the
>> PyStreamCallback that accomplishes this is its own class, but here it looks
>> like I would need to make it a method in the UpdateAttributes() class. I've
>> been unable to get that to work.
>>
>> Thank you in advance for any help.
>>
>> #! /usr/bin/python
>> #
>> # Licensed to the Apache Software Foundation (ASF) under one
>> # or more contributor license agreements. See the NOTICE file
>> # distributed with this work for additional information
>> # regarding copyright ownership. The ASF licenses this file
>> # to you under the Apache License, Version 2.0 (the
>> # "License"); you may not use this file except in compliance
>> # with the License. You may obtain a copy of the License at
>> #
>> # http://www.apache.org/licenses/LICENSE-2.0
>> #
>> # Unless required by applicable law or agreed to in writing,
>> # software distributed under the License is distributed on an
>> # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> # KIND, either express or implied. See the License for the
>> # specific language governing permissions and limitations
>> # under the License.
>> #
>> import sys
>> import traceback
>> from org.apache.nifi.processor import Processor
>> from org.apache.nifi.processor import Relationship
>> from org.apache.nifi.components import PropertyDescriptor
>> from org.apache.nifi.processor.util import StandardValidators
>> class UpdateAttributes(Processor) :
>>     __rel_success = Relationship.Builder().description("Success").name("success").build()
>>     def __init__(self) :
>>         pass
>>     def initialize(self, context) :
>>         pass
>>     def getRelationships(self) :
>>         return set([self.__rel_success])
>>     def validate(self, context) :
>>         pass
>>     def getPropertyDescriptors(self) :
>>         descriptor = PropertyDescriptor.Builder().name("for-attributes").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
>>         return [descriptor]
>>     def onPropertyModified(self, descriptor, newValue, oldValue) :
>>         pass
>>     def onTrigger(self, context, sessionFactory) :
>>         session = sessionFactory.createSession()
>>         try :
>>             # ensure there is work to do
>>             flowfile = session.get()
>>             if flowfile is None :
>>                 return
>>             # extract some attribute values
>>             fromPropertyValue = context.getProperty("for-attributes").getValue()
>>             fromAttributeValue = flowfile.getAttribute("for-attributes")
>>             # set an attribute
>>             flowfile = session.putAttribute(flowfile, "from-property", fromPropertyValue)
>>             flowfile = session.putAttribute(flowfile, "from-attribute", fromAttributeValue)
>>             # transfer
>>             session.transfer(flowfile, self.__rel_success)
>>             session.commit()
>>         except :
>>             print sys.exc_info()[0]
>>             print "Exception in TestReader:"
>>             print '-' * 60
>>             traceback.print_exc(file=sys.stdout)
>>             print '-' * 60
>>             # Python's boolean literal is True; lowercase true would raise NameError
>>             session.rollback(True)
>>             raise
>> processor = UpdateAttributes()
>>
>
>
