Scott, you don't seem to use the onTrigger() method in a processor class of
an InvokeScriptedProcessor (ISP) script. Am I correct in assuming that you
are doing this in an ExecuteScript processor?
Have you incorporated a Python Stream Callback as a second class in a
Python script that already has a processor class for ISP, where the
onTrigger() method of the ISP processor class calls the Python Stream
Callback class? That is what I cannot get to work.  -Jim

On Mon, May 8, 2017 at 11:09 AM, Scott Wagner <[email protected]>
wrote:

> James,
>
>     Please find below a script that takes all of the attributes from a
> FlowFile, iterates over the content of the input FlowFile line by line,
> calls Python's format() method on each line, and replaces the content
> with the output.  The use case is that I can make a FlowFile whose
> content contains references to attribute names, and this processor will
> replace those references with the actual attribute values using Python's
> native dictionary formatting:
>
> import sys
> import traceback
> from java.nio.charset import StandardCharsets
> from org.apache.commons.io import IOUtils
> from org.apache.nifi.processor.io import StreamCallback
> from org.python.core import PyFile
> from org.python.core.util import FileUtil, StringUtil
>
> class TransformCallback(StreamCallback):
>     def __init__(self, flowFile):
>         self.flowFile = flowFile
>
>     def process(self, inputStream, outputStream):
>         try:
>             attrs = self.flowFile.getAttributes()
>             pf = FileUtil().wrap(inputStream)
>             output = []
>             for line in pf.readlines():
>                 line = line.format(**attrs).rstrip('\n')
>                 output.append(line)
>             outputStream.write('\n'.join(output))
>         except:
>             traceback.print_exc(file=sys.stdout)
>             raise
>
>
> flowFiles = session.get(10)
> for flowFile in flowFiles:
>     if flowFile is None:
>         continue
>     flowFile = session.write(flowFile, TransformCallback(flowFile))
>     session.transfer(flowFile, REL_SUCCESS)
>
> My real-world usage of this is to keep a file on disk containing a set of
> SQL queries that need to be executed, reference the attribute names in
> that flat file, and then run it through this processor to do the string
> replacements for me.  Here's a sample of what the input would look like:
>
> DROP TABLE IF EXISTS {output_table_name}
> ;
> CREATE TABLE {output_table_name} AS
>   SELECT
>     am.some_id AS {col_some_id},
>     am.some_other_id AS {col_some_other_id},
>     input.*
>   FROM
>     {table_name} input
>     LEFT OUTER JOIN some_other_table sot
>       ON sot.data_hash = f_hash_data(
>         substring(input.{col_data_element_1}, 1, 70),
>         substring(input.{col_data_element_2}, 1, 32),
>         substring(input.{col_data_element_3}, 1, 2),
>         input.{col_data_element_4},
>         input.{col_data_element_5})
>
> When a FlowFile is created, I just make sure that the attributes that I
> need are populated correctly and then the SQL script will have the
> references to the correct column names in it.
>
> Hope that helps,
> Scott
>
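
The substitution step Scott describes can be tried outside NiFi. The following is a minimal sketch in plain Python, assuming the attributes are available as an ordinary dict. The `render` helper and the sample attribute values are hypothetical, made up for illustration only.

```python
# Hypothetical helper: fill {name} placeholders from a dict of attributes,
# one line at a time, using Python's str.format().
def render(template, attrs):
    lines = template.split('\n')
    return '\n'.join(line.format(**attrs) for line in lines)


# Sample values are invented for illustration only.
attrs = {'output_table_name': 'results', 'table_name': 'staging'}
template = ('DROP TABLE IF EXISTS {output_table_name}\n'
            ';\n'
            'SELECT * FROM {table_name} input')

print(render(template, attrs))
```

A placeholder with no matching key raises KeyError, and any literal braces in the SQL would have to be doubled ({{ and }}) to survive format().
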
> James McMahon <[email protected]>
> Friday, May 5, 2017 1:42 PM
> Matt, here is what I have tried. No error is thrown, but I can't reference
> attribute ABCDE in the callback function. What am I misunderstanding?  -Jim
>
> class PyStreamCallback(StreamCallback) :
>      def __init__(self, attrs):
>           self.attrs = attrs
>      def process(self,inputStream, outputStream):
>           outputStream.write(Unicode(json.dumps(self.getAttribute('ABCDE'))))
>
> class UpdateAttributes(Processor) :
>
>      def __init__(self) :
>           self.result = {}
>           self.__rel_success = Relationship.Builder().name("success").description("Success").build()
>
>      (I also define initialize, getRelationships, validate,
>      getPropertyDescriptor, getPropertyDescriptors, and onPropertyModified.)
>
>      def onTrigger(self, context, sessionFactory):
>           session = sessionFactory.createSession()
>           try:
>                flowfiles = session.get(20)
>                for flowfile in flowfiles:
>                     if flowfile is None:
>                          return
>
>                     flowfile = session.putAttribute(flowfile, "ABCDE", "LARRY_CURLEY_MOE")
>
>                     flowfile = session.write(flowfile, PyStreamCallback(PySet(flowfile.getAttributes())))
>
>                     session.transfer(flowfile, self.__rel_success)
>                session.commit()
>           except:
>                session.rollback()
>                raise
>
> processor = UpdateAttributes()
>
>
> Matt Burgess <[email protected]>
> Tuesday, May 2, 2017 12:26 PM
> If you want to use attributes inside the callback, I recommend building a
> dictionary from flowfile.getAttributes() and passing that into the
> PyStreamCallback constructor:
>
> class PyStreamCallback(StreamCallback):
>     def __init__(self, attrs):
>         self.attrs = attrs
>
>
> # ... later on ...
> session.write(flowfile, PyStreamCallback(PySet(flowfile.getAttributes())))
>
>  Or something like that.  I thought I'd seen an example somewhere but I
> can't find it.
>
> Regards,
> Matt
>
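
One way to read Matt's suggestion: copy the attributes into a Python dict, hand it to the callback's constructor, and read `self.attrs` inside `process()`. The sketch below runs in plain Python, so `StreamCallback` here is a stand-in base class rather than the real `org.apache.nifi.processor.io.StreamCallback`, and `io.BytesIO` stands in for the streams. Both are assumptions made only so the pattern can be tried outside NiFi.

```python
import io
import json


class StreamCallback(object):
    """Stand-in for NiFi's StreamCallback interface (assumption for this sketch)."""

    def process(self, inputStream, outputStream):
        raise NotImplementedError


class PyStreamCallback(StreamCallback):
    def __init__(self, attrs):
        # Keep a plain dict copy so process() does not depend on outside state.
        self.attrs = dict(attrs)

    def process(self, inputStream, outputStream):
        # Read from self.attrs; this class defines no getAttribute() of its own.
        value = self.attrs.get('ABCDE')
        outputStream.write(json.dumps(value).encode('utf-8'))


# Simulated call; in NiFi, session.write(flowfile, callback) would invoke process().
out = io.BytesIO()
PyStreamCallback({'ABCDE': 'LARRY_CURLEY_MOE'}).process(io.BytesIO(b''), out)
print(out.getvalue().decode('utf-8'))
```

Passing a copy of the attributes keeps the callback independent of the processor instance, which is the safer of the two options mentioned in this thread.
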
>
> Andy LoPresto <[email protected]>
> Tuesday, May 2, 2017 12:06 PM
> I am not a Python expert, but if you set self.result["x"] in one class,
> can you reference it in a separate class? What is the exception you are
> getting?
>
> Andy LoPresto
> [email protected]
> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>
>
> James McMahon <[email protected]>
> Tuesday, May 2, 2017 12:01 PM
> Thanks for your reply Matt. I think this cuts to the heart of my failure
> here. I have tried treating PyStreamCallback() as a second class in my
> python file, like so:
>
> class PyStreamCallback(StreamCallback) :
>      def __init__(self):
>           pass
>      def process(self, inputStream, outputStream) :
>           outputStream.write(Unicode(json.dumps(self.result['thisThing'])))
>
> class UpdateAttributes(Processor) :
>      def __init__(self) :
>           self.result = {}
>           self.__rel_success = Relationship......etc etc
> .
> .
> .
>      def onTrigger(self, context, sessionFactory):
> .
> .
> .
>           self.result['thisThing'] = flowfile.getAttribute("s3.key")
>           # this fails:      flowfile = session.write(flowfile, PyStreamCallback())
>           # this fails too:  flowfile = session.write(self, flowfile, PyStreamCallback())
>
> Am I mistaken to configure PyStreamCallback as a second independent class?
> Should it be a defined method within class UpdateAttributes() ?
>
>
> Matt Burgess <[email protected]>
> Tuesday, May 2, 2017 11:45 AM
> Jim, you still can/should use something like a PyStreamCallback().
> ExecuteScript is basically an onTrigger() body, so you can use the same
> approach inside your onTrigger() body in InvokeScriptedProcessor.  Pass an
> instance of your PyStreamCallback into something like:
>
> flowfile = session.write(flowfile, PyStreamCallback())
>
> at some point before you transfer the flow file. If you need
> variables/data from outside the PyStreamCallback() to be available inside
> it, you can pass them into the constructor or (more dangerously) mess with
> the scope of the variable(s).
>
> Regards,
> Matt
>
>
>
>
>
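
A sketch of the control flow Matt describes, with the callback defined as a separate class and used from inside `onTrigger()`. So that it runs in plain Python, `FakeSession` and `FakeSessionFactory` are hypothetical stand-ins for NiFi's session objects and model only the calls used here. Flow files are modeled as plain strings, and `REL_SUCCESS` is a placeholder string rather than a real `Relationship`.

```python
class PyStreamCallback(object):
    """Separate callback class; in NiFi this would extend StreamCallback."""

    def __init__(self, suffix):
        self.suffix = suffix  # outside data arrives through the constructor

    def process(self, inputStream, outputStream):
        # Here the "streams" are a string and a list (assumption of the sketch).
        outputStream.append(inputStream.upper() + self.suffix)


class FakeSession(object):
    """Stand-in that mimics only get/write/transfer/commit/rollback."""

    def __init__(self, contents):
        self.queue = list(contents)
        self.transferred = []
        self.committed = False

    def get(self, maxResults):
        batch, self.queue = self.queue[:maxResults], self.queue[maxResults:]
        return batch

    def write(self, flowfile, callback):
        out = []
        callback.process(flowfile, out)
        return ''.join(out)

    def transfer(self, flowfile, relationship):
        self.transferred.append((flowfile, relationship))

    def commit(self):
        self.committed = True

    def rollback(self):
        self.transferred = []


class FakeSessionFactory(object):
    def __init__(self, session):
        self.session = session

    def createSession(self):
        return self.session


REL_SUCCESS = 'success'  # placeholder for a real Relationship


class UpdateAttributes(object):
    def onTrigger(self, context, sessionFactory):
        session = sessionFactory.createSession()
        try:
            for flowfile in session.get(20):
                # The processor method instantiates the second class here.
                flowfile = session.write(flowfile, PyStreamCallback('!'))
                session.transfer(flowfile, REL_SUCCESS)
            session.commit()
        except Exception:
            session.rollback()
            raise


session = FakeSession(['abc', 'def'])
UpdateAttributes().onTrigger(None, FakeSessionFactory(session))
print(session.transferred)
```

The callback stays an independent class; the processor only constructs it and passes it to `session.write()`, so nothing has to be shared through `self` on the processor.
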
