Jim,
Here's the full script with unnecessary business logic removed:
flowFiles = session.get(10)
for flowFile in flowFiles:
    if flowFile is None:
        continue
    s3_bucket = flowFile.getAttribute('job.s3_bucket')
    s3_path = flowFile.getAttribute('job.s3_path')
    # More stuff here....
    errors = []
    # More stuff here...
    if len(errors) > 0:
        flowFile = session.putAttribute(flowFile, 'job.error',
                                        ';'.join(errors))
        session.transfer(flowFile, REL_FAILURE)
    else:
        flowFile = session.putAttribute(flowFile,
                                        'job.number_csv_files', str(len(matches)))
        flowFile = session.putAttribute(flowFile,
                                        'job.total_file_size', str(total_size))
        session.transfer(flowFile, REL_SUCCESS)
I'm not calling session.commit anywhere.
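To make the control flow explicit, here is a minimal sketch of the same loop wrapped in a function so it can be exercised outside NiFi. The names process_batch and validate are hypothetical and exist only for illustration; inside ExecuteScript, session, REL_SUCCESS and REL_FAILURE are the variables the processor binds for the script. The point is that every flowfile returned by session.get is transferred exactly once inside the loop. The sketch never calls session.commit, on the understanding that ExecuteScript commits the session itself once the script returns.

```python
def process_batch(session, rel_success, rel_failure, validate, batch_size=10):
    """Pull up to batch_size flowfiles and route each one exactly once.

    validate is a hypothetical callback that returns a list of error
    strings for a flowfile (an empty list means the flowfile is fine).
    """
    flow_files = session.get(batch_size)
    for flow_file in flow_files:
        if flow_file is None:
            continue
        errors = validate(flow_file)
        if errors:
            # Record the errors, then route to failure.
            flow_file = session.putAttribute(flow_file, 'job.error',
                                             ';'.join(errors))
            session.transfer(flow_file, rel_failure)
        else:
            session.transfer(flow_file, rel_success)
```

If a flowfile is pulled from the queue but reaches the end of the loop body without a transfer (or a remove), NiFi will complain that its transfer relationship was not specified, so both branches end in a transfer.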
Here's another script (this one is the full file - no business
secrets in here!) that creates N flowfiles from a single input flowfile,
based on attributes that define a numeric range:
import sys
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core.util import StringUtil

flowFiles = session.get(10)
for flowFile in flowFiles:
    if flowFile is None:
        continue
    start = int(flowFile.getAttribute('range.start'))
    stop = int(flowFile.getAttribute('range.stop'))
    increment = int(flowFile.getAttribute('range.increment'))
    for x in range(start, stop + 1, increment):
        newFlowFile = session.clone(flowFile)
        newFlowFile = session.putAttribute(newFlowFile, 'current', str(x))
        session.transfer(newFlowFile, REL_SUCCESS)
    session.remove(flowFile)
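As a quick check on how many clones that loop produces, here is a minimal sketch of just the range arithmetic. The helper name range_values is hypothetical and only for illustration; it returns the values the loop above would write to the 'current' attribute, one per cloned flowfile.

```python
def range_values(start, stop, increment):
    """Return the values assigned to 'current', one per cloned flowfile.

    stop + 1 makes the upper bound inclusive for a positive increment.
    """
    return list(range(start, stop + 1, increment))


print(range_values(1, 10, 3))  # [1, 4, 7, 10]
print(range_values(1, 10, 4))  # [1, 5, 9]
print(range_values(5, 5, 1))   # [5]
```

Note that the stop + 1 adjustment only makes the range inclusive when the increment is positive, and an increment of 0 raises a ValueError.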
I hope these examples are helpful.
- Scott
James McMahon
Friday, April 7, 2017 11:22 AM
Scott, how did you refine your session.transfer and session.commit
when you introduced the for loop?
I am getting a "transfer relationship not specified" error when I move
my transfer and my commit into the "for flowFile" loop. Can you show
the end of the loop that follows your # Do stuff here? Thank you sir.
Jim
Scott Wagner
Wednesday, April 5, 2017 3:26 PM
My experience with ExecuteScript and Python is that having the script
work on a single FlowFile per invocation when there are multiple in the
input queue is very inefficient, even when you set the timer-driven run
schedule to 0 sec.
Instead, I have the following in all of my Python scripts:
flowFiles = session.get(10)
for flowFile in flowFiles:
    if flowFile is None:
        continue
    # Do stuff here
That seems to improve the throughput of the ExecuteScript processor
dramatically.
YMMV
- Scott
James McMahon
Wednesday, April 5, 2017 12:48 PM
I am receiving POSTs from a Pentaho process, delivering files to my
NiFi 0.7.x workflow HandleHttpRequest processor. That processor hands
the flowfile off to an ExecuteScript processor that runs a python
script. This script is very, very simple: it takes an incoming JSON
object and loads it into a Python dictionary, and verifies the
presence of required fields using simple has_key checks on the
dictionary. There are only eight fields in the incoming JSON object.
The throughput for these two processors is not exceeding 100-150 files
in five minutes. It seems very slow in light of the minimal processing
going on in these two steps.
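For reference, the kind of check described above can be sketched roughly as follows. This is an illustration only, not the actual script: the field names in REQUIRED_FIELDS and the helper name missing_fields are hypothetical, since the real eight fields are not listed here. It uses the "in" operator rather than has_key, which behaves the same way in Jython 2.7 and also works in Python 3.

```python
import json

# Hypothetical field names, standing in for the real required fields.
REQUIRED_FIELDS = ['id', 'timestamp']


def missing_fields(json_text, required=REQUIRED_FIELDS):
    """Parse a JSON object and return the required fields it lacks."""
    record = json.loads(json_text)
    return [name for name in required if name not in record]


print(missing_fields('{"id": 1}'))                     # ['timestamp']
print(missing_fields('{"id": 1, "timestamp": "x"}'))   # []
```

A check like this is cheap, which is why the low throughput looks more like per-invocation overhead than time spent in the script's own logic.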
I notice that there are configuration options seemingly related to
optimizing performance. "Concurrent tasks", for example, is only set
by default to 1 for each processor.
What performance optimizations at the processor level do users
recommend? Is it advisable to crank up the concurrent tasks for a
processor, and is there an optimal performance point beyond which you
should not crank up that value? Are there trade-offs?
I am particularly interested in optimizations for HandleHttpRequest
and ExecuteScript processors.
Thanks in advance for your thoughts.
cheers,
Jim