James,
Please find below a script that takes all of the attributes from a
FlowFile, iterates over the lines of the FlowFile's content, calls
Python's format() method on each line, and replaces the content with the
output. The use case is that I can create a FlowFile whose content
references attribute names, and this processor will replace those
references with the actual attribute values using Python's native
dictionary formatting:
import sys
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core import PyFile
from org.python.core.util import FileUtil, StringUtil

class TransformCallback(StreamCallback):
    def __init__(self, flowFile):
        self.flowFile = flowFile

    def process(self, inputStream, outputStream):
        try:
            attrs = self.flowFile.getAttributes()
            pf = FileUtil().wrap(inputStream)
            output = []
            for line in pf.readlines():
                line = line.format(**attrs).rstrip('\n')
                output.append(line)
            outputStream.write('\n'.join(output))
        except:
            traceback.print_exc(file=sys.stdout)
            raise

flowFiles = session.get(10)
for flowFile in flowFiles:
    if flowFile is None:
        continue
    flowFile = session.write(flowFile, TransformCallback(flowFile))
    session.transfer(flowFile, REL_SUCCESS)
My real-world usage is to keep a file on disk containing a set of SQL
queries that reference the attribute names, then run that file through
this processor to do the string replacements for me. Here's a sample of
what the input would look like:
DROP TABLE IF EXISTS {output_table_name}
;
CREATE TABLE {output_table_name} AS
SELECT
    am.some_id AS {col_some_id},
    am.some_other_id AS {col_some_other_id},
    input.*
FROM
    {table_name} input
    LEFT OUTER JOIN some_other_table sot
        ON sot.data_hash = f_hash_data(
            substring(input.{col_data_element_1}, 1, 70),
            substring(input.{col_data_element_2}, 1, 32),
            substring(input.{col_data_element_3}, 1, 2),
            input.{col_data_element_4},
            input.{col_data_element_5})
When a FlowFile is created, I just make sure that the attributes that I
need are populated correctly and then the SQL script will have the
references to the correct column names in it.
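
The substitution can be tried outside NiFi first. The following is a minimal sketch in plain Python, not the processor itself: the attribute values and the render() helper are made up for illustration, and in the script the dictionary would come from the FlowFile's attributes. It also shows two behaviours of format() worth knowing: a placeholder with no matching key raises KeyError, and literal braces in the SQL have to be doubled.

```python
# Plain-Python sketch of the placeholder substitution, no NiFi required.
# The attribute names and values below are hypothetical examples.
attrs = {
    "output_table_name": "staging.joined",
    "table_name": "staging.raw_input",
    "col_some_id": "some_id",
}

template = (
    "DROP TABLE IF EXISTS {output_table_name}\n"
    ";\n"
    "CREATE TABLE {output_table_name} AS\n"
    "SELECT am.some_id AS {col_some_id}, input.*\n"
    "FROM {table_name} input\n"
)


def render(text, attributes):
    """Apply str.format to each line, as the script does (hypothetical helper)."""
    return "\n".join(line.format(**attributes) for line in text.splitlines())


rendered = render(template, attrs)
print(rendered)

# A placeholder with no matching attribute raises KeyError.
try:
    render("SELECT {missing_attr}", attrs)
except KeyError as exc:
    print("missing attribute:", exc)

# Literal braces must be doubled to survive format().
print(render("SELECT '{{}}' AS empty_json", attrs))
```

Because a missing attribute raises an exception, a FlowFile that lacks one of the referenced attributes would fail in the callback rather than pass through with an unreplaced placeholder.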
Hope that helps,
Scott
James McMahon
Friday, May 5, 2017 1:42 PM
Matt, here is what I have tried. No error is thrown, but I can't
reference attribute ABCDE in the callback function. What am I
misunderstanding? -Jim
class PyStreamCallback(StreamCallback):
    def __init__(self, attrs):
        self.attrs = attrs

    def process(self, inputStream, outputStream):
        outputStream.write(unicode(json.dumps(self.getAttribute('ABCDE'))))

class UpdateAttributes(Processor):
    def __init__(self):
        self.result = {}
        self.__rel_success = Relationship.Builder().name("success").description("Success").build()

    # (I also define initialize, getRelationships, validate,
    # getPropertyDescriptor, getPropertyDescriptors, and onPropertyModified)

    def onTrigger(self, context, sessionFactory):
        session = sessionFactory.createSession()
        try:
            flowfiles = session.get(20)
            for flowfile in flowfiles:
                if flowfile is None:
                    return
                flowfile = session.putAttribute(flowfile, "ABCDE", "LARRY_CURLEY_MOE")
                flowfile = session.write(flowfile, PyStreamCallback(PySet(flowfile.getAttributes())))
                session.transfer(flowfile, self.__rel_success)
            session.commit()
        except:
            session.rollback()
            raise

processor = UpdateAttributes()
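
A note on the snippet above: the callback stores what it is given as self.attrs, but process() then calls self.getAttribute(), which the callback class itself never defines. One possible shape for the callback is sketched below in plain Python so it can be run outside NiFi. The in-memory streams are stand-ins for NiFi's Java streams, the plain dict is a stand-in for a copy of flowfile.getAttributes(), and in the real processor the class would extend StreamCallback. A dict is assumed here instead of PySet, since a set would not keep the attribute values.

```python
import io
import json


class PyStreamCallback(object):
    """Stand-in for a StreamCallback subclass; holds its own copy of the attributes."""

    def __init__(self, attrs):
        # Snapshot the attributes as a plain dict (assumption: attrs is map-like).
        self.attrs = dict(attrs)

    def process(self, inputStream, outputStream):
        # Read from the dict stored on this instance, not via self.getAttribute().
        value = self.attrs.get("ABCDE")
        outputStream.write(json.dumps(value).encode("utf-8"))


# Hypothetical attribute map, as it might look after session.putAttribute().
attributes = {"ABCDE": "LARRY_CURLEY_MOE", "filename": "example.txt"}

callback = PyStreamCallback(attributes)
out = io.BytesIO()
callback.process(io.BytesIO(b"original content"), out)
print(out.getvalue())
```

In the processor, the corresponding call would presumably be along the lines of session.write(flowfile, PyStreamCallback(dict(flowfile.getAttributes()))), assuming Jython can build a dict from the Java map.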
Matt Burgess
Tuesday, May 2, 2017 12:26 PM
If you want to use attributes inside the callback, I recommend
building a dictionary from flowfile.getAttributes() and passing that
into the PyStreamCallback constructor:
class PyStreamCallback(StreamCallback):
    def __init__(self, attrs):
        self.attrs = attrs

# ... later on ...
session.write(flowfile, PyStreamCallback(PySet(flowfile.getAttributes())))
Or something like that. I thought I'd seen an example somewhere but
I can't find it.
Regards,
Matt
Andy LoPresto
Tuesday, May 2, 2017 12:06 PM
I am not a Python expert, but if you set self.result["x"] in one
class, can you reference it in a separate class? What is the exception
you are getting?
Andy LoPresto
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
James McMahon
Tuesday, May 2, 2017 12:01 PM
Thanks for your reply Matt. I think this cuts to the heart of my
failure here. I have tried treating PyStreamCallback() as a second
class in my python file, like so:
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        outputStream.write(unicode(json.dumps(self.result['thisThing'])))

class UpdateAttributes(Processor):
    def __init__(self):
        self.result = {}
        self.__rel_success = Relationship......etc etc
    .
    .
    .
    def onTrigger(self, context, sessionFactory):
        .
        .
        .
        self.result['thisThing'] = flowfile.getAttribute("s3.key")
        # this fails: flowfile = session.write(flowfile, PyStreamCallback())
        # this fails too: flowfile = session.write(self, flowfile, PyStreamCallback())
Am I mistaken to define PyStreamCallback as a second, independent
class? Should it instead be a method defined within class
UpdateAttributes()?
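
On the scoping question: in Python, self inside PyStreamCallback refers to the callback instance, so a self.result set on the processor is not visible there. The sketch below, in plain Python with no NiFi base classes, illustrates this. The class names are simplified stand-ins and the attribute value is made up. It shows the failing access and a constructor-based alternative.

```python
class UpdateAttributes(object):
    """Simplified stand-in for the processor class."""

    def __init__(self):
        self.result = {}


class CallbackWithoutState(object):
    """Mirrors the failing version: nothing is passed in."""

    def process(self):
        # self here is the callback, which has no 'result' attribute.
        return self.result["thisThing"]


class CallbackWithState(object):
    """Receives the data it needs through its constructor."""

    def __init__(self, result):
        self.result = result

    def process(self):
        return self.result["thisThing"]


processor = UpdateAttributes()
processor.result["thisThing"] = "some/s3/key"  # hypothetical attribute value

try:
    CallbackWithoutState().process()
    failed = False
except AttributeError:
    failed = True

value = CallbackWithState(processor.result).process()
print(failed, value)
```

Keeping the callback as a separate class is fine under this reading; what matters is that whatever it needs is handed to it explicitly.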
Matt Burgess
Tuesday, May 2, 2017 11:45 AM
Jim, you still can/should use something like a PyStreamCallback().
ExecuteScript is basically an onTrigger() body, so you can use the
same approach inside your onTrigger() body in
InvokeScriptedProcessor. Pass an instance of your PyStreamCallback
into something like:
flowfile = session.write(flowfile, PyStreamCallback())
at some point before you transfer the flow file. If you need
variables/data from outside the PyStreamCallback() inside, you can
pass them into the constructor or (more dangerously) mess with the
scope of the variable(s).
Regards,
Matt