James McMahon <mailto:[email protected]>
Tuesday, May 9, 2017 9:01 AM
Scott, you don't seem to use the onTrigger() method in a processor
class of an InvokeScriptedProcessor (ISP) script. Am I correct in
assuming that you are doing this in an ExecuteScript processor?
Have you incorporated a Python Stream Callback as a second class in a
python script that already has a processor class for ISP, in which the
onTrigger() method for the ISP processor class calls the Python Stream
Callback class? That is what I cannot get to work. -Jim
Scott Wagner <mailto:[email protected]>
Monday, May 8, 2017 10:09 AM
James,
Please find below a script that takes all of the attribute from a
FlowFile, and then iterates the content of the input flowfile, and
calls python's format() method on the entire content and replaces it
with the output. The use case for this is I can make a FlowFile with
content that contains references to attribute names in the content,
and this processor will replace those strings with the actual
attributes themselves using python's native dictionary formatting methods:
import sys
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core import PyFile
from org.python.core.util import FileUtil, StringUtil
class TransformCallback(StreamCallback):
def __init__(self, flowFile):
self.flowFile = flowFile
def process(self, inputStream, outputStream):
try:
attrs = self.flowFile.getAttributes()
pf = FileUtil().wrap(inputStream)
output = []
for line in pf.readlines():
line = line.format(**attrs).rstrip('\n')
output.append(line)
outputStream.write('\n'.join(output))
except:
traceback.print_exc(file=sys.stdout)
raise
flowFiles = session.get(10)
for flowFile in flowFiles:
if flowFile is None:
continue
flowFile = session.write(flowFile, TransformCallback(flowFile))
session.transfer(flowFile, REL_SUCCESS)
My real-world usage of this is to have a file on disk that contains a
set of SQL queries that will need to be executed and reference the
attribute names in that flat file, then run it through this processor
to do the string replacements for me. Here's a sample of what the
input would look like:
DROP TABLE IF EXISTS {output_table_name}
;
CREATE TABLE {output_table_name} AS
SELECT
am.some_id AS {col_some_id},
am.some_other_id AS {col_some_other_id},
input.*
FROM
{table_name} input
LEFT OUTER JOIN some_other_table sot
ON sot.data_hash =
f_hash_data(substring(input.{col_data_element_1}, 1, 70),
substring(input.{col_data_element_2}, 1, 32),
substring(input.{col_data_element_3}, 1, 2),
input.{col_data_element_4}, input.{col_data_element_5})
When a FlowFile is created, I just make sure that the attributes that
I need are populated correctly and then the SQL script will have the
references to the correct column names in it.
Hope that helps,
Scott
James McMahon <mailto:[email protected]>
Friday, May 5, 2017 1:42 PM
Matt, here is what I have tried. No error is thrown, but I can't
reference attribute ABCD in the callback function. What am I
misunderstanding? -Jim
class PyStreamCallback(StreamCallback) :
def __init__(self, attrs):
self.attrs = attrs
def process(self,inputStream, outputStream):
outputStream.write(Unicode(json.dumps(self.getAttribute('ABCDE')))
class UpdateAttributes(Processor) :
def __init__(self) :
self.result = {}
self.__rel_success =
Relationship.Builder().name("success".description("Success").build()
( I def an initialize, getRelationships, validate,
getPropertyDescriptor, getPropertyDescriptors, and onPropertyModified)
def onTrigger(self, context, sessionFactory):
session = sessionFactory.createSession()
try :
flowfiles = session.get(20)
for flowfile in flowfiles :
if flowfile is none :
return
flowfile = session.putAttribute(flowfile,
"ABCDE", "LARRY_CURLEY_MOE")
flowfile =
session.write(flowfile,PyStreamCallback(PySet(flowfile.getAttributes())))
session.transfer(flowfile, self.__rel_success)
session.commit()
except:
session.rollback()
raise
processor = UpdateAttributes()
Matt Burgess <mailto:[email protected]>
Tuesday, May 2, 2017 12:26 PM
If you want to use attributes inside the callback, I recommend
building a dictionary from flowfile.getAttributes() and passing that
into the PyStreamCallback constructor:
class PyStreamCallback(StreamCallback):
def __init__(self, attrs):
self.attrs = attrs
# ... later on ...
session.write(flowfile, PyStreamCallback(PySet(flowfile.getAttributes())))
Or something like that. I thought I'd seen an example somewhere but
I can't find it.
Regards,
Matt
Andy LoPresto <mailto:[email protected]>
Tuesday, May 2, 2017 12:06 PM
I am not a Python expert, but if you set “self.result[“x”]” in one
class, can you reference it in a separate class? What is the exception
you are getting?
Andy LoPresto
[email protected] <mailto:[email protected]>
/[email protected] <mailto:[email protected]>/
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69