Just to provide an update: I rewrote the same logic in Lua and used it in the ExecuteScript processor. It runs 5-10x faster than the Jython version. Very pleased with the performance of the Lua processor.
Next steps: I will check out https://issues.apache.org/jira/browse/NIFI-1822 to test multiple concurrent tasks with the ExecuteScript processor.

-Madhu

On Mon, May 2, 2016 at 11:55 AM, Madhukar Thota <[email protected]> wrote:
> Thanks Matt and Joe for your input. I will go through your suggestions.
>
> On Mon, May 2, 2016 at 10:16 AM, Matt Burgess <[email protected]> wrote:
>
>> Madhu,
>>
>> In addition to Joe's suggestions, ExecuteScript currently only allows
>> one task at a time, which is a pretty bad bottleneck if you are
>> dealing with lots of throughput. However, I have written up a Jira [1]
>> for this and issued a PR [2] to fix it; feel free to try that out
>> and/or review the code.
>>
>> Another option in the meantime is to use InvokeScriptedProcessor;
>> you'd just need some boilerplate to fill out the Processor
>> implementation. There is an example in the unit tests [3].
>> InvokeScriptedProcessor can be run with multiple concurrent tasks, and
>> after NIFI-1822 is implemented, ExecuteScript will be too.
>>
>> [1] https://issues.apache.org/jira/browse/NIFI-1822
>> [2] https://github.com/apache/nifi/pull/387
>> [3] https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_reader.py
>>
>> However, as Joe pointed out, Jython itself will always be fairly slow.
>> If you don't want to code a full processor in Java, you could port your
>> code to Groovy or JavaScript for use in ExecuteScript /
>> InvokeScriptedProcessor; they're MUCH faster than Jython.
>>
>> Regards,
>> Matt
>>
>> On Mon, May 2, 2016 at 10:07 AM, Joe Witt <[email protected]> wrote:
>> > Madhu,
>> >
>> > My testing showed the Jython script performance to be poor as well.
>> > A couple of options worth trying to tackle this:
>> > 1) Write the script such that it handles multiple flowfiles per
>> > process session (basically batching).
>> > This works presuming the slowness is the setup/teardown of the
>> > script execution environment.
>> > 2) Have multiple instances of this processor running, pulling from
>> > the same queue. Parallelize the processing.
>> > 3) It might be worth simply coding this up in Java. It looks like a
>> > straightforward processor, so now that you've gotten the precise
>> > logic you want, you can turn it into a full NiFi processor and you'll
>> > get solid performance.
>> >
>> > Thanks
>> > Joe
>> >
>> > On Mon, May 2, 2016 at 10:03 AM, Madhukar Thota <[email protected]> wrote:
>> >> Hi
>> >>
>> >> I am using the ExecuteScript processor (with the Python/Jython script
>> >> pasted below) to process HTTP query strings along with User-Agent
>> >> parsing. The processor is very slow and not able to handle heavy
>> >> load. A lot of flowfiles are getting queued, waiting for the
>> >> processor to process them. How can I improve the performance and
>> >> processing?
>> >>
>> >> Script:
>> >>
>> >> import simplejson as json
>> >> import datetime
>> >> import time
>> >> from org.apache.nifi.processor.io import StreamCallback
>> >> from user_agents import parse
>> >> import urllib
>> >> import urlparse
>> >>
>> >> def query_dict(querystring):
>> >>     if not querystring:
>> >>         return {}
>> >>     query = urllib.unquote(querystring).rstrip()
>> >>     query = query.split('&')
>> >>     query = [q.split('=') for q in query]
>> >>     return dict([(q[0], ' '.join(q[1:])) for q in query])
>> >>
>> >> def starPassword(route):
>> >>     parsed = urlparse.urlsplit(route)
>> >>     if '@' not in parsed.netloc:
>> >>         return route
>> >>
>> >>     userinfo, _, location = parsed.netloc.partition('@')
>> >>     username, _, password = userinfo.partition(':')
>> >>     if not password:
>> >>         return route
>> >>
>> >>     userinfo = ':'.join([username, '*****'])
>> >>     netloc = '@'.join([userinfo, location])
>> >>     parsed = parsed._replace(netloc=netloc)
>> >>     return urlparse.urlunsplit(parsed)
>> >>
>> >>
>> >> def num(s):
>> >>     try:
>> >>         return int(s)
>> >>     except ValueError:
>> >>         try:
>> >>             return float(s)
>> >>         except ValueError:
>> >>             try:
>> >>                 return s
>> >>             except ValueError:
>> >>                 raise ValueError('argument parsing error')
>> >>
>> >>
>> >> class PyStreamCallback(StreamCallback):
>> >>     def __init__(self):
>> >>         pass
>> >>
>> >>     def process(self, inputStream, outputStream):
>> >>         if flowFile.getAttribute('http.query.string'):
>> >>             d = query_dict(flowFile.getAttribute('http.query.string'))
>> >>             obj = {'timestamp': ltime,
>> >>                    'browser': str(parse(flowFile.getAttribute('http.headers.User-Agent')).browser.family),
>> >>                    'browser_version': str(parse(flowFile.getAttribute('http.headers.User-Agent')).browser.version_string),
>> >>                    'os': str(parse(flowFile.getAttribute('http.headers.User-Agent')).os.family),
>> >>                    'os_version': str(parse(flowFile.getAttribute('http.headers.User-Agent')).os.version_string),
>> >>                    'client_ip': flowFile.getAttribute('http.remote.addr')}
>> >>
>> >>             for key in d:
>> >>                 obj[key.replace(".", "_")] = num(starPassword(d[key]))
>> >>             outputStream.write(bytearray(json.dumps(obj, separators=(',', ':'))))
>> >>         else:
>> >>             pass
>> >>
>> >>
>> >> flowFile = session.get()
>> >> if flowFile is not None:
>> >>     flowFile = session.write(flowFile, PyStreamCallback())
>> >>     session.transfer(flowFile, REL_SUCCESS)
>> >>
>> >> Any help is appreciated.
>> >>
>> >> -Madhu
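Joe's first option (batching) could look roughly like this, assuming the script runs inside ExecuteScript, where `session` and `REL_SUCCESS` are bound for you. `ProcessSession.get(int)` returns up to that many queued flowfiles, so a single script invocation can work through a whole batch instead of one flowfile. The names `process_batch`, `BATCH_SIZE`, `make_callback` and `StubSession` are hypothetical; `StubSession` only exists so the sketch can run outside NiFi.

```python
# Sketch of batching (Joe's option 1) for an ExecuteScript body.
# Assumption: inside NiFi, ExecuteScript binds `session` and `REL_SUCCESS`,
# and session.get(n) returns a list of up to n queued flowfiles.

BATCH_SIZE = 100  # hypothetical value; tune against your own flow


def process_batch(session, rel_success, make_callback, batch_size=BATCH_SIZE):
    """Write and transfer up to batch_size flowfiles in one script invocation.

    make_callback(flow_file) must return a StreamCallback bound to that
    flowfile, because the original PyStreamCallback reads one global `flowFile`.
    Returns the number of flowfiles handled.
    """
    handled = 0
    for flow_file in session.get(batch_size):
        flow_file = session.write(flow_file, make_callback(flow_file))
        session.transfer(flow_file, rel_success)
        handled += 1
    return handled


class StubSession(object):
    """Hypothetical stand-in for ProcessSession, only for running this outside NiFi."""

    def __init__(self, flow_files):
        self.queue = list(flow_files)
        self.transferred = []

    def get(self, max_results):
        # Hand out at most max_results items from the front of the queue.
        batch, self.queue = self.queue[:max_results], self.queue[max_results:]
        return batch

    def write(self, flow_file, callback):
        # A real session would invoke callback.process(inputStream, outputStream).
        return flow_file

    def transfer(self, flow_file, relationship):
        self.transferred.append((flow_file, relationship))
```

In the original script, the last four lines would become something like `process_batch(session, REL_SUCCESS, PyStreamCallback)`, after a hypothetical change to `PyStreamCallback.__init__` so it takes and stores the flowfile instead of reading the global `flowFile`. As Joe notes, this pays off when per-invocation setup dominates; it does not reduce the per-flowfile work itself.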
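Independent of the scripting language, the `process()` callback in the original script calls `parse()` on the same User-Agent header four times per flowfile. Here is a minimal sketch of a record builder that parses it once. Assumptions: `build_record`, `star_password` and the `ua_parse` parameter are hypothetical names (in the real script you would pass `user_agents.parse`); `timestamp` is a parameter because `ltime` is not defined anywhere in the pasted script; and `query_dict` is swapped for the standard library's `parse_qsl`. That is a behavior change: `parse_qsl` splits on `&` before percent-decoding (so an encoded `%26` no longer splits a value), turns `+` into a space, and keeps `b=c` for `a=b=c` where `query_dict` produced `b c`. Older Python releases, including 2.7, also treat `;` as a separator. `num` drops the original's innermost `try`, since `return s` cannot raise.

```python
# Sketch: build the output record once per flowfile, parsing the User-Agent
# a single time. Runs on Python 3 and, via the fallback import, Jython 2.7.
try:
    from urllib.parse import parse_qsl, urlsplit, urlunsplit  # Python 3
except ImportError:
    from urlparse import parse_qsl, urlsplit, urlunsplit  # Python 2 / Jython 2.7


def num(s):
    """Return s as an int, else a float, else unchanged."""
    for cast in (int, float):
        try:
            return cast(s)
        except ValueError:
            pass
    return s


def star_password(route):
    """Mask the password in user:password@host URLs, like starPassword."""
    parsed = urlsplit(route)
    if '@' not in parsed.netloc:
        return route
    userinfo, _, location = parsed.netloc.partition('@')
    username, _, password = userinfo.partition(':')
    if not password:
        return route
    return urlunsplit(parsed._replace(netloc=username + ':*****@' + location))


def build_record(attrs, timestamp, ua_parse):
    """Return the output dict for one flowfile, or None without a query string.

    attrs: attribute mapping (e.g. flowFile.getAttributes()); only .get(key) is used.
    ua_parse: a User-Agent parser such as user_agents.parse (injected for testing).
    """
    query = attrs.get('http.query.string')
    if not query:
        return None
    ua = ua_parse(attrs.get('http.headers.User-Agent') or '')  # parse once, reuse
    record = {
        'timestamp': timestamp,
        'browser': str(ua.browser.family),
        'browser_version': str(ua.browser.version_string),
        'os': str(ua.os.family),
        'os_version': str(ua.os.version_string),
        'client_ip': attrs.get('http.remote.addr'),
    }
    # parse_qsl splits on '&' first, then percent-decodes each key and value.
    for key, value in parse_qsl(query, keep_blank_values=True):
        record[key.replace('.', '_')] = num(star_password(value))
    return record
```

Inside the callback, the call would be roughly `build_record(flowFile.getAttributes(), ltime, parse)`, followed by the same `json.dumps(obj, separators=(',', ':'))` write whenever the result is not `None`.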
