Madhu,

My testing also showed Jython script performance to be poor.
A few options worth trying:
1) Write the script so that it handles multiple flowfiles per
process session (basically batching).  This helps if the slowness
comes from the setup/teardown of the script execution environment.
2) Run multiple instances of this processor pulling from the same
queue to parallelize the processing.
3) It might be worth simply coding this up in Java.  It looks like a
straightforward processor, so now that you have the precise logic
you want, you can turn it into a full NiFi processor and get
solid performance.
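
For option 1, here is a rough sketch of what batching could look like. It
assumes the script runs under ExecuteScript, which binds `session` and
`REL_SUCCESS`, and it relies on `ProcessSession.get(int)` returning a list of
up to that many flowfiles. The names `BATCH_SIZE`, `WriteJson`,
`process_batch` and `build_record` are illustrative, not part of NiFi, and the
`ImportError` fallback is only there so the sketch can be exercised outside
NiFi. Treat it as a starting point, not a tested processor script.

```python
import json

try:
    from org.apache.nifi.processor.io import StreamCallback
except ImportError:
    # Outside NiFi/Jython (e.g. a quick local check) use a plain base class.
    StreamCallback = object

BATCH_SIZE = 100  # hypothetical value; tune it against your own load


class WriteJson(StreamCallback):
    """Writes one pre-built record to the flowfile content as compact JSON."""

    def __init__(self, record):
        self.record = record

    def process(self, inputStream, outputStream):
        payload = json.dumps(self.record, separators=(',', ':'))
        outputStream.write(bytearray(payload.encode('utf-8')))


def process_batch(session, success, build_record, batch_size=BATCH_SIZE):
    """Pull up to batch_size flowfiles in one call and handle each in turn.

    build_record is a caller-supplied function (hypothetical) that turns a
    flowfile's attributes into a dict, e.g. the query-string and user-agent
    logic from the original script.
    """
    count = 0
    for flow_file in session.get(batch_size):
        record = build_record(flow_file)
        flow_file = session.write(flow_file, WriteJson(record))
        session.transfer(flow_file, success)
        count += 1
    return count
```

Inside the script body you would then call something like
`process_batch(session, REL_SUCCESS, build_record)` in place of the single
`session.get()`. Passing the record into the callback also avoids reading the
flowfile through a module-level variable.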

Thanks
Joe

On Mon, May 2, 2016 at 10:03 AM, Madhukar Thota
<madhukar.th...@gmail.com> wrote:
> Hi
>
> I am using the ExecuteScript processor (with the python/jython script pasted
> below) to process the HTTP query string along with user-agent parsing. The
> processor is very slow and cannot handle heavy load. A lot of flowfiles are
> getting queued, waiting for the processor to process them. How can I improve
> the performance and processing?
>
> Script:
>
> import simplejson as json
> import datetime
> import time
> from org.apache.nifi.processor.io import StreamCallback
> from user_agents import parse
> import urllib
> import urlparse
>
> def query_dict(querystring):
>     if not querystring:
>         return {}
>     query = urllib.unquote(querystring).rstrip()
>     query = query.split('&')
>     query = [q.split('=') for q in query]
>     return dict([(q[0], ' '.join(q[1:])) for q in query])
>
> def starPassword(route):
>     parsed = urlparse.urlsplit(route)
>     if '@' not in parsed.netloc:
>         return route
>
>     userinfo, _, location = parsed.netloc.partition('@')
>     username, _, password = userinfo.partition(':')
>     if not password:
>         return route
>
>     userinfo = ':'.join([username, '*****'])
>     netloc = '@'.join([userinfo, location])
>     parsed = parsed._replace(netloc=netloc)
>     return urlparse.urlunsplit(parsed)
>
>
> def num(s):
>     # Return s as an int or float when it parses as one, otherwise unchanged.
>     try:
>         return int(s)
>     except ValueError:
>         try:
>             return float(s)
>         except ValueError:
>             return s
>
>
> class PyStreamCallback(StreamCallback):
>     def __init__(self):
>         pass
>
>     def process(self, inputStream, outputStream):
>         query = flowFile.getAttribute('http.query.string')
>         if not query:
>             return
>         d = query_dict(query)
>         # Parse the User-Agent header once instead of once per field.
>         ua = parse(flowFile.getAttribute('http.headers.User-Agent'))
>         obj = {'timestamp': ltime,  # ltime is not defined in the script as posted
>                'browser': str(ua.browser.family),
>                'browser_version': str(ua.browser.version_string),
>                'os': str(ua.os.family),
>                'os_version': str(ua.os.version_string),
>                'client_ip': flowFile.getAttribute('http.remote.addr')}
>
>         for key in d:
>             obj[key.replace(".", "_")] = num(starPassword(d[key]))
>         outputStream.write(bytearray(json.dumps(obj, separators=(',', ':'))))
>
>
> flowFile = session.get()
> if flowFile is not None:
>     flowFile = session.write(flowFile, PyStreamCallback())
>     session.transfer(flowFile, REL_SUCCESS)
>
> Any help is appreciated.
>
> -Madhu
