Thank you Joe. Indeed, I had a session.commit() that was being called
inside the "for each". I kept the file-by-file transfer inside, at the end
of the foreach, but moved the commit() outside and after the loop. It works
great, and just as Scott mentioned, this "grab multiple flowfiles" approach
makes the ExecuteScript much more efficient. In my case it reduced
processing time by 80% by cutting down the number of times I incur the
initialization cost of my ExecuteScript processor.
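
In outline, the loop now looks something like this (the per-file work is
omitted; I'm only showing where the transfer and the commit ended up):

flowFiles = session.get(10)
for flowFile in flowFiles:
    if flowFile is None:
        continue
    # per-file work and attribute updates go here
    session.transfer(flowFile, REL_SUCCESS)
# a single commit after the loop, once every pulled flowfile has been
# transferred, instead of one commit per flowfile
session.commit()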

On Fri, Apr 7, 2017 at 1:03 PM, Joe Witt <[email protected]> wrote:

> If you're seeing 'transfer relationship not specified', it means one or
> more of the flowfiles you've pulled have not been acted upon for
> transfer: there is a logic/code path whereby a flowfile can be pulled
> from the queue but never transferred.
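>
> For example (someCondition below is just a placeholder), a loop like this
> pulls a batch but only transfers the flowfiles that pass the check:
>
> flowFiles = session.get(10)
> for flowFile in flowFiles:
>     if flowFile is None:
>         continue
>     if someCondition(flowFile):
>         session.transfer(flowFile, REL_SUCCESS)
>     # a flowfile failing the check is pulled but never transferred or
>     # removed, which is what triggers the error above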
>
> Thanks
>
> On Fri, Apr 7, 2017 at 12:57 PM, Scott Wagner <[email protected]>
> wrote:
> > Jim,
> >
> >     Here's the full script with unnecessary business logic removed:
> >
> > flowFiles = session.get(10)
> > for flowFile in flowFiles:
> >     if flowFile is None:
> >         continue
> >     s3_bucket = flowFile.getAttribute('job.s3_bucket')
> >     s3_path = flowFile.getAttribute('job.s3_path')
> >     # More stuff here....
> >     errors = []
> >     # More stuff here...
> >     if len(errors) > 0:
> >         flowFile = session.putAttribute(flowFile, 'job.error', ';'.join(errors))
> >         session.transfer(flowFile, REL_FAILURE)
> >     else:
> >         flowFile = session.putAttribute(flowFile, 'job.number_csv_files', str(len(matches)))
> >         flowFile = session.putAttribute(flowFile, 'job.total_file_size', str(total_size))
> >         session.transfer(flowFile, REL_SUCCESS)
> >
> >     I'm not calling session.commit anywhere.
> >
> >     Here's another script (this one is the full file - no business secrets
> > in here!) that creates N flowfiles from an input file based on the
> > attributes defining a numeric range:
> >
> > import sys
> > import traceback
> > from java.nio.charset import StandardCharsets
> > from org.apache.commons.io import IOUtils
> > from org.apache.nifi.processor.io import StreamCallback
> > from org.python.core.util import StringUtil
> >
> >
> > flowFiles = session.get(10)
> > for flowFile in flowFiles:
> >     if flowFile is None:
> >         continue
> >     start = int(flowFile.getAttribute('range.start'))
> >     stop = int(flowFile.getAttribute('range.stop'))
> >     increment = int(flowFile.getAttribute('range.increment'))
> >     for x in range(start, stop + 1, increment):
> >         newFlowFile = session.clone(flowFile)
> >         newFlowFile = session.putAttribute(newFlowFile, 'current', str(x))
> >         session.transfer(newFlowFile, REL_SUCCESS)
> >     session.remove(flowFile)
> >
> >     I hope these examples are helpful.
> >
> > - Scott
> >
> > James McMahon
> > Friday, April 7, 2017 11:22 AM
> > Scott, how did you refine your session.transfer and session.commit when you
> > introduced the for loop?
> >
> > I am getting a "transfer relationship not specified" when I move my transfer
> > and my commit into the "for flowFile" loop. Can you show the bottom closure
> > to your # Do stuff here? Thank you sir.
> >
> > Jim
> >
> >
> > Scott Wagner
> > Wednesday, April 5, 2017 3:26 PM
> > One of my experiences when using ExecuteScript and Python is that having
> > an ExecuteScript that works on an individual FlowFile when you have
> > multiple in the input queue is very inefficient, even when you set it to
> > a timer of 0 sec.
> >
> > Instead, I have the following in all of my Python scripts:
> >
> > flowFiles = session.get(10)
> > for flowFile in flowFiles:
> >     if flowFile is None:
> >         continue
> >     # Do stuff here
> >
> > That seems to improve the throughput of the ExecuteScript processor
> > dramatically.
> >
> > YMMV
> >
> > - Scott
> >
> > James McMahon
> > Wednesday, April 5, 2017 12:48 PM
> > I am receiving POSTs from a Pentaho process, delivering files to my NiFi
> > 0.7.x workflow's HandleHttpRequest processor. That processor hands the
> > flowfile off to an ExecuteScript processor that runs a Python script. This
> > script is very, very simple: it takes an incoming JSON object, loads it
> > into a Python dictionary, and verifies the presence of required fields using
> > simple has_key checks on the dictionary. There are only eight fields in the
> > incoming JSON object.
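> >
> > In outline the script looks roughly like this (the field names below are
> > just placeholders for the real ones):
> >
> > import json
> > from java.nio.charset import StandardCharsets
> > from org.apache.commons.io import IOUtils
> > from org.apache.nifi.processor.io import InputStreamCallback
> >
> > REQUIRED_FIELDS = ['field_a', 'field_b']  # placeholder field names
> >
> > class ReadJson(InputStreamCallback):
> >     def __init__(self):
> >         self.doc = None
> >     def process(self, inputStream):
> >         self.doc = json.loads(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
> >
> > flowFile = session.get()
> > if flowFile is not None:
> >     reader = ReadJson()
> >     session.read(flowFile, reader)
> >     missing = [f for f in REQUIRED_FIELDS if not reader.doc.has_key(f)]
> >     if len(missing) == 0:
> >         session.transfer(flowFile, REL_SUCCESS)
> >     else:
> >         session.transfer(flowFile, REL_FAILURE)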
> >
> > The throughput for these two processors is not exceeding 100-150 files in
> > five minutes. It seems very slow in light of the minimal processing going on
> > in these two steps.
> >
> > I notice that there are configuration options seemingly related to
> > optimizing performance. "Concurrent tasks", for example, is only set by
> > default to 1 for each processor.
> >
> > What performance optimizations at the processor level do users recommend? Is
> > it advisable to crank up the concurrent tasks for a processor, and is there
> > an optimal performance point beyond which you should not crank up that
> > value? Are there trade-offs?
> >
> > I am particularly interested in optimizations for HandleHttpRequest and
> > ExecuteScript processors.
> >
> > Thanks in advance for your thoughts.
> >
> > cheers,
> >
> > Jim
> >
> >
>
