Madhu,

In addition to Joe's suggestions, currently ExecuteScript only allows
for one task at a time, which is currently a pretty bad bottleneck if
you are dealing with lots of throughput. However I have written up a
Jira [1] for this and issued a PR [2] to fix it, feel free to try that
out and/or review the code.

Another option in the meantime is to use InvokeScriptedProcessor,
you'd just need some boilerplate to fill out the Processor
implementation, there is an example in the unit tests [3].
InvokeScriptedProcessor can be run with multiple concurrent tasks, and
after NIFI-1822 is implemented, ExecuteScript will be too.

[1] https://issues.apache.org/jira/browse/NIFI-1822
[2] https://github.com/apache/nifi/pull/387
[3] 
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_reader.py

However as Joe pointed out, Jython itself will always be fairly slow.
If you don't want to code a full processor in Java you could port your
code to Groovy or Javascript for use in the ExecuteScript /
InvokeScriptedProcessor, they're MUCH faster than Jython.

Regards,
Matt

On Mon, May 2, 2016 at 10:07 AM, Joe Witt <joe.w...@gmail.com> wrote:
> Madhu,
>
> My testing showed the jython script performance to be poor as well.
> Couple of options to tackle this worth trying:
> 1) write the script such that it handles multiple flowfiles per
> process session (basically batching).  This works presuming the
> slowness is the setup/teardown of the script execution environment.
> 2) have multiple instances of this processor running pulling from the
> same queue.  Parallelize the processing.
> 3) might be worth simply coding this up in Java.  Looks like it might
> be a straightforward processor so now that you've gotten the precise
> logic you want you can turn it into a full nifi processor and you'll
> get solid performance.
>
> Thanks
> Joe
>
> On Mon, May 2, 2016 at 10:03 AM, Madhukar Thota
> <madhukar.th...@gmail.com> wrote:
>> Hi
>>
>> I am using ExecuteScript Processor( using python/jython script pasted below)
>> to process http querystring along with useragent parsing. The processor is
>> very slow and not able to handle heavy load. Lot of them of getting queued
>> and waiting for the processor to process it. How can i improve the
>> performance and processing?
>>
>> Script:
>>
>> import simplejson as json
>> import datetime
>> import time
>> from org.apache.nifi.processor.io import StreamCallback
>> from user_agents import parse
>> import urllib
>> import urlparse
>>
>> def query_dict(querystring):
>>      if not querystring:
>>          return {}
>>      query = urllib.unquote(querystring).rstrip()
>>      query = query.split('&')
>>      query = [q.split('=') for q in query]
>>      return dict([(q[0], ' '.join(q[1:])) for q in query])
>>
>> def starPassword(route):
>>     parsed = urlparse.urlsplit(route)
>>     if '@' not in parsed.netloc:
>>         return route
>>
>>     userinfo, _, location = parsed.netloc.partition('@')
>>     username, _, password = userinfo.partition(':')
>>     if not password:
>>         return route
>>
>>     userinfo = ':'.join([username, '*****'])
>>     netloc = '@'.join([userinfo, location])
>>     parsed = parsed._replace(netloc=netloc)
>>     return urlparse.urlunsplit(parsed)
>>
>>
>> def num(s):
>>     try:
>>         return int(s)
>>     except ValueError:
>>         try:
>>             return float(s)
>>         except ValueError:
>>             try:
>>                 return s
>>             except ValueError:
>>                 raise ValueError('argument parsing error')
>>
>>
>> class PyStreamCallback(StreamCallback):
>>     def __init__(self):
>>         pass
>>
>>     def process(self, inputStream, outputStream):
>>         if flowFile.getAttribute('http.query.string'):
>>             d = query_dict(flowFile.getAttribute('http.query.string'))
>>             obj = {'timestamp': ltime,
>>                        'browser':
>> str(parse(flowFile.getAttribute('http.headers.User-Agent')).browser.family),
>>                        'browser_version':
>> str(parse(flowFile.getAttribute('http.headers.User-Agent')).browser.version_string),
>>                        'os':
>> str(parse(flowFile.getAttribute('http.headers.User-Agent')).os.family),
>>                        'os_version':
>> str(parse(flowFile.getAttribute('http.headers.User-Agent')).os.version_string),
>>                        'client_ip':
>> flowFile.getAttribute('http.remote.addr')}
>>
>>             for key in d:
>>                 obj[key.replace(".", "_")] = num(starPassword(d[key]))
>>             outputStream.write(bytearray(json.dumps(obj, separators=(',',
>> ':'))))
>>         else:
>>             pass
>>
>>
>> flowFile = session.get()
>> if flowFile is not None:
>>     flowFile = session.write(flowFile, PyStreamCallback())
>>     session.transfer(flowFile, REL_SUCCESS)
>>
>> Any help is appreciated.
>>
>> -Madhu

Reply via email to