Thanks Matt and Joe for your input. I will go through your suggestions.

On Mon, May 2, 2016 at 10:16 AM, Matt Burgess wrote:
> Madhu,
>
> In addition to Joe's suggestions, ExecuteScript currently allows only
> one task at a time, which is a pretty bad bottleneck if you are dealing
> with lots of throughput. However, I have written up a Jira [1] for this
> and issued a PR [2] to fix it; feel free to try that out and/or review
> the code.
>
> Another option in the meantime is to use InvokeScriptedProcessor. You'd
> just need some boilerplate to fill out the Processor implementation;
> there is an example in the unit tests [3]. InvokeScriptedProcessor can
> be run with multiple concurrent tasks, and after NIFI-1822 is
> implemented, ExecuteScript will be too.
>
> [1] https://issues.apache.org/jira/browse/NIFI-1822
> [2] https://github.com/apache/nifi/pull/387
> [3] https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_reader.py
>
> However, as Joe pointed out, Jython itself will always be fairly slow.
> If you don't want to code a full processor in Java, you could port your
> code to Groovy or JavaScript for use in ExecuteScript /
> InvokeScriptedProcessor; they're MUCH faster than Jython.
>
> Regards,
> Matt
>
> On Mon, May 2, 2016 at 10:07 AM, Joe Witt wrote:
> > Madhu,
> >
> > My testing showed Jython script performance to be poor as well.
> > A couple of options worth trying to tackle this:
> > 1) Write the script so that it handles multiple flowfiles per
> >    process session (basically batching). This works if the
> >    slowness comes from the setup/teardown of the script execution
> >    environment.
> > 2) Have multiple instances of this processor running, pulling from
> >    the same queue, to parallelize the processing.
> > 3) It might be worth simply coding this up in Java. It looks like it
> >    might be a straightforward processor, so now that you've gotten the
> >    precise logic you want, you can turn it into a full NiFi processor
> >    and you'll get solid performance.
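Joe's first suggestion (several flowfiles per session) might look roughly like the Jython sketch below for an ExecuteScript body. It assumes the `session` and `REL_SUCCESS` variables that ExecuteScript binds, and relies on `ProcessSession.get(int maxResults)`, which returns a list of up to that many flowfiles. The names `process_batch`, `make_callback`, and `BATCH_SIZE` are hypothetical, and the batch size of 100 is arbitrary. Because the posted `PyStreamCallback` reads the global `flowFile`, a batched version would need to hand each callback its own flowfile, which is what `make_callback` is for. Treat this as an untested sketch, not a recommended NiFi pattern.

```python
# Hypothetical batching helper for an ExecuteScript (Jython) body.
# Assumes ExecuteScript's bound variables `session` and `REL_SUCCESS`.

BATCH_SIZE = 100  # arbitrary; tune for your flow


def process_batch(session, make_callback, rel_success, batch_size=BATCH_SIZE):
    """Pull up to batch_size flowfiles in one trigger and handle each one.

    make_callback(flow_file) must return a StreamCallback-like object bound
    to that flowfile, so each callback reads its own attributes rather than
    a shared global. Returns the number of flowfiles processed.
    """
    flow_files = session.get(batch_size)  # ProcessSession.get(int maxResults)
    count = 0
    for flow_file in flow_files:
        flow_file = session.write(flow_file, make_callback(flow_file))
        session.transfer(flow_file, rel_success)
        count += 1
    return count


# In the real ExecuteScript body this might be called as:
#   process_batch(session, lambda ff: PyStreamCallback(ff), REL_SUCCESS)
# with PyStreamCallback.__init__ changed (hypothetically) to store `ff`
# instead of reading the global `flowFile`.
```

The per-trigger overhead is then paid once per batch rather than once per flowfile; whether that helps depends on where the time actually goes, as Joe notes.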
> >
> > Thanks
> > Joe
> >
> > On Mon, May 2, 2016 at 10:03 AM, Madhukar Thota wrote:
> >> Hi
> >>
> >> I am using the ExecuteScript processor (with the Python/Jython script
> >> pasted below) to process the HTTP query string along with User-Agent
> >> parsing. The processor is very slow and not able to handle heavy load.
> >> A lot of flowfiles are getting queued and waiting for the processor to
> >> process them. How can I improve the performance and processing?
> >>
> >> Script:
> >>
> >> import simplejson as json
> >> import datetime
> >> import time
> >> from org.apache.nifi.processor.io import StreamCallback
> >> from user_agents import parse
> >> import urllib
> >> import urlparse
> >>
> >> def query_dict(querystring):
> >>     if not querystring:
> >>         return {}
> >>     query = urllib.unquote(querystring).rstrip()
> >>     query = query.split('&')
> >>     query = [q.split('=') for q in query]
> >>     return dict([(q[0], ' '.join(q[1:])) for q in query])
> >>
> >> def starPassword(route):
> >>     parsed = urlparse.urlsplit(route)
> >>     if '@' not in parsed.netloc:
> >>         return route
> >>
> >>     userinfo, _, location = parsed.netloc.partition('@')
> >>     username, _, password = userinfo.partition(':')
> >>     if not password:
> >>         return route
> >>
> >>     userinfo = ':'.join([username, '*****'])
> >>     netloc = '@'.join([userinfo, location])
> >>     parsed = parsed._replace(netloc=netloc)
> >>     return urlparse.urlunsplit(parsed)
> >>
> >>
> >> def num(s):
> >>     try:
> >>         return int(s)
> >>     except ValueError:
> >>         try:
> >>             return float(s)
> >>         except ValueError:
> >>             try:
> >>                 return s
> >>             except ValueError:
> >>                 raise ValueError('argument parsing error')
> >>
> >>
> >> class PyStreamCallback(StreamCallback):
> >>     def __init__(self):
> >>         pass
> >>
> >>     def process(self, inputStream, outputStream):
> >>         if flowFile.getAttribute('http.query.string'):
> >>             d = query_dict(flowFile.getAttribute('http.query.string'))
> >>             obj = {'timestamp': ltime,
> >>                    'browser': str(parse(flowFile.getAttribute('http.headers.User-Agent')).browser.family),
> >>                    'browser_version': str(parse(flowFile.getAttribute('http.headers.User-Agent')).browser.version_string),
> >>                    'os': str(parse(flowFile.getAttribute('http.headers.User-Agent')).os.family),
> >>                    'os_version': str(parse(flowFile.getAttribute('http.headers.User-Agent')).os.version_string),
> >>                    'client_ip': flowFile.getAttribute('http.remote.addr')}
> >>
> >>             for key in d:
> >>                 obj[key.replace(".", "_")] = num(starPassword(d[key]))
> >>             outputStream.write(bytearray(json.dumps(obj, separators=(',', ':'))))
> >>         else:
> >>             pass
> >>
> >>
> >> flowFile = session.get()
> >> if flowFile is not None:
> >>     flowFile = session.write(flowFile, PyStreamCallback())
> >>     session.transfer(flowFile, REL_SUCCESS)
> >>
> >> Any help is appreciated.
> >>
> >> -Madhu
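Independent of batching or concurrency, the posted callback calls `parse()` on the same User-Agent header four times per flowfile, and User-Agent parsing is likely one of the more expensive steps in the script. (Note also that `ltime` is not defined anywhere in the script as posted.) Below is a minimal sketch that parses the header once and builds the same fields. The function name `build_record` and passing the attributes, parser, timestamp, and value conversion in as arguments are assumptions made so the logic can be exercised outside NiFi; in the real script `ua_parse` would be `user_agents.parse`.

```python
def build_record(attributes, query, ua_parse, timestamp, convert):
    """Build the output dict from flowfile attributes, parsing the UA once.

    attributes: mapping of attribute names to values
                (in NiFi, e.g. flowFile.getAttributes()).
    query:      dict of decoded query parameters (e.g. from query_dict()).
    ua_parse:   a User-Agent parser such as user_agents.parse.
    timestamp:  value for the 'timestamp' field (ltime in the original).
    convert:    per-value conversion, e.g. lambda v: num(starPassword(v)).
    """
    # Parse the header a single time and reuse the result for all four fields.
    # `or ''` avoids passing None when the header is missing.
    ua = ua_parse(attributes.get('http.headers.User-Agent') or '')
    record = {
        'timestamp': timestamp,
        'browser': str(ua.browser.family),
        'browser_version': str(ua.browser.version_string),
        'os': str(ua.os.family),
        'os_version': str(ua.os.version_string),
        'client_ip': attributes.get('http.remote.addr'),
    }
    # Same key rewriting as the original: dots become underscores.
    for key, value in query.items():
        record[key.replace('.', '_')] = convert(value)
    return record
```

Inside `process()` this could be called as `build_record(flowFile.getAttributes(), d, parse, ltime, lambda v: num(starPassword(v)))`, once `ltime` has actually been defined. Keeping the parser and timestamp as arguments is a design choice for testability, not a NiFi requirement.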

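Two smaller correctness points in the helper functions are also worth a look. The hand-rolled `query_dict` unquotes the whole query string before splitting on `&` and `=`, so an encoded `%26` or `%3D` inside a value gets treated as a separator. And `num()` accepts strings such as `nan` or `inf`, because `float()` parses them; depending on the JSON library's `allow_nan` setting, that yields `NaN`/`Infinity` in the output (which is not valid JSON) or an error. A sketch of both helpers using the standard library's `parse_qsl` follows; the names `query_dict_std` and `num_std` are hypothetical. Unlike `urllib.unquote`, `parse_qsl` also decodes `+` as a space, which may or may not be what you want.

```python
import math

try:  # Python 3
    from urllib.parse import parse_qsl
except ImportError:  # Python 2 / Jython 2.7
    from urlparse import parse_qsl


def query_dict_std(querystring):
    """Split on '&' and '=' first, then percent-decode each key and value.

    keep_blank_values=True keeps 'k=' and a bare 'k' with value '' (as the
    original did); later duplicate keys overwrite earlier ones, as in dict().
    """
    if not querystring:
        return {}
    return dict(parse_qsl(querystring.rstrip(), keep_blank_values=True))


def num_std(s):
    """Return s as an int or finite float if it parses as one, else unchanged."""
    try:
        return int(s)
    except ValueError:
        pass
    try:
        value = float(s)
    except ValueError:
        return s
    # float() accepts 'nan' and 'inf'; keep those as strings so the
    # JSON output stays valid.
    if math.isnan(value) or math.isinf(value):
        return s
    return value
```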