Jim, you still can/should use something like a PyStreamCallback().
ExecuteScript is basically an onTrigger() body, so you can use the same
approach inside your onTrigger() body in InvokeScriptedProcessor.  Pass an
instance of your PyStreamCallback into something like:

flowfile = session.write(flowfile, PyStreamCallback())

at some point before you transfer the flow file. If you need variables/data
from outside the PyStreamCallback() inside, you can pass them into the
constructor or (more dangerously) mess with the scope of the variable(s).

Regards,
Matt


On Tue, May 2, 2017 at 12:39 PM, James McMahon <[email protected]> wrote:

> This is an example found at  https://github.com/apache/
> nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/
> nifi-scripting-processors/src/test/resources/jython/test_
> update_attribute.py  . It shows how to set up InvokeScriptedProcessor for
> python (jython) to add a simple attribute to a flowfile.
>
> How would I integrate in this structure a call to PyStreamCallback() that
> allows me to replace the *contents* of the flowfile? Typically
> PyStreamCallback() to accomplish this is its own class, but here it looks
> like I would need to make it a method in the UpdateAttributes() class. I've
> been unable to get that to work.
>
> Thank you in advance for any help.
>
> #! /usr/bin/python
> #
> # Licensed to the Apache Software Foundation (ASF) under one
> # or more contributor license agreements. See the NOTICE file
> # distributed with this work for additional information
> # regarding copyright ownership. The ASF licenses this file
> # to you under the Apache License, Version 2.0 (the
> # "License"); you may not use this file except in compliance
> # with the License. You may obtain a copy of the License at
> #
> # http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing,
> # software distributed under the License is distributed on an
> # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> # KIND, either express or implied. See the License for the
> # specific language governing permissions and limitations
> # under the License.
> #
> import sys
> import traceback
> from org.apache.nifi.processor import Processor
> from org.apache.nifi.processor import Relationship
> from org.apache.nifi.components import PropertyDescriptor
> from org.apache.nifi.processor.util import StandardValidators
> class UpdateAttributes(Processor) :
> __rel_success = Relationship.Builder().description("Success").name("s
> uccess").build()
> def __init__(self) :
> pass
> def initialize(self, context) :
> pass
> def getRelationships(self) :
> return set([self.__rel_success])
> def validate(self, context) :
> pass
> def getPropertyDescriptors(self) :
> descriptor = PropertyDescriptor.Builder().name("for-attributes").
> addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
> return [descriptor]
> def onPropertyModified(self, descriptor, newValue, oldValue) :
> pass
> def onTrigger(self, context, sessionFactory) :
> session = sessionFactory.createSession()
> try :
> # ensure there is work to do
> flowfile = session.get()
> if flowfile is None :
> return
> # extract some attribute values
> fromPropertyValue = context.getProperty("for-attributes").getValue()
> fromAttributeValue = flowfile.getAttribute("for-attributes")
> # set an attribute
> flowfile = session.putAttribute(flowfile, "from-property",
> fromPropertyValue)
> flowfile = session.putAttribute(flowfile, "from-attribute",
> fromAttributeValue)
> # transfer
> session.transfer(flowfile, self.__rel_success)
> session.commit()
> except :
> print sys.exc_info()[0]
> print "Exception in TestReader:"
> print '-' * 60
> traceback.print_exc(file=sys.stdout)
> print '-' * 60
> session.rollback(true)
> raise
> processor = UpdateAttributes()
>

Reply via email to