Thank you very much, Scott. I think this will help, in particular the way you access the flowfile attributes within the process() method using:

attrs = self.flowFile.getAttributes()

This is where I have not quite hit the nail on the head. I will revisit my InvokeScriptedProcessor script and try to integrate this call this morning.

Yours is a very interesting use case I've not yet considered but may employ in the future. Mine is also an intriguing use case. After saving the payload content of the incoming flowfile to persistent storage, I wish to post a related message to an AMQP exchange. I don't need or want to post the potentially sizeable data itself. I only need to post a JSON object that includes select high-interest metadata, such as the endpoint where the data can be retrieved for downstream use. Being able to replace the content payload using PyStreamCallback within an InvokeScriptedProcessor script is one way to do that.

Thanks once again.
-Jim

On Mon, May 8, 2017 at 11:09 AM, Scott Wagner wrote:
> James,
>
> Please find below a script that takes all of the attributes from a
> FlowFile, iterates over the content of the input flowfile, calls python's
> format() method on each line of the content, and replaces the content
> with the output.
> The use case for this is that I can make a FlowFile whose content
> contains references to attribute names, and this processor will replace
> those references with the actual attribute values using python's native
> dictionary formatting:
>
> import sys
> import traceback
> from java.nio.charset import StandardCharsets
> from org.apache.commons.io import IOUtils
> from org.apache.nifi.processor.io import StreamCallback
> from org.python.core import PyFile
> from org.python.core.util import FileUtil, StringUtil
>
> class TransformCallback(StreamCallback):
>     def __init__(self, flowFile):
>         self.flowFile = flowFile
>
>     def process(self, inputStream, outputStream):
>         try:
>             attrs = self.flowFile.getAttributes()
>             pf = FileUtil().wrap(inputStream)
>             output = []
>             for line in pf.readlines():
>                 line = line.format(**attrs).rstrip('\n')
>                 output.append(line)
>             outputStream.write('\n'.join(output))
>         except:
>             traceback.print_exc(file=sys.stdout)
>             raise
>
>
> flowFiles = session.get(10)
> for flowFile in flowFiles:
>     if flowFile is None:
>         continue
>     flowFile = session.write(flowFile, TransformCallback(flowFile))
>     session.transfer(flowFile, REL_SUCCESS)
>
> My real-world usage of this is to have a file on disk that contains a set
> of SQL queries that need to be executed and that reference the attribute
> names, then run it through this processor to do the string replacements
> for me.
> Here's a sample of what the input would look like:
>
> DROP TABLE IF EXISTS {output_table_name}
> ;
> CREATE TABLE {output_table_name} AS
> SELECT
>     am.some_id AS {col_some_id},
>     am.some_other_id AS {col_some_other_id},
>     input.*
> FROM
>     {table_name} input
>     LEFT OUTER JOIN some_other_table sot
>         ON sot.data_hash = f_hash_data(substring(input.{col_data_element_1}, 1, 70),
>             substring(input.{col_data_element_2}, 1, 32),
>             substring(input.{col_data_element_3}, 1, 2),
>             input.{col_data_element_4},
>             input.{col_data_element_5})
>
> When a FlowFile is created, I just make sure that the attributes I need
> are populated correctly, and then the SQL script will have the references
> to the correct column names in it.
>
> Hope that helps,
> Scott
>
> James McMahon
> Friday, May 5, 2017 1:42 PM
> Matt, here is what I have tried. No error is thrown, but I can't reference
> attribute ABCDE in the callback function. What am I misunderstanding? -Jim
>
> class PyStreamCallback(StreamCallback) :
>     def __init__(self, attrs):
>         self.attrs = attrs
>     def process(self,inputStream, outputStream):
>         outputStream.write(Unicode(json.dumps(self.getAttribute('ABCDE')))
>
> class UpdateAttributes(Processor) :
>
>     def __init__(self) :
>         self.result = {}
>         self.__rel_success = Relationship.Builder().name("success".description("Success").build()
>
>     (I also define initialize, getRelationships, validate,
>     getPropertyDescriptor, getPropertyDescriptors, and onPropertyModified)
>
>     def onTrigger(self, context, sessionFactory):
>         session = sessionFactory.createSession()
>         try :
>             flowfiles = session.get(20)
>             for flowfile in flowfiles :
>                 if flowfile is none :
>                     return
>
>                 flowfile = session.putAttribute(flowfile,
>                     "ABCDE", "LARRY_CURLEY_MOE")
>
>                 flowfile = session.write(flowfile,PyStreamCallback(PySet(flowfile.getAttributes())))
>
>                 session.transfer(flowfile, self.__rel_success)
>             session.commit()
>         except:
>             session.rollback()
>             raise
>
> processor = UpdateAttributes()
>
> Matt Burgess
> Tuesday, May 2, 2017 12:26 PM
> If you want to use attributes inside the callback, I recommend building a
> dictionary from flowfile.getAttributes() and passing that into the
> PyStreamCallback constructor:
>
> class PyStreamCallback(StreamCallback):
>     def __init__(self, attrs):
>         self.attrs = attrs
>
> # ... later on ...
> session.write(flowfile, PyStreamCallback(PySet(flowfile.getAttributes())))
>
> Or something like that. I thought I'd seen an example somewhere but I
> can't find it.
>
> Regards,
> Matt
>
> Andy LoPresto
> Tuesday, May 2, 2017 12:06 PM
> I am not a Python expert, but if you set self.result["x"] in one class,
> can you reference it in a separate class? What is the exception you are
> getting?
>
> Andy LoPresto
> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
>
> James McMahon
> Tuesday, May 2, 2017 12:01 PM
> Thanks for your reply, Matt. I think this cuts to the heart of my failure
> here. I have tried treating PyStreamCallback() as a second class in my
> python file, like so:
>
> class PyStreamCallback(StreamCallback) :
>     def __init__(self):
>         pass
>     def process(self, inputStream, outputStream) :
>         outputStream.write(Unicode(json.dumps(self.result['thisThing'])))
>
> class UpdateAttributes(Processor) :
>     def __init__(self) :
>         self.result{}
>         self.__rel_success = Relationship......etc etc
>     .
>     .
>     .
>     def onTrigger(self, context, sessionFactory):
>         .
>         .
>         .
>         self.result['thisThing'] = flowfile.getAttribute("s3.key")
>         # this fails: flowfile = session.write(flowfile, PyStreamCallback())
>         # this fails too: flowfile = session.write(self,flowfile, PyStreamCallback())
>
> Am I mistaken to configure PyStreamCallback as a second independent class?
> Should it be a defined method within class UpdateAttributes()?
>
> Matt Burgess
> Tuesday, May 2, 2017 11:45 AM
> Jim, you still can/should use something like a PyStreamCallback().
> ExecuteScript is basically an onTrigger() body, so you can use the same
> approach inside your onTrigger() body in InvokeScriptedProcessor. Pass an
> instance of your PyStreamCallback into something like:
>
> flowfile = session.write(flowfile, PyStreamCallback())
>
> at some point before you transfer the flow file. If you need
> variables/data from outside the PyStreamCallback() inside, you can pass
> them into the constructor or (more dangerously) mess with the scope of the
> variable(s).
>
> Regards,
> Matt
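A minimal sketch of Matt's constructor advice applied to the JSON-metadata use case at the top of this thread. The callback object only has a process() method (there is no getAttribute() on it), so whatever it needs has to be stored on it when it is constructed. Here the attributes are copied into a plain dict (a set built from the attribute map would keep only the keys and lose the values), and process() replaces the content with a small JSON object built from selected keys without reading the payload. The class name MetadataCallback and the attribute name data.endpoint are hypothetical. The sketch assumes, as in Jython 2.7, that dict() accepts the Java map returned by getAttributes(); the ImportError fallback exists only so the logic can be exercised outside NiFi.

```python
import json

try:
    # Available inside NiFi's Jython script engine.
    from org.apache.nifi.processor.io import StreamCallback
except ImportError:
    # Outside NiFi (e.g. plain CPython) fall back to object so the class can be tested.
    StreamCallback = object


class MetadataCallback(StreamCallback):
    """Hypothetical callback: replaces FlowFile content with a JSON summary
    of selected attributes instead of the original payload."""

    def __init__(self, attrs, keys):
        # Copy the attribute map into a plain dict (key -> value) up front.
        # Assumption: dict() accepts the Java Map from getAttributes() under Jython 2.7.
        attrs = dict(attrs)
        # Missing keys become None, which json.dumps writes as null.
        self.message = dict((key, attrs.get(key)) for key in keys)

    def process(self, inputStream, outputStream):
        # The incoming content is deliberately not read; only metadata is written.
        payload = json.dumps(self.message, sort_keys=True)
        outputStream.write(bytearray(payload.encode('utf-8')))
```

Inside an onTrigger() loop this would be used as flowfile = session.write(flowfile, MetadataCallback(flowfile.getAttributes(), ['filename', 'data.endpoint'])) before session.transfer(...). Passing the values in through the constructor gives each callback its own copy, rather than relying on shared state such as self.result on the processor.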

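Scott's TransformCallback relies on Python's str.format() with the attribute map unpacked as keyword arguments. The sketch below isolates that substitution as plain functions (render_template and underscore_keys are hypothetical names) so its edge cases are easy to check: literal braces in the content must be doubled as {{ and }}, a placeholder with no matching attribute raises KeyError, and because format() treats a dot in a field name as attribute access, a dotted NiFi attribute such as s3.key cannot be referenced directly as {s3.key}. Renaming dots to underscores is only one possible workaround, assumed here for illustration.

```python
def render_template(text, attrs):
    """Replace {name} placeholders line by line with values from attrs,
    mirroring the per-line format(**attrs) call in Scott's script."""
    return '\n'.join(line.format(**attrs) for line in text.splitlines())


def underscore_keys(attrs):
    """Hypothetical helper: expose dotted attribute names (e.g. s3.key)
    under underscore names (s3_key) so format() can reference them."""
    return dict((key.replace('.', '_'), value) for key, value in attrs.items())
```

With underscore_keys applied first, a template line containing {s3_key} picks up the value of the s3.key attribute.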