I've not yet tried that but will today, and will report back on my results.
If it worked for you I am optimistic I can get it to work on my end too.
Thank you very much, Matt. I have a few other workflow improvements to make
for customers this morning, and then I will get back to you on this.    -Jim

On Tue, May 9, 2017 at 3:00 PM, Matt Burgess <[email protected]> wrote:

> Jim,
>
> Not sure my previous email went through, did you try self.attrs['ABCDE']
> rather than self.getAttribute('ABCDE') in your PyStreamCallback.process
> method?
>
> I tried the following and it seems to work fine (added a logger for my own
> debugging purposes):
>
> import json
> from org.apache.nifi.processor.io import StreamCallback
> from org.apache.nifi.processor import Processor
> from org.apache.nifi.processor import Relationship
> from org.apache.nifi.components import PropertyDescriptor
> from org.slf4j import Logger
> from org.slf4j import LoggerFactory
>
> class PyStreamCallback(StreamCallback) :
>   def __init__(self, attrs):
>     self.attrs = attrs
>   def process(self,inputStream, outputStream):
>     outputStream.write(unicode(json.dumps(self.attrs['ABCDE'])))
>
> class UpdateAttributes(Processor) :
>   def __init__(self) :
>     self.result = {}
>     self.__rel_success = Relationship.Builder().name("
> success").description("Success").build()
>     self.__logger = LoggerFactory.getLogger(self.getClass())
>
>   def initialize(self, context) :
>     pass
>
>   def getRelationships(self) :
>     return set([self.__rel_success])
>
>   def validate(self, context) :
>     pass
>
>   def getPropertyDescriptors(self) :
>     return []
>
>   def onPropertyModified(self, descriptor, newValue, oldValue) :
>     pass
>
>   def onTrigger(self, context, sessionFactory):
>     session = sessionFactory.createSession()
>     try :
>       flowfiles = session.get(20)
>       for flowfile in flowfiles :
>         if flowfile is None :
>           return
>
>         flowfile = session.putAttribute(flowfile, "ABCDE",
> "LARRY_CURLEY_MOE")
>         flowfile = session.write(flowfile,PyStreamCallback(flowfile.
> getAttributes()))
>         session.transfer(flowfile, self.__rel_success)
>         session.commit()
>     except Exception, e:
>       self.__logger.error(str(e), e)
>       session.rollback()
>       raise
>
> processor = UpdateAttributes()
>
>
> Regards,
> Matt
>
> On Fri, May 5, 2017 at 2:42 PM, James McMahon <[email protected]>
> wrote:
>
>> Matt, here is what I have tried. No error is thrown, but I can't
>> reference attribute ABCDE in the callback function. What am I
>> misunderstanding?  -Jim
>>
>> class PyStreamCallback(StreamCallback) :
>>      def __init__(self, attrs):
>>           self.attrs = attrs
>>      def process(self,inputStream, outputStream):
>>           outputStream.write(Unicode(json.dumps(self.getAttribute('ABC
>> DE')))
>>
>> class UpdateAttributes(Processor) :
>>
>>      def __init__(self) :
>>           self.result = {}
>>           self.__rel_success = Relationship.Builder().name("s
>> uccess".description("Success").build()
>>
>>      ( I def an initialize, getRelationships, validate,
>> getPropertyDescriptor, getPropertyDescriptors, and onPropertyModified)
>>
>>      def onTrigger(self, context, sessionFactory):
>>           session = sessionFactory.createSession()
>>           try :
>>                flowfiles = session.get(20)
>>                     for flowfile in flowfiles :
>>                          if flowfile is none :
>>                               return
>>
>>                          flowfile = session.putAttribute(flowfile,
>> "ABCDE", "LARRY_CURLEY_MOE")
>>
>>                          flowfile = session.write(flowfile,PyStrea
>> mCallback(PySet(flowfile.getAttributes())))
>>
>>                          session.transfer(flowfile, self.__rel_success)
>>                  session.commit()
>>           except:
>>                 session.rollback()
>>                 raise
>>
>> processor = UpdateAttributes()
>>
>> On Tue, May 2, 2017 at 1:26 PM, Matt Burgess <[email protected]>
>> wrote:
>>
>>> If you want to use attributes inside the callback, I recommend building
>>> a dictionary from flowfile.getAttributes() and passing that into the
>>> PyStreamCallback constructor:
>>>
>>> class PyStreamCallback(StreamCallback):
>>>     def __init__(self, attrs):
>>>         self.attrs = attrs
>>>
>>>
>>> # ... later on ...
>>> session.write(flowfile, PyStreamCallback(PySet(flowfil
>>> e.getAttributes())))
>>>
>>>  Or something like that.  I thought I'd seen an example somewhere but I
>>> can't find it.
>>>
>>> Regards,
>>> Matt
>>>
>>> On Tue, May 2, 2017 at 1:06 PM, Andy LoPresto <[email protected]>
>>> wrote:
>>>
>>>> I am not a Python expert, but if you set “self.result[“x”]” in one
>>>> class, can you reference it in a separate class? What is the exception you
>>>> are getting?
>>>>
>>>> Andy LoPresto
>>>> [email protected]
>>>> *[email protected] <[email protected]>*
>>>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>>>>
>>>> On May 2, 2017, at 1:01 PM, James McMahon <[email protected]> wrote:
>>>>
>>>> Thanks for your reply Matt. I think this cuts to the heart of my
>>>> failure here. I have tried treating PyStreamCallback() as a second class in
>>>> my python file, like so:
>>>>
>>>> class PyStreamCallback(StreamCallback) :
>>>>      def __init__(self):
>>>>           pass
>>>>      def process(self, inputStream, outputStream) :
>>>>           outputStream.write(Unicode(json.dumps(self.result['thisThing
>>>> '])))
>>>>
>>>> classUpdateAttributes(Processor) :
>>>>      def __init__(self) :
>>>>           self.result{}
>>>>           self.__rel_success = Relationship......etc etc
>>>> .
>>>> .
>>>> .
>>>>      def onTrigger(self, context, sessionFactory):
>>>> .
>>>> .
>>>> .
>>>>           self.result['thisThing'] = flowfile.getAttribute("s3.key")
>>>>           # this fails:    flowfile = session.write(flowfile,PyStrea
>>>> mCallback())
>>>>           # this fails too:  flowfile = session.write(self,flowfile,Py
>>>> StreamCallback())
>>>>
>>>> Am I mistaken to configure PyStreamCallback as a second
>>>> independent class? Should it be a defined method within class
>>>> UpdateAttributes() ?
>>>>
>>>> On Tue, May 2, 2017 at 12:45 PM, Matt Burgess <[email protected]>
>>>> wrote:
>>>>
>>>>> Jim, you still can/should use something like a PyStreamCallback().
>>>>> ExecuteScript is basically an onTrigger() body, so you can use the same
>>>>> approach inside your onTrigger() body in InvokeScriptedProcessor.  Pass an
>>>>> instance of your PyStreamCallback into something like:
>>>>>
>>>>> flowfile = session.write(flowfile, PyStreamCallback())
>>>>>
>>>>> at some point before you transfer the flow file. If you need
>>>>> variables/data from outside the PyStreamCallback() inside, you can pass
>>>>> them into the constructor or (more dangerously) mess with the scope of the
>>>>> variable(s).
>>>>>
>>>>> Regards,
>>>>> Matt
>>>>>
>>>>>
>>>>> On Tue, May 2, 2017 at 12:39 PM, James McMahon <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> This is an example found at the following URL:
>>>>>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_update_attribute.py
>>>>>> It shows how to set up InvokeScriptedProcessor for Python (Jython) to
>>>>>> add a simple attribute to a flowfile.
>>>>>>
>>>>>> How would I integrate into this structure a call to PyStreamCallback()
>>>>>> that allows me to replace the *contents* of the flowfile? Typically the
>>>>>> PyStreamCallback() used to accomplish this is its own class, but here it
>>>>>> looks like I would need to make it a method in the UpdateAttributes()
>>>>>> class. I've been unable to get that to work.
>>>>>>
>>>>>> Thank you in advance for any help.
>>>>>>
>>>>>> #! /usr/bin/python
>>>>>> #
>>>>>> # Licensed to the Apache Software Foundation (ASF) under one
>>>>>> # or more contributor license agreements. See the NOTICE file
>>>>>> # distributed with this work for additional information
>>>>>> # regarding copyright ownership. The ASF licenses this file
>>>>>> # to you under the Apache License, Version 2.0 (the
>>>>>> # "License"); you may not use this file except in compliance
>>>>>> # with the License. You may obtain a copy of the License at
>>>>>> #
>>>>>> # http://www.apache.org/licenses/LICENSE-2.0
>>>>>> #
>>>>>> # Unless required by applicable law or agreed to in writing,
>>>>>> # software distributed under the License is distributed on an
>>>>>> # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>>>>> # KIND, either express or implied. See the License for the
>>>>>> # specific language governing permissions and limitations
>>>>>> # under the License.
>>>>>> #
>>>>>> import sys
>>>>>> import traceback
>>>>>> from org.apache.nifi.processor import Processor
>>>>>> from org.apache.nifi.processor import Relationship
>>>>>> from org.apache.nifi.components import PropertyDescriptor
>>>>>> from org.apache.nifi.processor.util import StandardValidators
>>>>>> class UpdateAttributes(Processor) :
>>>>>> __rel_success = Relationship.Builder().description("Success").name("
>>>>>> success").build()
>>>>>> def __init__(self) :
>>>>>> pass
>>>>>> def initialize(self, context) :
>>>>>> pass
>>>>>> def getRelationships(self) :
>>>>>> return set([self.__rel_success])
>>>>>> def validate(self, context) :
>>>>>> pass
>>>>>> def getPropertyDescriptors(self) :
>>>>>> descriptor = PropertyDescriptor.Builder().name("for-attributes"
>>>>>> ).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
>>>>>> return [descriptor]
>>>>>> def onPropertyModified(self, descriptor, newValue, oldValue) :
>>>>>> pass
>>>>>> def onTrigger(self, context, sessionFactory) :
>>>>>> session = sessionFactory.createSession()
>>>>>> try :
>>>>>> # ensure there is work to do
>>>>>> flowfile = session.get()
>>>>>> if flowfile is None :
>>>>>> return
>>>>>> # extract some attribute values
>>>>>> fromPropertyValue = context.getProperty("for-attributes").getValue()
>>>>>> fromAttributeValue = flowfile.getAttribute("for-attributes")
>>>>>> # set an attribute
>>>>>> flowfile = session.putAttribute(flowfile, "from-property",
>>>>>> fromPropertyValue)
>>>>>> flowfile = session.putAttribute(flowfile, "from-attribute",
>>>>>> fromAttributeValue)
>>>>>> # transfer
>>>>>> session.transfer(flowfile, self.__rel_success)
>>>>>> session.commit()
>>>>>> except :
>>>>>> print sys.exc_info()[0]
>>>>>> print "Exception in TestReader:"
>>>>>> print '-' * 60
>>>>>> traceback.print_exc(file=sys.stdout)
>>>>>> print '-' * 60
>>>>>> session.rollback(true)
>>>>>> raise
>>>>>> processor = UpdateAttributes()
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to