Re: Getting the number of logs
Hello Sai, I'm gonna paraphrase what I think your use-case is first, let me know if this is wrong. You want to keep track of the number of logs coming in and every hour you want to document how many came in in that hour. Currently NiFi doesn't handle this type of "stateful" event processing very well and with what NiFi currently offers you are very limited. That said, I've done some work in order to help NiFi into the "stateful" event processing space that may help you. I currently have an open PR[1] to add state to UpdateAttribute. This allows you keep stateful values (like a count) and even acts as a Stateful Rule Engine (using UpdateAttribute's 'Advanced Tab'). So in order to solve your use-case you can set up one stateful UpdateAttribute along your main flow that counts all your incoming FlowFiles. Then add a GenerateFlowFile processor running on an hourly cron job that is routed to the stateful UpdateAttribute to act as a trigger. When the Stateful UpdateAttribute is triggered it adds the count as an attribute of the triggering flowfile and resets the count. Then just do a RouteOnAttribute after the stateful UpdateAttribute to separate the triggering FlowFile from the incoming data and put it to ElasticSearch. That may not have been the best explanation and if not I can create a template and take screenshots tomorrow if you're interested. One thing to keep in mind though, this stateful processing does have a limitation in this PR in that it will only work with local state. So no tracking counts across a whole cluster, just per node. [1] https://github.com/apache/nifi/pull/319 Joe - - - - - - Joseph Percivalllinkedin.com/in/Percivalle: joeperciv...@yahoo.com On Wednesday, November 9, 2016 11:41 AM, "Peddy, Sai"wrote: Hi All, Previously posted this in the Dev listserv moving it over to the Users listserv I’m currently working on a use case to be able to track the number of individual logs that come in and put that information in ElasticSearch. I wanted to see if there is an easy way to do this and whether anyone had any good ideas? Current approach I am considering: Route the Log Files coming in – to a Split Text & Route Text Processor to make sure no empty logs get through and get the individual log count when files contain multiple logs – At the end of this the total number of logs are visible in the UI queue, where it displays the queueCount, but this information is not readily available to any processor. Current thought process is that I can use the ExecuteScript Processor and update a local file to keep track and insert the document into elastic search hourly. Any advice would be appreciated Thanks, Sai Peddy The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
NPE MergeContent processor
Hi, I saw this error after I upgraded to 1.0.0 but thought it was maybe due to the issues I had with that upgrade (entirely my fault it turns out!), but I have seen it a number of times since so I turned debugging on to get a better stacktrace. Relevant log section as below. Nothing out of the ordinary, and I never saw this in v0.6.1 or below. I would have raised a Jira issue, but after logging in to Jira it only let me create a service desk request (which didn’t sound right). Regards Conrad 2016-11-09 16:43:46,413 DEBUG [Timer-Driven Process Thread-5] o.a.n.processors.standard.MergeContent MergeContent[id=12c0bec7-68b7-3b60-a020-afcc7b4599e7] has chosen to yield its resources; will not be scheduled to run again for 1000 milliseconds 2016-11-09 16:43:46,414 DEBUG [Timer-Driven Process Thread-5] o.a.n.processors.standard.MergeContent MergeContent[id=8db3bb68-0354-3116-96c5-dc80854ef116] Binned 42 FlowFiles 2016-11-09 16:43:46,418 INFO [Timer-Driven Process Thread-5] o.a.n.processors.standard.MergeContent MergeContent[id=8db3bb68-0354-3116-96c5-dc80854ef116] Merged [StandardFlowFileRecord[uuid=5e846136-0a7a-46fb-be96-8200d5cdd33d,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1475059643340-275849, container=default, section=393], offset=567158, length=2337],offset=0,name=17453303363322987,size=2337], StandardFlowFileRecord[uuid=a5f4bd55-82e3-40cb-9fa9-86b9e6816f67,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1475059643340-275849, container=default, section=393], offset=573643, length=2279],offset=0,name=17453303351196175,size=2279], StandardFlowFileRecord[uuid=c1ca745b-660a-49cd-82e5-fa8b9a2f4165,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1475059643340-275849, container=default, section=393], offset=583957, length=2223],offset=0,name=17453303531879367,size=2223], StandardFlowFileRecord[uuid=,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1475059643340-275849, container=default, section=393], offset=595617, length=2356],offset=0,name=,size=2356], StandardFlowFileRecord[uuid=,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1475059643340-275849, container=default, section=393], offset=705637, length=2317],offset=0,name=,size=2317], StandardFlowFileRecord[uuid=,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1475059643340-275849, container=default, section=393], offset=725376, length=2333],offset=0,name=,size=2333], StandardFlowFileRecord[uuid=,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1475059643340-275849, container=default, section=393], offset=728703, length=2377],offset=0,name=,size=2377]] into StandardFlowFileRecord[uuid=1ef3e5a0-f8db-49eb-935d-ed3c991fd631,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1478709819819-416, container=default, section=416], offset=982498, length=4576],offset=0,name=3649103647775837,size=4576] 2016-11-09 16:43:46,418 ERROR [Timer-Driven Process Thread-5] o.a.n.processors.standard.MergeContent MergeContent[id=8db3bb68-0354-3116-96c5-dc80854ef116] MergeContent[id=8db3bb68-0354-3116-96c5-dc80854ef116] failed to process session due to java.lang.NullPointerException: java.lang.NullPointerException 2016-11-09 16:43:46,422 ERROR [Timer-Driven Process Thread-5] o.a.n.processors.standard.MergeContent java.lang.NullPointerException: null at org.apache.nifi.stream.io.DataOutputStream.writeUTF(DataOutputStream.java:300) ~[nifi-utils-1.0.0.jar:1.0.0] at org.apache.nifi.stream.io.DataOutputStream.writeUTF(DataOutputStream.java:281) ~[nifi-utils-1.0.0.jar:1.0.0] at org.apache.nifi.provenance.StandardRecordWriter.writeUUID(StandardRecordWriter.java:257) ~[na:na] at org.apache.nifi.provenance.StandardRecordWriter.writeUUIDs(StandardRecordWriter.java:266) ~[na:na] at org.apache.nifi.provenance.StandardRecordWriter.writeRecord(StandardRecordWriter.java:232) ~[na:na] at org.apache.nifi.provenance.PersistentProvenanceRepository.persistRecord(PersistentProvenanceRepository.java:766) ~[na:na] at org.apache.nifi.provenance.PersistentProvenanceRepository.registerEvents(PersistentProvenanceRepository.java:432) ~[na:na] at org.apache.nifi.controller.repository.StandardProcessSession.updateProvenanceRepo(StandardProcessSession.java:713) ~[nifi-framework-core-1.0.0.jar:1.0.0] at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:311) ~[nifi-framework-core-1.0.0.jar:1.0.0] at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:299) ~[nifi-framework-core-1.0.0.jar:1.0.0] at org.apache.nifi.processor.util.bin.BinFiles.processBins(BinFiles.java:256) ~[nifi-processor-utils-1.0.0.jar:1.0.0] at org.apache.nifi.processor.util.bin.BinFiles.onTrigger(BinFiles.java:190)
Getting the number of logs
Hi All, Previously posted this in the Dev listserv moving it over to the Users listserv I’m currently working on a use case to be able to track the number of individual logs that come in and put that information in ElasticSearch. I wanted to see if there is an easy way to do this and whether anyone had any good ideas? Current approach I am considering: Route the Log Files coming in – to a Split Text & Route Text Processor to make sure no empty logs get through and get the individual log count when files contain multiple logs – At the end of this the total number of logs are visible in the UI queue, where it displays the queueCount, but this information is not readily available to any processor. Current thought process is that I can use the ExecuteScript Processor and update a local file to keep track and insert the document into elastic search hourly. Any advice would be appreciated Thanks, Sai Peddy The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
Host disconnected due to different workflow configuration
Hello all, I experienced "host out of the cluster which was no longer able to join", log reports configuration workflow has been changed and it's different from the one running into the cluster. Due to this issue there is not way to join again the cluster. To resolve this I stopped the whole cluster and copied the same configuration to every host. After the restart anything worked well. Is here a good way to prevent flow changes when all the host into the cluster are not connected ?
Nifi vs Sqoop
Hi all, I have the following requirements : * I need to load at day 1 a full SQL table, * And then need to incrementally load new data (using capture data change mechanism). Initially, I was thinking using Sqoop to do it. Looking at Nifi and especially the QueryDatabaseTable processor, I'm wondering if I could use Nifi instead. Has someone already compared both to do it and what were the outcomes ? I can't see however how to configure the QueryDatabaseTable to handle the new lines (for example, looking at a "lastmodificationdate" field and taking only the lines for which lastModificationDate > lastRequestDate) ? Thanks in advance BR Nicolas