Scott, you don't seem to use an onTrigger() method in a processor class of an InvokeScriptedProcessor (ISP) script. Am I correct in assuming that you are doing this in an ExecuteScript processor? Have you incorporated a Python StreamCallback as a second class in a Python script that already has a processor class for ISP, where the onTrigger() method of the ISP processor class calls the StreamCallback class? That is what I cannot get to work.
-Jim
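Here is a minimal sketch of the layout Jim describes, assuming NiFi's bundled Jython 2.7 engine in InvokeScriptedProcessor. The Processor class's onTrigger() hands each FlowFile's attributes to a second, independent StreamCallback class through its constructor, which is what Matt suggests further down the thread. The ABCDE attribute and the batch of 20 come from Jim's May 5 attempt. The try/except ImportError stand-ins are not NiFi API. They are an assumption added only so the module can be loaded and checked outside NiFi, and you can delete them in a real ISP script.

```python
import json

try:
    # Inside NiFi's Jython engine these are the real API types.
    from org.apache.nifi.processor import Processor, Relationship
    from org.apache.nifi.processor.io import StreamCallback
except ImportError:
    # Assumption: minimal stand-ins so the module loads outside NiFi.
    class Processor(object):
        pass

    class StreamCallback(object):
        pass

    class Relationship(object):
        class Builder(object):
            def name(self, name):
                self._name = name
                return self

            def description(self, text):
                return self

            def build(self):
                return self._name


class PyStreamCallback(StreamCallback):
    """Second, independent class; receives the attributes via its constructor."""

    def __init__(self, attrs):
        self.attrs = attrs

    def process(self, inputStream, outputStream):
        # Read from self.attrs: the callback has no getAttribute() of its own.
        value = self.attrs.get('ABCDE')
        outputStream.write(bytearray(json.dumps(value).encode('utf-8')))


class UpdateAttributes(Processor):
    def __init__(self):
        self.__rel_success = Relationship.Builder().name("success").description("Success").build()

    def initialize(self, context):
        pass

    def getRelationships(self):
        return set([self.__rel_success])

    def validate(self, context):
        pass

    def getPropertyDescriptors(self):
        return []

    def onPropertyModified(self, descriptor, oldValue, newValue):
        pass

    def getIdentifier(self):
        return None

    def onTrigger(self, context, sessionFactory):
        session = sessionFactory.createSession()
        try:
            for flowfile in session.get(20):
                flowfile = session.putAttribute(flowfile, "ABCDE", "LARRY_CURLEY_MOE")
                # getAttributes() returns a java.util.Map; .get() works on it
                # the same way it does on a Python dict.
                flowfile = session.write(flowfile, PyStreamCallback(flowfile.getAttributes()))
                session.transfer(flowfile, self.__rel_success)
            session.commit()
        except:
            session.rollback()
            raise


processor = UpdateAttributes()
```

Two details differ from the attempts quoted below. First, process() reads from self.attrs instead of calling getAttribute() on the callback. Second, the attribute map is passed as-is rather than wrapped in PySet, because wrapping a map in a set would appear to keep only the attribute names and drop their values.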
On Mon, May 8, 2017 at 11:09 AM, Scott Wagner <[email protected]> wrote:

> James,
>
> Please find below a script that takes all of the attributes from a
> FlowFile, iterates over the content of the input FlowFile, calls
> Python's format() method on each line of the content, and replaces the
> content with the output. The use case is that I can make a FlowFile whose
> content contains references to attribute names, and this processor
> will replace those references with the actual attribute values using
> Python's native dictionary formatting:
>
> import sys
> import traceback
> from java.nio.charset import StandardCharsets
> from org.apache.commons.io import IOUtils
> from org.apache.nifi.processor.io import StreamCallback
> from org.python.core import PyFile
> from org.python.core.util import FileUtil, StringUtil
>
> class TransformCallback(StreamCallback):
>     def __init__(self, flowFile):
>         self.flowFile = flowFile
>
>     def process(self, inputStream, outputStream):
>         try:
>             attrs = self.flowFile.getAttributes()
>             pf = FileUtil().wrap(inputStream)
>             output = []
>             for line in pf.readlines():
>                 line = line.format(**attrs).rstrip('\n')
>                 output.append(line)
>             outputStream.write('\n'.join(output))
>         except:
>             traceback.print_exc(file=sys.stdout)
>             raise
>
>
> flowFiles = session.get(10)
> for flowFile in flowFiles:
>     if flowFile is None:
>         continue
>     flowFile = session.write(flowFile, TransformCallback(flowFile))
>     session.transfer(flowFile, REL_SUCCESS)
>
> My real-world usage of this is to have a file on disk that contains a set
> of SQL queries that need to be executed and that reference the attribute
> names, then run it through this processor to do the
> string replacements for me.
> Here's a sample of what the input would look like:
>
> DROP TABLE IF EXISTS {output_table_name}
> ;
> CREATE TABLE {output_table_name} AS
> SELECT
>     am.some_id AS {col_some_id},
>     am.some_other_id AS {col_some_other_id},
>     input.*
> FROM
>     {table_name} input
>     LEFT OUTER JOIN some_other_table sot
>         ON sot.data_hash = f_hash_data(substring(input.{col_data_element_1}, 1, 70),
>             substring(input.{col_data_element_2}, 1, 32),
>             substring(input.{col_data_element_3}, 1, 2), input.{col_data_element_4},
>             input.{col_data_element_5})
>
> When a FlowFile is created, I just make sure that the attributes I
> need are populated correctly, and then the SQL script will have the
> references to the correct column names in it.
>
> Hope that helps,
> Scott
>
> James McMahon <[email protected]>
> Friday, May 5, 2017 1:42 PM
> Matt, here is what I have tried. No error is thrown, but I can't reference
> attribute ABCDE in the callback function. What am I misunderstanding? -Jim
>
> class PyStreamCallback(StreamCallback) :
>     def __init__(self, attrs):
>         self.attrs = attrs
>     def process(self, inputStream, outputStream):
>         outputStream.write(unicode(json.dumps(self.getAttribute('ABCDE'))))
>
> class UpdateAttributes(Processor) :
>
>     def __init__(self) :
>         self.result = {}
>         self.__rel_success = Relationship.Builder().name("success").description("Success").build()
>
>     (I also define initialize, getRelationships, validate,
>     getPropertyDescriptor, getPropertyDescriptors, and onPropertyModified.)
>
>     def onTrigger(self, context, sessionFactory):
>         session = sessionFactory.createSession()
>         try :
>             flowfiles = session.get(20)
>             for flowfile in flowfiles :
>                 if flowfile is None :
>                     return
>
>                 flowfile = session.putAttribute(flowfile, "ABCDE", "LARRY_CURLEY_MOE")
>
>                 flowfile = session.write(flowfile, PyStreamCallback(PySet(flowfile.getAttributes())))
>
>                 session.transfer(flowfile, self.__rel_success)
>             session.commit()
>         except:
>             session.rollback()
>             raise
>
> processor = UpdateAttributes()
>
>
> Matt Burgess <[email protected]>
> Tuesday, May 2, 2017 12:26 PM
> If you want to use attributes inside the callback, I recommend building a
> dictionary from flowfile.getAttributes() and passing that into the
> PyStreamCallback constructor:
>
> class PyStreamCallback(StreamCallback):
>     def __init__(self, attrs):
>         self.attrs = attrs
>
> # ... later on ...
> session.write(flowfile, PyStreamCallback(PySet(flowfile.getAttributes())))
>
> Or something like that. I thought I'd seen an example somewhere but I
> can't find it.
>
> Regards,
> Matt
>
>
> Andy LoPresto <[email protected]>
> Tuesday, May 2, 2017 12:06 PM
> I am not a Python expert, but if you set “self.result[“x”]” in one class,
> can you reference it in a separate class? What is the exception you are
> getting?
>
> Andy LoPresto
> [email protected]
> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
>
>
> James McMahon <[email protected]>
> Tuesday, May 2, 2017 12:01 PM
> Thanks for your reply, Matt. I think this cuts to the heart of my failure
> here. I have tried treating PyStreamCallback() as a second class in my
> Python file, like so:
>
> class PyStreamCallback(StreamCallback) :
>     def __init__(self):
>         pass
>     def process(self, inputStream, outputStream) :
>         outputStream.write(unicode(json.dumps(self.result['thisThing'])))
>
> class UpdateAttributes(Processor) :
>     def __init__(self) :
>         self.result = {}
>         self.__rel_success = Relationship......etc etc
>     .
>     .
>     .
>     def onTrigger(self, context, sessionFactory):
>         .
>         .
>         .
>         self.result['thisThing'] = flowfile.getAttribute("s3.key")
>         # this fails: flowfile = session.write(flowfile, PyStreamCallback())
>         # this fails too: flowfile = session.write(self, flowfile, PyStreamCallback())
>
> Am I mistaken to configure PyStreamCallback as a second independent class?
> Should it be a defined method within class UpdateAttributes()?
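Andy's question can be checked in plain Python. The sketch below uses `object` in place of NiFi's StreamCallback and io.BytesIO in place of the Java streams. Both are assumptions, so that it runs outside NiFi, and 'example/s3/key' is a made-up stand-in for the s3.key attribute. The sketch suggests that a separate class's `self` is a different object, so it cannot see the processor's `self.result`. The same separate class works once the data is passed to its constructor.

```python
import io
import json


class UpdateAttributes(object):
    """Stand-in for the ISP processor class; only the part that matters here."""

    def __init__(self):
        self.result = {}


class BrokenCallback(object):
    """Mirrors the May 2 attempt: a separate class reading self.result."""

    def process(self, inputStream, outputStream):
        # `self` is the callback instance, not the processor, so this raises AttributeError.
        outputStream.write(json.dumps(self.result['thisThing']).encode('utf-8'))


class PyStreamCallback(object):
    """Same separate class, but the data is handed over through __init__."""

    def __init__(self, result):
        self.result = result

    def process(self, inputStream, outputStream):
        outputStream.write(json.dumps(self.result['thisThing']).encode('utf-8'))


proc = UpdateAttributes()
proc.result['thisThing'] = 'example/s3/key'  # hypothetical value for flowfile.getAttribute("s3.key")

failed = False
try:
    BrokenCallback().process(io.BytesIO(), io.BytesIO())
except AttributeError as err:
    failed = True
    print("separate class cannot see proc.result: %s" % err)

out = io.BytesIO()
PyStreamCallback(proc.result).process(io.BytesIO(), out)
print(out.getvalue())
```

So PyStreamCallback can stay an independent class. Turning it into a method would not obviously help, because session.write() expects a StreamCallback object. Also, session.write(self, flowfile, ...) passes an extra argument that none of the ProcessSession write() overloads take.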
>
>
> Matt Burgess <[email protected]>
> Tuesday, May 2, 2017 11:45 AM
> Jim, you still can/should use something like a PyStreamCallback().
> ExecuteScript is basically an onTrigger() body, so you can use the same
> approach inside your onTrigger() body in InvokeScriptedProcessor. Pass an
> instance of your PyStreamCallback into something like:
>
> flowfile = session.write(flowfile, PyStreamCallback())
>
> at some point before you transfer the flow file. If you need
> variables/data from outside the PyStreamCallback() inside, you can pass
> them into the constructor or (more dangerously) mess with the scope of the
> variable(s).
>
> Regards,
> Matt
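On the "more dangerously" part, one reading is this: a value shared through scope (a module-level or processor-level variable) is read when process() runs, not when the callback is created. Assume the processor runs with more than one concurrent task, so that onTrigger() can execute on several threads against the same script objects. Then another task may overwrite the value in between. The sketch below simulates that interleaving deterministically in plain Python. The names shared, ScopeCallback, CtorCallback, and run are hypothetical, and io.BytesIO again stands in for NiFi's streams.

```python
import io
import json

# Hypothetical shared state: the "mess with the scope" option.
shared = {}


class ScopeCallback(object):
    """Reads whatever is in `shared` at the moment process() runs."""

    def process(self, inputStream, outputStream):
        outputStream.write(json.dumps(shared['thisThing']).encode('utf-8'))


class CtorCallback(object):
    """Captures its own value when it is constructed."""

    def __init__(self, value):
        self.value = value

    def process(self, inputStream, outputStream):
        outputStream.write(json.dumps(self.value).encode('utf-8'))


def run(callback):
    """Drive a callback the way session.write() would, using in-memory streams."""
    out = io.BytesIO()
    callback.process(io.BytesIO(), out)
    return out.getvalue()


# Task A prepares its callbacks...
shared['thisThing'] = 'key-for-A'
scope_cb_a = ScopeCallback()
ctor_cb_a = CtorCallback('key-for-A')
# ...task B overwrites the shared value before task A's write happens.
shared['thisThing'] = 'key-for-B'

scope_result = run(scope_cb_a)  # sees task B's value
ctor_result = run(ctor_cb_a)    # still task A's value
print(scope_result)
print(ctor_result)
```

Passing the value into the constructor ties it to one callback instance, which is why that option is the safer of the two Matt mentions.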
