If you're seeing 'transfer relationship not specified' it means one or
more of the flowfiles you've pulled have not been acted upon for
transfer meaning you have a logic/code flow path whereby a flowfile
can be pulled from the queue but never transferred.

Thanks

On Fri, Apr 7, 2017 at 12:57 PM, Scott Wagner <[email protected]> wrote:
> Jim,
>
>     Here's the full script with unnecessary business logic removed:
>
> flowFiles = session.get(10)
> for flowFile in flowFiles:
>     if flowFile is None:
>         continue
>     s3_bucket = flowFile.getAttribute('job.s3_bucket')
>     s3_path = flowFile.getAttribute('job.s3_path')
>     # More stuff here....
>     errors = []
>     # More stuff here...
>     if len(errors) > 0:
>         flowFile = session.putAttribute(flowFile, 'job.error',
> ';'.join(errors))
>         session.transfer(flowFile, REL_FAILURE)
>     else:
>         flowFile = session.putAttribute(flowFile, 'job.number_csv_files',
> str(len(matches)))
>         flowFile = session.putAttribute(flowFile, 'job.total_file_size',
> str(total_size))
>         session.transfer(flowFile, REL_SUCCESS)
>
>     I'm not calling session.commit anywhere.
>
>     Here's another script (this one is the full file - no business secrets
> in here!) that creates N number of flowfiles from an input file based on the
> attributes defining a numeric range:
>
> import sys
> import traceback
> from java.nio.charset import StandardCharsets
> from org.apache.commons.io import IOUtils
> from org.apache.nifi.processor.io import StreamCallback
> from org.python.core.util import StringUtil
>
>
> flowFiles = session.get(10)
> for flowFile in flowFiles:
>     if flowFile is None:
>         continue
>     start = int(flowFile.getAttribute('range.start'))
>     stop = int(flowFile.getAttribute('range.stop'))
>     increment = int(flowFile.getAttribute('range.increment'))
>     for x in range(start, stop + 1, increment):
>         newFlowFile = session.clone(flowFile)
>         newFlowFile = session.putAttribute(newFlowFile, 'current', str(x))
>         session.transfer(newFlowFile, REL_SUCCESS)
>     session.remove(flowFile)
>
>     I hope these examples are helpful.
>
> - Scott
>
> James McMahon
> Friday, April 7, 2017 11:22 AM
> Scott, how did you refine your session.transfer and session.commit when you
> introduced the for loop?
>
> I am getting a "transfer relationship not specified" when I move my transfer
> and my commit into the "for flowFile" loop. Can you show the bottom closure
> to your # Do stuff here? Thank you sir.
>
> Jim
>
>
> Scott Wagner
> Wednesday, April 5, 2017 3:26 PM
> One of my experiences is that when using ExecuteScript and Python is that
> having an ExecuteScript that works on an individual FlowFile when you have
> multiple in the input queue is very inefficient, even when you set it to a
> timer of 0 sec.
>
> Instead, I have the following in all of my Python scripts:
>
> flowFiles = session.get(10)
> for flowFile in flowFiles:
>     if flowFile is None:
>         continue
>     # Do stuff here
>
> That seems to improve the throughput of the ExecuteScript processor
> dramatically.
>
> YMMV
>
> - Scott
>
> James McMahon
> Wednesday, April 5, 2017 12:48 PM
> I am receiving POSTs from a Pentaho process, delivering files to my NiFi
> 0.7.x workflow HandleHttpRequest processor. That processor hands the
> flowfile off to an ExecuteScript processor that runs a python script. This
> script is very, very simple: it takes an incoming JSO object and loads it
> into a Python dictionary, and verifies the presence of required fields using
> simple has_key checks on the dictionary. There are only eight fields in the
> incoming JSON object.
>
> The throughput for these two processes is not exceeding 100-150 files in
> five minutes. It seems very slow in light of the minimal processing going on
> in these two steps.
>
> I notice that there are configuration operations seemingly related to
> optimizing performance. "Concurrent tasks", for example,  is only set by
> default to 1 for each processor.
>
> What performance optimizations at the processor level do users recommend? Is
> it advisable to crank up the concurrent tasks for a processor, and is there
> an optimal performance point beyond which you should not crank up that
> value? Are there trade-offs?
>
> I am particularly interested in optimizations for HandleHttpRequest and
> ExecuteScript processors.
>
> Thanks in advance for your thoughts.
>
> cheers,
>
> Jim
>
>

Reply via email to