Re: Options for increasing performance?

2017-04-07 Thread Sebastian Lagemann
Jim,

we have seen 2k flowfiles per second on HandleHttpRequest with 50 threads on 
the processor without issues; the bottleneck was further down the flow, in 
downstream processors, and was primarily related to slow disk I/O.
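
For the ExecuteScript side of the thread below, the batching pattern Scott describes, 
combined with the kind of required-field check Jim mentions, could be sketched roughly 
as follows in Jython. This is illustrative only: REQUIRED_FIELDS and the relationship 
handling are assumptions, not Jim's actual script.

import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback

# Placeholder list of required keys -- substitute the real eight field names.
REQUIRED_FIELDS = ['fieldA', 'fieldB']

class ReadJson(InputStreamCallback):
    def __init__(self):
        self.data = None
    def process(self, inputStream):
        self.data = json.loads(IOUtils.toString(inputStream, StandardCharsets.UTF_8))

flowFiles = session.get(10)              # pull up to 10 flow files per invocation
for flowFile in flowFiles:
    if flowFile is None:
        continue
    reader = ReadJson()
    session.read(flowFile, reader)       # read and parse the flow file content
    if all(k in reader.data for k in REQUIRED_FIELDS):
        session.transfer(flowFile, REL_SUCCESS)
    else:
        session.transfer(flowFile, REL_FAILURE)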

Best,

Seb

> On 06.04.2017 at 12:00, James McMahon wrote:
> 
> Intriguing. I'm one of those who have employed the "single flowfile" 
> approach. I'm certainly willing to test out this refinement.
> So to press your point: this is more efficient than setting the processor's 
> "Concurrent tasks" to 10 because it incurs the ExecuteScript initialization 
> burden only once, rather than relying on the processor configuration parameter 
> (which presumably incurs that initialization burden ten times)?
> 
> I currently set "Concurrent tasks" to 50.  The logjam I am seeing is not in 
> my ExecuteScript processor. My delay is definitely a non-steady, "non-fast" 
> stream of data at my HandleHttpRequest processor, the first processor in my 
> workflow. Why that is the case is a mystery we've yet to resolve.
> 
> One thing I'd welcome is some idea of what a reasonable expectation is for 
> requests handled by HandleHttpRequest in an hour. Is 1500 per hour low, high, 
> or entirely reasonable? We really have little insight. Any empirical data from 
> practical user experience would be most welcome. 
> 
> Also, I added a second HandleHttpRequest fielding requests on a second port. 
> I did not see any improvement in throughput. Why might that be? My 
> expectation was that with two doors open rather than one, I'd see a greater 
> influx of data.
> 
> Thank you.
> - Jim
> 
>> On Wed, Apr 5, 2017 at 4:26 PM, Scott Wagner  
>> wrote:
>> One of my experiences with ExecuteScript and Python is that a script which 
>> works on an individual FlowFile is very inefficient when you have multiple 
>> FlowFiles in the input queue, even when you set the run schedule to a timer 
>> of 0 sec.
>> 
>> Instead, I have the following in all of my Python scripts:
>> 
>> flowFiles = session.get(10)
>> for flowFile in flowFiles:
>>     if flowFile is None:
>>         continue
>>     # Do stuff here
>> 
>> That seems to improve the throughput of the ExecuteScript processor 
>> dramatically.
>> 
>> YMMV
>> 
>> - Scott
>>> James McMahon Wednesday, April 5, 2017 12:48 PM
>>> I am receiving POSTs from a Pentaho process, delivering files to my NiFi 
>>> 0.7.x workflow HandleHttpRequest processor. That processor hands the 
>>> flowfile off to an ExecuteScript processor that runs a Python script. This 
>>> script is very, very simple: it takes an incoming JSON object, loads it 
>>> into a Python dictionary, and verifies the presence of required fields 
>>> using simple has_key checks on the dictionary. There are only eight fields 
>>> in the incoming JSON object.
>>> 
>>> The throughput for these two processors is not exceeding 100-150 files in 
>>> five minutes. It seems very slow in light of the minimal processing going 
>>> on in these two steps.
>>> 
>>> I notice that there are configuration options seemingly related to 
>>> optimizing performance. "Concurrent tasks", for example, is only set by 
>>> default to 1 for each processor.
>>> 
>>> What performance optimizations at the processor level do users recommend? 
>>> Is it advisable to crank up the concurrent tasks for a processor, and is 
>>> there an optimal performance point beyond which you should not crank up 
>>> that value? Are there trade-offs?
>>> 
>>> I am particularly interested in optimizations for HandleHttpRequest and 
>>> ExecuteScript processors.
>>> 
>>> Thanks in advance for your thoughts.
>>> 
>>> cheers,
>>> 
>>> Jim
>> 
> 


Cassandra query and custom types as JSON string instead of object

2017-01-06 Thread Sebastian Lagemann
Hey,

we’re using a custom type in a map column in a Cassandra table. We query the data 
with the QueryCassandra processor and try to store the result as JSON. Unfortunately 
the custom type value (in the source column in our example) is stored as a string 
instead of the object we expected.
Is it supposed to work that way, and if so, how can we convert that string into 
a proper JSON object with NiFi?
 
An example looks like the following:

CREATE TYPE "dateIdTuple" (
  "date" TIMESTAMP,
  id TEXT
);
CREATE TABLE users (
  deviceId TEXT,
  source MAP<TEXT, FROZEN<"dateIdTuple">>,
  PRIMARY KEY (deviceId)
);
INSERT INTO users (deviceId, source)
VALUES ('a1234', {'batch': {id: 'a1234', "date": '2017-01-06 17:46:35'}});
The JSON contains something like this:
{
  "deviceid": "a1234",
  "source": {
    "batch": "{id:'a1234',date:1483726401000}"
  }
}
And it should be like the following:

{
  "deviceid": "a1234",
  "source": {
    "batch": {
      "id": "a1234",
      "date": 1483726401000
    }
  }
}
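
To illustrate the conversion we are after, here is a rough standalone Python sketch; 
the regex-based repair of the stringified UDT is a naive assumption that only covers 
this simple, flat shape:

import json
import re

# Result as QueryCassandra currently emits it: the UDT arrives as a string.
record = {
    "deviceid": "a1234",
    "source": {
        "batch": "{id:'a1234',date:1483726401000}"
    }
}

def repair_udt(text):
    # Naively quote bare keys ({id: -> {"id":) and swap single for double quotes.
    quoted = re.sub(r'([{,])\s*(\w+)\s*:', r'\1"\2":', text)
    return json.loads(quoted.replace("'", '"'))

for key, value in record["source"].items():
    if isinstance(value, str):
        record["source"][key] = repair_udt(value)

print(json.dumps(record, indent=2))      # now matches the desired nested structure
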
Any help would be appreciated ;-).

Thanks,

Seb



How to update NiFi to the latest Jolt version?

2016-11-10 Thread Sebastian Lagemann
Hello,

I’m trying to use the latest Jolt specification features, especially the operation 
"modify-overwrite-beta", introduced with Jolt version 0.0.22. It seems that 
currently only version 0.0.21 is used by NiFi (see 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml). I 
tried to update to the latest Jolt version (from 0.0.21 to 0.0.24) in the 
corresponding pom.xml and rebuilt the whole project (after I removed one 
failing test in 
nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy;
not the way to go, but I need some fast results :-)).
NiFi is using the new .nar file, but unfortunately the Jolt processor is still 
complaining that the Jolt specification is invalid (see below).

Does anyone have a hint/idea how to use the latest Jolt version? I had already 
thought about extracting the processor from the NiFi standard NAR bundle and 
creating a new, separately named processor for testing, but I guess there must 
be an easier solution?

The jolt spec I’m using:
[
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "lastElementOfType": "=lastElement(@(1,type))"
    }
  },
  {
    "operation": "shift",
    "spec": {
      "value": "@(1,lastElementOfType)",
      "_meta": {
        "appId": "appId",
        "userId": "userId"
      }
    }
  }
]

The data I’m using:
{
  "type": ["user", "profile", "personalInfo", "firstName"],
  "value": "testname",
  "_meta": {"appId": "test", "userId": "56c1614b677c35cc3f28fbd0"}
}

The expected flow file content afterwards:
{
  "appId": "test",
  "firstName": "testname",
  "userId": "56c1614b677c35cc3f28fbd0"
}

The Jolt demo at http://jolt-demo.appspot.com/#inception delivers the expected results.
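
As an interim fallback while the Jolt version question is open, the same transform 
can be sketched in plain Python; the field names follow the example above, and this 
is of course not the Jolt-based solution we are after:

import json

record = {
    "type": ["user", "profile", "personalInfo", "firstName"],
    "value": "testname",
    "_meta": {"appId": "test", "userId": "56c1614b677c35cc3f28fbd0"}
}

def transform(rec):
    # Equivalent of =lastElement(@(1,type)): use the last "type" entry as the
    # key for "value", then lift appId and userId out of _meta.
    out = {rec["type"][-1]: rec["value"]}
    out["appId"] = rec["_meta"]["appId"]
    out["userId"] = rec["_meta"]["userId"]
    return out

print(json.dumps(transform(record), indent=2))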

Thanks,

Sebastian



Problem with HandleHttpRequest/HandleHttpResponse

2015-10-13 Thread Sebastian Lagemann | iQU
Hi,

We are currently experiencing a problem where we get a "Failed to export 
StandardFlowFileRecord to HttpOutput due to org.eclipse.jetty.io.EofException" 
exception. It blocks incoming HTTP requests, leaves flow files queued on the 
connection between HandleHttpRequest and HandleHttpResponse, and stalls the 
event stream as a whole until we restart the corresponding node.

We have the following configuration:
[inline screenshot of the flow configuration, omitted]

We get the following exception:

org.apache.nifi.processor.exception.FlowFileAccessException: Failed to export StandardFlowFileRecord[uuid=fa00a5b5-7e54-4688-b389-7dcd012607b8,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1444734001395-1, container=default, section=1], offset=0, length=147356],offset=0,name=10200723618730079,size=147356] to HttpOutput@19fff77e{OPEN} due to org.eclipse.jetty.io.EofException
    at org.apache.nifi.controller.repository.StandardProcessSession.exportTo(StandardProcessSession.java:2305) ~[nifi-framework-core-0.3.0.jar:0.3.0]
    at org.apache.nifi.processors.standard.HandleHttpResponse.onTrigger(HandleHttpResponse.java:153) ~[na:na]
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.3.0.jar:0.3.0]
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077) ~[nifi-framework-core-0.3.0.jar:0.3.0]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127) [nifi-framework-core-0.3.0.jar:0.3.0]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49) [nifi-framework-core-0.3.0.jar:0.3.0]
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119) [nifi-framework-core-0.3.0.jar:0.3.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_79]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_79]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_79]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_79]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_79]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_79]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: org.eclipse.jetty.io.EofException: null
    at org.eclipse.jetty.io.ChannelEndPoint.flush(ChannelEndPoint.java:192) ~[jetty-io-9.2.11.v20150529.jar:9.2.11.v20150529]
    at org.eclipse.jetty.io.WriteFlusher.flush(WriteFlusher.java:408) ~[jetty-io-9.2.11.v20150529.jar:9.2.11.v20150529]
    at org.eclipse.jetty.io.WriteFlusher.write(WriteFlusher.java:302) ~[jetty-io-9.2.11.v20150529.jar:9.2.11.v20150529]
    at org.eclipse.jetty.io.AbstractEndPoint.write(AbstractEndPoint.java:129) ~[jetty-io-9.2.11.v20150529.jar:9.2.11.v20150529]
    at org.eclipse.jetty.server.HttpConnection$SendCallback.process(HttpConnection.java:690) ~[jetty-server-9.2.11.v20150529.jar:9.2.11.v20150529]
    at org.eclipse.jetty.util.IteratingCallback.processing(IteratingCallback.java:246) ~[jetty-util-9.2.11.v20150529.jar:9.2.11.v20150529]
    at org.eclipse.jetty.util.IteratingCallback.iterate(IteratingCallback.java:208) ~[jetty-util-9.2.11.v20150529.jar:9.2.11.v20150529]
    at org.eclipse.jetty.server.HttpConnection.send(HttpConnection.java:480) ~[jetty-server-9.2.11.v20150529.jar:9.2.11.v20150529]
    at org.eclipse.jetty.server.HttpChannel.sendResponse(HttpChannel.java:768) ~[jetty-server-9.2.11.v20150529.jar:9.2.11.v20150529]
    at org.eclipse.jetty.server.HttpChannel.write(HttpChannel.java:801) ~[jetty-server-9.2.11.v20150529.jar:9.2.11.v20150529]
    at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:147) ~[jetty-server-9.2.11.v20150529.jar:9.2.11.v20150529]
    at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:140) ~[jetty-server-9.2.11.v20150529.jar:9.2.11.v20150529]
    at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:355) ~[jetty-server-9.2.11.v20150529.jar:9.2.11.v20150529]
    at org.apache.nifi.stream.io.StreamUtils.copy(StreamUtils.java:36) ~[nifi-utils-0.3.0.jar:0.3.0]
    at org.apache.nifi.controller.repository.FileSystemRepository.exportTo(FileSystemRepository.java:752) ~[nifi-framework-core-0.3.0.jar:0.3.0]
    at org.apache.nifi.controller.repository.FileSystemRepository.exportTo(FileSystemRepository.java:766) ~[nifi-framework-core-0.3.0.jar:0.3.0]
    at org.apache.nifi.controller.repository.StandardProcessSession.exportTo(StandardProcessSession.java:2300)