You, sir, are a hero! 😊 Is this documented anywhere, BTW, or should I have just read the source?
I had no idea you could do a session.get(n).

Cheers,
Giovanni

From: Scott Wagner [mailto:[email protected]]
Sent: Friday, April 7, 2017 6:58 PM
To: James McMahon <[email protected]>
Cc: [email protected]
Subject: Re: Options for increasing performance?

Jim,

Here's the full script with unnecessary business logic removed:

    flowFiles = session.get(10)
    for flowFile in flowFiles:
        if flowFile is None:
            continue
        s3_bucket = flowFile.getAttribute('job.s3_bucket')
        s3_path = flowFile.getAttribute('job.s3_path')
        # More stuff here....
        errors = []
        # More stuff here...
        if len(errors) > 0:
            flowFile = session.putAttribute(flowFile, 'job.error', ';'.join(errors))
            session.transfer(flowFile, REL_FAILURE)
        else:
            flowFile = session.putAttribute(flowFile, 'job.number_csv_files', str(len(matches)))
            flowFile = session.putAttribute(flowFile, 'job.total_file_size', str(total_size))
            session.transfer(flowFile, REL_SUCCESS)

I'm not calling session.commit anywhere.

Here's another script (this one is the full file - no business secrets in here!) that creates N flowfiles from an input file, based on attributes defining a numeric range:

    import sys
    import traceback
    from java.nio.charset import StandardCharsets
    from org.apache.commons.io import IOUtils
    from org.apache.nifi.processor.io import StreamCallback
    from org.python.core.util import StringUtil

    flowFiles = session.get(10)
    for flowFile in flowFiles:
        if flowFile is None:
            continue
        start = int(flowFile.getAttribute('range.start'))
        stop = int(flowFile.getAttribute('range.stop'))
        increment = int(flowFile.getAttribute('range.increment'))
        for x in range(start, stop + 1, increment):
            newFlowFile = session.clone(flowFile)
            newFlowFile = session.putAttribute(newFlowFile, 'current', str(x))
            session.transfer(newFlowFile, REL_SUCCESS)
        session.remove(flowFile)

I hope these examples are helpful.
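For readers following along outside NiFi, the range-expansion logic in Scott's second script can be sketched in plain Python. This is only an illustration: the expand_range helper name is mine, and the NiFi session and flowfile objects are replaced by a plain dict of attribute strings.

```python
# Plain-Python sketch of the range-expansion logic above.
# In the real script, the attributes come from flowFile.getAttribute()
# and each produced value is stamped onto a cloned flowfile as 'current'.

def expand_range(attributes):
    """Return the 'current' values each cloned flowfile would receive."""
    start = int(attributes['range.start'])
    stop = int(attributes['range.stop'])
    increment = int(attributes['range.increment'])
    # stop + 1 makes the range inclusive of 'stop', matching the script.
    return [str(x) for x in range(start, stop + 1, increment)]

print(expand_range({'range.start': '0',
                    'range.stop': '10',
                    'range.increment': '5'}))  # ['0', '5', '10']
```

Note that because the loop clones the original and then removes it, every flowfile pulled by session.get(10) ends up either transferred or removed, which is what keeps the session consistent.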
- Scott

James McMahon <mailto:[email protected]>
Friday, April 7, 2017 11:22 AM

Scott, how did you refine your session.transfer and session.commit when you introduced the for loop? I am getting a "transfer relationship not specified" error when I move my transfer and my commit into the "for flowFile" loop. Can you show the bottom closure to your # Do stuff here? Thank you, sir. Jim

Scott Wagner <mailto:[email protected]>
Wednesday, April 5, 2017 3:26 PM

One of my experiences with ExecuteScript and Python is that an ExecuteScript that works on an individual FlowFile is very inefficient when you have multiple FlowFiles in the input queue, even with the run schedule set to a timer of 0 sec. Instead, I have the following in all of my Python scripts:

    flowFiles = session.get(10)
    for flowFile in flowFiles:
        if flowFile is None:
            continue
        # Do stuff here

That seems to improve the throughput of the ExecuteScript processor dramatically. YMMV

- Scott

James McMahon <mailto:[email protected]>
Wednesday, April 5, 2017 12:48 PM

I am receiving POSTs from a Pentaho process, which delivers files to the HandleHttpRequest processor in my NiFi 0.7.x workflow. That processor hands the flowfile off to an ExecuteScript processor that runs a Python script. This script is very, very simple: it takes an incoming JSON object, loads it into a Python dictionary, and verifies the presence of required fields using simple has_key checks on the dictionary. There are only eight fields in the incoming JSON object. The throughput for these two processors does not exceed 100-150 files in five minutes. That seems very slow in light of the minimal processing going on in these two steps. I notice that there are configuration options seemingly related to optimizing performance. "Concurrent tasks", for example, is set by default to only 1 for each processor. What performance optimizations at the processor level do users recommend?
Is it advisable to crank up the concurrent tasks for a processor, and is there an optimal point beyond which you should not raise that value? Are there trade-offs? I am particularly interested in optimizations for the HandleHttpRequest and ExecuteScript processors. Thanks in advance for your thoughts.

cheers,
Jim
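For reference, the required-field check Jim describes can be sketched in plain Python. The field names below are hypothetical (the thread never lists the eight actual fields), and where his Jython 2.x script would use dict.has_key(), the equivalent `in` operator is used here.

```python
import json

# Sketch of the validation step described above, with made-up field names.
# Jim's script does the same thing via dict.has_key() under Jython.

REQUIRED_FIELDS = ['id', 'source', 'timestamp']  # hypothetical examples

def missing_fields(raw_json):
    """Return the required fields absent from the incoming JSON object."""
    record = json.loads(raw_json)
    return [field for field in REQUIRED_FIELDS if field not in record]

print(missing_fields('{"id": "42", "source": "pentaho"}'))  # ['timestamp']
```

In a NiFi flow, a non-empty result would typically set an error attribute and route the flowfile to REL_FAILURE, just as in Scott's first script.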
