Thank you, Joe. Indeed, I had a session.commit() inside the "for each" loop. I kept the file-by-file transfer inside the loop, at the end of each iteration, but moved the commit() outside, after the loop. It works great, and just as Scott had mentioned, this "grab multiple flowfiles" approach makes the ExecuteScript much more efficient. In my case it reduced processing time by 80%, since I now incur the initialization cost of my ExecuteScript processor far less often.
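For reference, a minimal sketch of the shape I ended up with (assuming a Jython ExecuteScript, where session, REL_SUCCESS, and REL_FAILURE are bound by the framework; the validation step is a hypothetical placeholder):

flowFiles = session.get(10)
for flowFile in flowFiles:
    if flowFile is None:
        continue
    # per-file work goes here; note that every flowfile pulled from the
    # queue must be transferred on some relationship, or NiFi raises
    # "transfer relationship not specified"
    errors = []  # hypothetical: populated by whatever validation you do
    if len(errors) > 0:
        session.transfer(flowFile, REL_FAILURE)
    else:
        session.transfer(flowFile, REL_SUCCESS)
# a single commit for the whole batch, after the loop
session.commit()

The transfers stay inside the loop so each flowfile is routed individually; moving the one commit() outside the loop is what eliminated the per-file overhead. A fuller sketch, applied to the validation script from my original question, follows the quoted thread below.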
On Fri, Apr 7, 2017 at 1:03 PM, Joe Witt <[email protected]> wrote:
> If you're seeing 'transfer relationship not specified' it means one or
> more of the flowfiles you've pulled have not been acted upon for
> transfer, meaning you have a logic/code flow path whereby a flowfile
> can be pulled from the queue but never transferred.
>
> Thanks
>
> On Fri, Apr 7, 2017 at 12:57 PM, Scott Wagner <[email protected]> wrote:
> > Jim,
> >
> > Here's the full script with unnecessary business logic removed:
> >
> > flowFiles = session.get(10)
> > for flowFile in flowFiles:
> >     if flowFile is None:
> >         continue
> >     s3_bucket = flowFile.getAttribute('job.s3_bucket')
> >     s3_path = flowFile.getAttribute('job.s3_path')
> >     # More stuff here....
> >     errors = []
> >     # More stuff here...
> >     if len(errors) > 0:
> >         flowFile = session.putAttribute(flowFile, 'job.error', ';'.join(errors))
> >         session.transfer(flowFile, REL_FAILURE)
> >     else:
> >         flowFile = session.putAttribute(flowFile, 'job.number_csv_files', str(len(matches)))
> >         flowFile = session.putAttribute(flowFile, 'job.total_file_size', str(total_size))
> >         session.transfer(flowFile, REL_SUCCESS)
> >
> > I'm not calling session.commit anywhere.
> >
> > Here's another script (this one is the full file - no business secrets
> > in here!) that creates N flowfiles from an input file based on the
> > attributes defining a numeric range:
> >
> > import sys
> > import traceback
> > from java.nio.charset import StandardCharsets
> > from org.apache.commons.io import IOUtils
> > from org.apache.nifi.processor.io import StreamCallback
> > from org.python.core.util import StringUtil
> >
> > flowFiles = session.get(10)
> > for flowFile in flowFiles:
> >     if flowFile is None:
> >         continue
> >     start = int(flowFile.getAttribute('range.start'))
> >     stop = int(flowFile.getAttribute('range.stop'))
> >     increment = int(flowFile.getAttribute('range.increment'))
> >     for x in range(start, stop + 1, increment):
> >         newFlowFile = session.clone(flowFile)
> >         newFlowFile = session.putAttribute(newFlowFile, 'current', str(x))
> >         session.transfer(newFlowFile, REL_SUCCESS)
> >     session.remove(flowFile)
> >
> > I hope these examples are helpful.
> >
> > - Scott
> >
> > James McMahon
> > Friday, April 7, 2017 11:22 AM
> > Scott, how did you refine your session.transfer and session.commit when
> > you introduced the for loop?
> >
> > I am getting a "transfer relationship not specified" when I move my
> > transfer and my commit into the "for flowFile" loop. Can you show the
> > bottom closure to your # Do stuff here? Thank you sir.
> >
> > Jim
> >
> >
> > Scott Wagner
> > Wednesday, April 5, 2017 3:26 PM
> > One of my experiences is that when using ExecuteScript with Python,
> > having an ExecuteScript that works on an individual FlowFile when you
> > have multiple in the input queue is very inefficient, even when you set
> > it to a timer of 0 sec.
> >
> > Instead, I have the following in all of my Python scripts:
> >
> > flowFiles = session.get(10)
> > for flowFile in flowFiles:
> >     if flowFile is None:
> >         continue
> >     # Do stuff here
> >
> > That seems to improve the throughput of the ExecuteScript processor
> > dramatically.
> >
> > YMMV
> >
> > - Scott
> >
> > James McMahon
> > Wednesday, April 5, 2017 12:48 PM
> > I am receiving POSTs from a Pentaho process, delivering files to my NiFi
> > 0.7.x workflow HandleHttpRequest processor. That processor hands the
> > flowfile off to an ExecuteScript processor that runs a Python script. This
> > script is very, very simple: it takes an incoming JSON object, loads it
> > into a Python dictionary, and verifies the presence of required fields
> > using simple has_key checks on the dictionary. There are only eight
> > fields in the incoming JSON object.
> >
> > The throughput for these two processors is not exceeding 100-150 files
> > in five minutes. It seems very slow in light of the minimal processing
> > going on in these two steps.
> >
> > I notice that there are configuration options seemingly related to
> > optimizing performance. "Concurrent tasks", for example, is only set by
> > default to 1 for each processor.
> >
> > What performance optimizations at the processor level do users
> > recommend? Is it advisable to crank up the concurrent tasks for a
> > processor, and is there an optimal performance point beyond which you
> > should not crank up that value? Are there trade-offs?
> >
> > I am particularly interested in optimizations for HandleHttpRequest and
> > ExecuteScript processors.
> >
> > Thanks in advance for your thoughts.
> >
> > cheers,
> >
> > Jim
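For completeness, here is a sketch of the JSON validation script from my original question in the quoted thread above, reworked with the batched pattern (the required field names and the job.error attribute are illustrative, not my real ones; my actual script checks eight fields):

import json

from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import InputStreamCallback

REQUIRED_FIELDS = ['id', 'name']  # illustrative; the real script has eight

class ReadText(InputStreamCallback):
    # collects the flowfile content as a UTF-8 string
    def __init__(self):
        self.text = None
    def process(self, inputStream):
        self.text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

flowFiles = session.get(10)
for flowFile in flowFiles:
    if flowFile is None:
        continue
    reader = ReadText()
    session.read(flowFile, reader)
    errors = []
    try:
        data = json.loads(reader.text)
        for field in REQUIRED_FIELDS:
            if not data.has_key(field):
                errors.append('missing field: ' + field)
    except ValueError:
        errors.append('content is not valid JSON')
    # every pulled flowfile is transferred on exactly one relationship
    if len(errors) > 0:
        flowFile = session.putAttribute(flowFile, 'job.error', ';'.join(errors))
        session.transfer(flowFile, REL_FAILURE)
    else:
        session.transfer(flowFile, REL_SUCCESS)
session.commit()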
