James,

We don't have any InvokeScriptedProcessor processors anywhere on our NiFi instance. Everything we do uses ExecuteScript (mainly because I found that grabbing multiple FlowFiles in a single pass yields much better performance).

Sorry, and best of luck.

- Scott

James McMahon <mailto:[email protected]>
Tuesday, May 9, 2017 9:01 AM
Scott, you don't seem to use the onTrigger() method in a processor class of an InvokeScriptedProcessor (ISP) script. Am I correct in assuming that you are doing this in an ExecuteScript processor? Have you incorporated a Python stream callback as a second class in a Python script that already has a processor class for ISP, where the ISP processor class's onTrigger() method calls the stream callback class? That is what I cannot get to work. -Jim


Scott Wagner <mailto:[email protected]>
Monday, May 8, 2017 10:09 AM
James,

Please find below a script that takes all of the attributes from a FlowFile, iterates over the content of the input FlowFile line by line, calls Python's format() method on each line, and replaces the content with the output. The use case is that I can create a FlowFile whose content references attribute names, and this processor will replace those references with the actual attribute values using Python's native dictionary formatting:

import sys
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core import PyFile
from org.python.core.util import FileUtil, StringUtil

class TransformCallback(StreamCallback):
    def __init__(self, flowFile):
        self.flowFile = flowFile

    def process(self, inputStream, outputStream):
        try:
            attrs = self.flowFile.getAttributes()
            pf = FileUtil().wrap(inputStream)
            output = []
            for line in pf.readlines():
                line = line.format(**attrs).rstrip('\n')
                output.append(line)
            outputStream.write('\n'.join(output))
        except:
            traceback.print_exc(file=sys.stdout)
            raise


flowFiles = session.get(10)
for flowFile in flowFiles:
    if flowFile is None:
        continue
    flowFile = session.write(flowFile, TransformCallback(flowFile))
    session.transfer(flowFile, REL_SUCCESS)

My real-world use of this is to keep a flat file on disk containing a set of SQL queries that reference attribute names, then run it through this processor to do the string replacements for me. Here's a sample of what the input looks like:

DROP TABLE IF EXISTS {output_table_name}
;
CREATE TABLE {output_table_name} AS
  SELECT
    am.some_id AS {col_some_id},
    am.some_other_id AS {col_some_other_id},
    input.*
  FROM
    {table_name} input
    LEFT OUTER JOIN some_other_table sot
      ON sot.data_hash = f_hash_data(substring(input.{col_data_element_1}, 1, 70), substring(input.{col_data_element_2}, 1, 32), substring(input.{col_data_element_3}, 1, 2), input.{col_data_element_4}, input.{col_data_element_5})

When a FlowFile is created, I just make sure that the attributes that I need are populated correctly and then the SQL script will have the references to the correct column names in it.
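
For reference, the substitution itself can be tried outside NiFi with plain Python. This is only a minimal sketch: a plain dict stands in for flowFile.getAttributes(), and the function name render_template and the sample attribute values are made up for illustration.

```python
# Sketch only: a plain dict stands in for flowFile.getAttributes().
def render_template(text, attrs):
    """Apply str.format() to each line and rejoin the lines with newlines."""
    output = []
    for line in text.splitlines():
        # A placeholder with no matching attribute raises KeyError.
        output.append(line.format(**attrs))
    return '\n'.join(output)


# Hypothetical attribute values, not taken from a real flow.
attrs = {'output_table_name': 'report_2017', 'table_name': 'staging_input'}
template = (
    "DROP TABLE IF EXISTS {output_table_name}\n"
    ";\n"
    "SELECT input.* FROM {table_name} input"
)
rendered = render_template(template, attrs)
```

Because str.format() treats every brace as a placeholder, any literal braces in the content would need to be doubled ({{ and }}), and a placeholder without a matching attribute fails with a KeyError rather than being left in place.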

Hope that helps,
Scott

James McMahon <mailto:[email protected]>
Friday, May 5, 2017 1:42 PM
Matt, here is what I have tried. No error is thrown, but I can't reference attribute ABCDE in the callback function. What am I misunderstanding? -Jim

class PyStreamCallback(StreamCallback):
    def __init__(self, attrs):
        self.attrs = attrs

    def process(self, inputStream, outputStream):
        outputStream.write(unicode(json.dumps(self.getAttribute('ABCDE'))))

class UpdateAttributes(Processor):

    def __init__(self):
        self.result = {}
        self.__rel_success = Relationship.Builder().name("success").description("Success").build()

(I also define initialize, getRelationships, validate, getPropertyDescriptor, getPropertyDescriptors, and onPropertyModified.)

    def onTrigger(self, context, sessionFactory):
        session = sessionFactory.createSession()
        try:
            flowfiles = session.get(20)
            for flowfile in flowfiles:
                if flowfile is None:
                    return

                flowfile = session.putAttribute(flowfile, "ABCDE", "LARRY_CURLEY_MOE")

                flowfile = session.write(flowfile, PyStreamCallback(PySet(flowfile.getAttributes())))

                session.transfer(flowfile, self.__rel_success)
            session.commit()
        except:
            session.rollback()
            raise

processor = UpdateAttributes()


Matt Burgess <mailto:[email protected]>
Tuesday, May 2, 2017 12:26 PM
If you want to use attributes inside the callback, I recommend building a dictionary from flowfile.getAttributes() and passing that into the PyStreamCallback constructor:

class PyStreamCallback(StreamCallback):
    def __init__(self, attrs):
        self.attrs = attrs


# ... later on ...
session.write(flowfile, PyStreamCallback(PySet(flowfile.getAttributes())))

Or something like that. I thought I'd seen an example somewhere but I can't find it.
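
A rough sketch of that idea, runnable outside NiFi. The StreamCallback base class here is a hypothetical stand-in for org.apache.nifi.processor.io.StreamCallback, io.BytesIO stands in for the content streams, and the attribute name and value are illustrative. Inside NiFi the dictionary would be built from flowfile.getAttributes() instead.

```python
import io
import json


class StreamCallback(object):
    """Hypothetical stand-in for NiFi's StreamCallback interface."""
    def process(self, inputStream, outputStream):
        raise NotImplementedError


class PyStreamCallback(StreamCallback):
    def __init__(self, attrs):
        # Copy into a plain dict so lookups use normal Python syntax.
        self.attrs = dict(attrs)

    def process(self, inputStream, outputStream):
        # Look the value up in the dict captured by the constructor;
        # the callback object itself has no getAttribute() method.
        value = self.attrs['ABCDE']
        outputStream.write(json.dumps(value).encode('utf-8'))


# Stand-ins for the FlowFile attributes and content streams.
attrs = {'ABCDE': 'LARRY_CURLEY_MOE'}
out = io.BytesIO()
PyStreamCallback(attrs).process(io.BytesIO(b''), out)
result = out.getvalue()
```

The point of the design is that the callback only sees what was handed to its constructor, so anything it needs from the FlowFile has to be captured there.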

Regards,
Matt


Andy LoPresto <mailto:[email protected]>
Tuesday, May 2, 2017 12:06 PM
I am not a Python expert, but if you set self.result["x"] in one class, can you reference it in a separate class? What is the exception you are getting?

Andy LoPresto
[email protected] <mailto:[email protected]>
/[email protected] <mailto:[email protected]>/
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69


