Just to provide an update: I rewrote the same logic in Lua and used it in the
ExecuteScript processor. It is 5-10x faster than the Jython version. I am
very pleased with the performance of the Lua processor.

Next steps:

I will check out https://issues.apache.org/jira/browse/NIFI-1822 to test multiple
concurrent tasks with the ExecuteScript processor.

-Madhu

On Mon, May 2, 2016 at 11:55 AM, Madhukar Thota <[email protected]>
wrote:

> Thanks Matt and Joe for your input. I will go through your suggestions.
>
> On Mon, May 2, 2016 at 10:16 AM, Matt Burgess <[email protected]> wrote:
>
>> Madhu,
>>
>> In addition to Joe's suggestions, ExecuteScript currently allows only
>> one task at a time, which is a pretty bad bottleneck if you are
>> dealing with lots of throughput. However, I have written up a Jira [1]
>> for this and issued a PR [2] to fix it; feel free to try that out
>> and/or review the code.
>>
>> Another option in the meantime is to use InvokeScriptedProcessor;
>> you'd just need some boilerplate to fill out the Processor
>> implementation. There is an example in the unit tests [3].
>> InvokeScriptedProcessor can be run with multiple concurrent tasks, and
>> after NIFI-1822 is implemented, ExecuteScript will be too.
>>
>> [1] https://issues.apache.org/jira/browse/NIFI-1822
>> [2] https://github.com/apache/nifi/pull/387
>> [3] https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_reader.py
>>
>> However, as Joe pointed out, Jython itself will always be fairly slow.
>> If you don't want to code a full processor in Java, you could port your
>> code to Groovy or JavaScript for use in ExecuteScript /
>> InvokeScriptedProcessor; they're MUCH faster than Jython.
>>
>> Regards,
>> Matt
>>
>> On Mon, May 2, 2016 at 10:07 AM, Joe Witt <[email protected]> wrote:
>> > Madhu,
>> >
>> > My testing showed the Jython script performance to be poor as well.
>> > A couple of options worth trying:
>> > 1) Write the script so that it handles multiple flowfiles per
>> > process session (basically batching). This helps if the slowness
>> > comes from the setup/teardown of the script execution environment.
>> > 2) Have multiple instances of this processor running, pulling from the
>> > same queue, to parallelize the processing.
>> > 3) It might be worth simply coding this up in Java. It looks like a
>> > straightforward processor, so now that you have the precise logic
>> > you want, you can turn it into a full NiFi processor and get solid
>> > performance.
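Option 1 (batching) can be sketched roughly as follows. This is a minimal, hypothetical helper: the names `process_batch`, `make_callback` and `BATCH_SIZE` are illustrative and not part of NiFi. It assumes the ExecuteScript bindings `session` and `REL_SUCCESS`, and NiFi's `ProcessSession.get(int)`, which returns a list of up to that many flowfiles. Because several flowfiles are handled per invocation, the callback has to be given the flowfile it is working on rather than reading a module-level `flowFile` variable.

```python
# Hypothetical batching helper for an ExecuteScript (Jython) script.
# Assumes a NiFi-style session offering get(int), write(flowfile, callback)
# and transfer(flowfile, relationship).

BATCH_SIZE = 100  # hypothetical default; tune for your flow


def process_batch(session, make_callback, rel_success, batch_size=BATCH_SIZE):
    """Process up to batch_size queued flowfiles in one script invocation.

    make_callback(flowfile) must return the StreamCallback that rewrites
    that particular flowfile's content.
    Returns the number of flowfiles transferred to rel_success.
    """
    # get(int) returns a (possibly empty) list, so no None check is needed.
    flowfiles = session.get(batch_size)
    count = 0
    for flowfile in flowfiles:
        # write() returns the updated flowfile, which is what gets transferred.
        flowfile = session.write(flowfile, make_callback(flowfile))
        session.transfer(flowfile, rel_success)
        count += 1
    return count
```

Inside ExecuteScript this would replace the single `session.get()` block, e.g. `process_batch(session, PyStreamCallback, REL_SUCCESS)` for a callback class whose constructor takes the flowfile. As Joe notes, this only pays off if per-invocation overhead is what dominates.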
>> >
>> > Thanks
>> > Joe
>> >
>> > On Mon, May 2, 2016 at 10:03 AM, Madhukar Thota
>> > <[email protected]> wrote:
>> >> Hi
>> >>
>> >> I am using the ExecuteScript processor (with the Python/Jython script
>> >> pasted below) to process the HTTP query string along with user-agent
>> >> parsing. The processor is very slow and cannot handle heavy load. Many
>> >> flowfiles are getting queued and waiting for the processor to process
>> >> them. How can I improve the performance and processing?
>> >>
>> >> Script:
>> >>
>> >> import simplejson as json
>> >> import datetime
>> >> import time
>> >> from org.apache.nifi.processor.io import StreamCallback
>> >> from user_agents import parse
>> >> import urllib
>> >> import urlparse
>> >>
>> >> def query_dict(querystring):
>> >>      if not querystring:
>> >>          return {}
>> >>      query = urllib.unquote(querystring).rstrip()
>> >>      query = query.split('&')
>> >>      query = [q.split('=') for q in query]
>> >>      return dict([(q[0], ' '.join(q[1:])) for q in query])
>> >>
>> >> def starPassword(route):
>> >>     parsed = urlparse.urlsplit(route)
>> >>     if '@' not in parsed.netloc:
>> >>         return route
>> >>
>> >>     userinfo, _, location = parsed.netloc.partition('@')
>> >>     username, _, password = userinfo.partition(':')
>> >>     if not password:
>> >>         return route
>> >>
>> >>     userinfo = ':'.join([username, '*****'])
>> >>     netloc = '@'.join([userinfo, location])
>> >>     parsed = parsed._replace(netloc=netloc)
>> >>     return urlparse.urlunsplit(parsed)
>> >>
>> >>
>> >> def num(s):
>> >>     # Convert to int or float when possible; otherwise keep the string.
>> >>     try:
>> >>         return int(s)
>> >>     except ValueError:
>> >>         try:
>> >>             return float(s)
>> >>         except ValueError:
>> >>             return s
>> >>
>> >>
>> >> class PyStreamCallback(StreamCallback):
>> >>     def __init__(self, flowFile):
>> >>         # Keep a reference to the flowfile whose attributes are read.
>> >>         self.flowFile = flowFile
>> >>
>> >>     def process(self, inputStream, outputStream):
>> >>         querystring = self.flowFile.getAttribute('http.query.string')
>> >>         if not querystring:
>> >>             return
>> >>         d = query_dict(querystring)
>> >>         # Parse the User-Agent header once instead of four times.
>> >>         ua = parse(self.flowFile.getAttribute('http.headers.User-Agent'))
>> >>         # 'ltime' is not defined in the pasted script; assumed here to be
>> >>         # the current epoch time in milliseconds.
>> >>         ltime = int(time.time() * 1000)
>> >>         obj = {'timestamp': ltime,
>> >>                'browser': str(ua.browser.family),
>> >>                'browser_version': str(ua.browser.version_string),
>> >>                'os': str(ua.os.family),
>> >>                'os_version': str(ua.os.version_string),
>> >>                'client_ip': self.flowFile.getAttribute('http.remote.addr')}
>> >>
>> >>         # Add query parameters: dots in keys become underscores, and any
>> >>         # password embedded in a URL value is masked.
>> >>         for key in d:
>> >>             obj[key.replace(".", "_")] = num(starPassword(d[key]))
>> >>         outputStream.write(bytearray(json.dumps(obj, separators=(',', ':'))))
>> >>
>> >>
>> >> flowFile = session.get()
>> >> if flowFile is not None:
>> >>     flowFile = session.write(flowFile, PyStreamCallback(flowFile))
>> >>     session.transfer(flowFile, REL_SUCCESS)
>> >>
>> >> Any help is appreciated.
>> >>
>> >> -Madhu
>>
>
>
