If you're seeing 'transfer relationship not specified' it means one or more of the flowfiles you've pulled have not been acted upon for transfer meaning you have a logic/code flow path whereby a flowfile can be pulled from the queue but never transferred.
Thanks On Fri, Apr 7, 2017 at 12:57 PM, Scott Wagner <[email protected]> wrote: > Jim, > > Here's the full script with unnecessary business logic removed: > > flowFiles = session.get(10) > for flowFile in flowFiles: > if flowFile is None: > continue > s3_bucket = flowFile.getAttribute('job.s3_bucket') > s3_path = flowFile.getAttribute('job.s3_path') > # More stuff here.... > errors = [] > # More stuff here... > if len(errors) > 0: > flowFile = session.putAttribute(flowFile, 'job.error', > ';'.join(errors)) > session.transfer(flowFile, REL_FAILURE) > else: > flowFile = session.putAttribute(flowFile, 'job.number_csv_files', > str(len(matches))) > flowFile = session.putAttribute(flowFile, 'job.total_file_size', > str(total_size)) > session.transfer(flowFile, REL_SUCCESS) > > I'm not calling session.commit anywhere. > > Here's another script (this one is the full file - no business secrets > in here!) that creates N number of flowfiles from an input file based on the > attributes defining a numeric range: > > import sys > import traceback > from java.nio.charset import StandardCharsets > from org.apache.commons.io import IOUtils > from org.apache.nifi.processor.io import StreamCallback > from org.python.core.util import StringUtil > > > flowFiles = session.get(10) > for flowFile in flowFiles: > if flowFile is None: > continue > start = int(flowFile.getAttribute('range.start')) > stop = int(flowFile.getAttribute('range.stop')) > increment = int(flowFile.getAttribute('range.increment')) > for x in range(start, stop + 1, increment): > newFlowFile = session.clone(flowFile) > newFlowFile = session.putAttribute(newFlowFile, 'current', str(x)) > session.transfer(newFlowFile, REL_SUCCESS) > session.remove(flowFile) > > I hope these examples are helpful. > > - Scott > > James McMahon > Friday, April 7, 2017 11:22 AM > Scott, how did you refine your session.transfer and session.commit when you > introduced the for loop? > > I am getting a "transfer relationship not specified" when I move my transfer > and my commit into the "for flowFile" loop. Can you show the bottom closure > to your # Do stuff here? Thank you sir. > > Jim > > > Scott Wagner > Wednesday, April 5, 2017 3:26 PM > One of my experiences is that when using ExecuteScript and Python is that > having an ExecuteScript that works on an individual FlowFile when you have > multiple in the input queue is very inefficient, even when you set it to a > timer of 0 sec. > > Instead, I have the following in all of my Python scripts: > > flowFiles = session.get(10) > for flowFile in flowFiles: > if flowFile is None: > continue > # Do stuff here > > That seems to improve the throughput of the ExecuteScript processor > dramatically. > > YMMV > > - Scott > > James McMahon > Wednesday, April 5, 2017 12:48 PM > I am receiving POSTs from a Pentaho process, delivering files to my NiFi > 0.7.x workflow HandleHttpRequest processor. That processor hands the > flowfile off to an ExecuteScript processor that runs a python script. This > script is very, very simple: it takes an incoming JSO object and loads it > into a Python dictionary, and verifies the presence of required fields using > simple has_key checks on the dictionary. There are only eight fields in the > incoming JSON object. > > The throughput for these two processes is not exceeding 100-150 files in > five minutes. It seems very slow in light of the minimal processing going on > in these two steps. > > I notice that there are configuration operations seemingly related to > optimizing performance. "Concurrent tasks", for example, is only set by > default to 1 for each processor. > > What performance optimizations at the processor level do users recommend? Is > it advisable to crank up the concurrent tasks for a processor, and is there > an optimal performance point beyond which you should not crank up that > value? Are there trade-offs? > > I am particularly interested in optimizations for HandleHttpRequest and > ExecuteScript processors. > > Thanks in advance for your thoughts. > > cheers, > > Jim > >
