Thanks Matt and Joe for your input. I will go through your suggestions.

On Mon, May 2, 2016 at 10:16 AM, Matt Burgess <[email protected]> wrote:

> Madhu,
>
> In addition to Joe's suggestions: ExecuteScript currently allows only
> one task at a time, which is a pretty bad bottleneck if you are
> dealing with lots of throughput. However, I have written up a Jira [1]
> for this and issued a PR [2] to fix it; feel free to try that out
> and/or review the code.
>
> Another option in the meantime is to use InvokeScriptedProcessor.
> You'd just need some boilerplate to fill out the Processor
> implementation; there is an example in the unit tests [3].
> InvokeScriptedProcessor can be run with multiple concurrent tasks, and
> after NIFI-1822 is implemented, ExecuteScript will be able to as well.
>
> [1] https://issues.apache.org/jira/browse/NIFI-1822
> [2] https://github.com/apache/nifi/pull/387
> [3]
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython/test_reader.py
>
> However, as Joe pointed out, Jython itself will always be fairly slow.
> If you don't want to code a full processor in Java, you could port your
> code to Groovy or JavaScript for use in ExecuteScript /
> InvokeScriptedProcessor; they're MUCH faster than Jython.
>
> Regards,
> Matt
>
> On Mon, May 2, 2016 at 10:07 AM, Joe Witt <[email protected]> wrote:
> > Madhu,
> >
> > My testing showed the Jython script performance to be poor as well.
> > A couple of options worth trying:
> > 1) Write the script so that it handles multiple flowfiles per
> > process session (basically batching).  This works presuming the
> > slowness is the setup/teardown of the script execution environment.
> > 2) Have multiple instances of this processor running, pulling from the
> > same queue, to parallelize the processing.
> > 3) It might be worth simply coding this up in Java.  It looks like it
> > might be a straightforward processor, so now that you've gotten the
> > precise logic you want, you can turn it into a full NiFi processor and
> > you'll get solid performance.
> >
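Option 1 (batching) could look roughly like the sketch below. It assumes the `session.get(maxResults)` overload of NiFi's ProcessSession, which returns a list of up to that many flow files. `FakeSession`, `process_batch`, `handle` and the batch size of 100 are hypothetical names and values, introduced only so the sketch can run outside NiFi; inside ExecuteScript the real `session` variable would be passed instead.

```python
class FakeSession(object):
    """Hypothetical stand-in for NiFi's ProcessSession, for illustration only."""

    def __init__(self, queued):
        self.queued = list(queued)
        self.transferred = []

    def get(self, max_results):
        # Mirrors ProcessSession.get(int maxResults): returns up to
        # max_results queued items, possibly fewer or none.
        batch = self.queued[:max_results]
        self.queued = self.queued[max_results:]
        return batch

    def transfer(self, flow_file, relationship):
        self.transferred.append((flow_file, relationship))


def process_batch(session, handle, relationship, batch_size=100):
    """Pull up to batch_size flow files in one script run and handle each."""
    batch = session.get(batch_size)
    for flow_file in batch:
        # handle() returns the updated flow file, as session.write() does.
        session.transfer(handle(session, flow_file), relationship)
    return len(batch)
```

In the script below, `handle` would presumably wrap `session.write(flowFile, PyStreamCallback())` and `relationship` would be `REL_SUCCESS`. Note that the callback there reads the global `flowFile`, so with batching it would need to receive the current flow file explicitly, for example as a constructor argument.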
> > Thanks
> > Joe
> >
> > On Mon, May 2, 2016 at 10:03 AM, Madhukar Thota
> > <[email protected]> wrote:
> >> Hi
> >>
> >> I am using the ExecuteScript processor (with the Python/Jython script
> >> pasted below) to process the HTTP query string along with user-agent
> >> parsing. The processor is very slow and not able to handle heavy load.
> >> A lot of flow files are getting queued, waiting for the processor to
> >> process them. How can I improve the performance and processing?
> >>
> >> Script:
> >>
> >> import simplejson as json
> >> import datetime
> >> import time
> >> from org.apache.nifi.processor.io import StreamCallback
> >> from user_agents import parse
> >> import urllib
> >> import urlparse
> >>
> >> def query_dict(querystring):
> >>     if not querystring:
> >>         return {}
> >>     query = urllib.unquote(querystring).rstrip()
> >>     query = query.split('&')
> >>     query = [q.split('=') for q in query]
> >>     return dict([(q[0], ' '.join(q[1:])) for q in query])
> >>
> >> def starPassword(route):
> >>     parsed = urlparse.urlsplit(route)
> >>     if '@' not in parsed.netloc:
> >>         return route
> >>
> >>     userinfo, _, location = parsed.netloc.partition('@')
> >>     username, _, password = userinfo.partition(':')
> >>     if not password:
> >>         return route
> >>
> >>     userinfo = ':'.join([username, '*****'])
> >>     netloc = '@'.join([userinfo, location])
> >>     parsed = parsed._replace(netloc=netloc)
> >>     return urlparse.urlunsplit(parsed)
> >>
> >>
> >> def num(s):
> >>     try:
> >>         return int(s)
> >>     except ValueError:
> >>         try:
> >>             return float(s)
> >>         except ValueError:
> >>             # Not numeric: return the value unchanged.
> >>             return s
> >>
> >>
> >> class PyStreamCallback(StreamCallback):
> >>     def __init__(self):
> >>         pass
> >>
> >>     def process(self, inputStream, outputStream):
> >>         if flowFile.getAttribute('http.query.string'):
> >>             d = query_dict(flowFile.getAttribute('http.query.string'))
> >>             obj = {'timestamp': ltime,
> >>                    'browser': str(parse(flowFile.getAttribute('http.headers.User-Agent')).browser.family),
> >>                    'browser_version': str(parse(flowFile.getAttribute('http.headers.User-Agent')).browser.version_string),
> >>                    'os': str(parse(flowFile.getAttribute('http.headers.User-Agent')).os.family),
> >>                    'os_version': str(parse(flowFile.getAttribute('http.headers.User-Agent')).os.version_string),
> >>                    'client_ip': flowFile.getAttribute('http.remote.addr')}
> >>
> >>             for key in d:
> >>                 obj[key.replace(".", "_")] = num(starPassword(d[key]))
> >>             outputStream.write(bytearray(json.dumps(obj, separators=(',', ':'))))
> >>         else:
> >>             pass
> >>
> >>
> >> flowFile = session.get()
> >> if flowFile is not None:
> >>     flowFile = session.write(flowFile, PyStreamCallback())
> >>     session.transfer(flowFile, REL_SUCCESS)
> >>
> >> Any help is appreciated.
> >>
> >> -Madhu
>
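Separately from the suggestions above, the script calls `parse()` four times on the same User-Agent header for every flow file. A minimal sketch of parsing once per flow file, and optionally caching results for repeated headers, might look like this. `make_cached`, `user_agent_fields` and `max_entries` are hypothetical names; `parse_fn` stands for the `parse` function from `user_agents`, and the attribute names are the ones the script already uses. The cache does no locking, and how much it helps is an assumption that depends on how often the same header repeats.

```python
def make_cached(parse_fn, max_entries=10000):
    """Wrap parse_fn so that repeated inputs are parsed only once."""
    cache = {}

    def cached(value):
        if value not in cache:
            if len(cache) >= max_entries:
                # Crude memory bound: drop everything and start over.
                cache.clear()
            cache[value] = parse_fn(value)
        return cache[value]

    return cached


def user_agent_fields(header, parse_fn):
    """Parse the header once and read all four fields from the result."""
    ua = parse_fn(header)
    return {
        'browser': str(ua.browser.family),
        'browser_version': str(ua.browser.version_string),
        'os': str(ua.os.family),
        'os_version': str(ua.os.version_string),
    }
```

In the script, `obj` could then be built with `obj.update(user_agent_fields(header, parse))`. Whether a cache created at script level survives between invocations depends on how the processor evaluates the script, so the parse-once change is the safer part of this sketch.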
