ListFile not fetching files when the source has future dated files vs local NiFi system time

2024-03-12 Thread Jeremy Pemberton-Pigott
Is there a way to get the ListFile processor to list files that are not
older than the maximum file age but whose minimum file age would be negative
relative to the local system time?  Some of the files at the source location
are dated in the future relative to the NiFi machine, and they are not being
listed.  I don't want to resort to 'No Tracking' if possible.  I'd like to
somehow disable the minimum file age check so that only the maximum file age
is applied as a filter.
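
For reference, a minimal ExecuteScript (Groovy) sketch of the kind of
max-age-only listing I have in mind; the directory, age limit, and attribute
names are illustrative assumptions, and it does no state tracking:

import java.nio.file.Files
import java.nio.file.Paths

def dir = Paths.get('/data/incoming')        // hypothetical source directory
def maxAgeMillis = 24L * 60 * 60 * 1000      // keep anything modified in the last 24 h
def now = System.currentTimeMillis()

Files.newDirectoryStream(dir).withCloseable { stream ->
    stream.each { path ->
        if (!Files.isRegularFile(path)) return
        def lastModified = Files.getLastModifiedTime(path).toMillis()
        // Future-dated files give a negative age, which still passes this check.
        if (now - lastModified <= maxAgeMillis) {
            def flowFile = session.create()
            flowFile = session.putAttribute(flowFile, 'absolute.path', path.parent.toString())
            flowFile = session.putAttribute(flowFile, 'filename', path.fileName.toString())
            session.transfer(flowFile, REL_SUCCESS)
        }
    }
}

This loses ListFile's timestamp-based state tracking, so some duplicate
handling downstream would still be needed.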

Jeremy


Re: Apache Nifi Openshift Deployment Error

2023-09-06 Thread Jeremy Pemberton-Pigott
Are you mounting a folder to the host (from each container)? If yes, perhaps 
the permissions/user/group are not correct on the host-mounted folder.

Regards,

Jeremy


On 7 Sep 2023, at 01:34, chris.s.jos...@lmco.com wrote:


Hi,
 
I am new to Apache NiFi, and I am trying to deploy it to OpenShift. I am using 
apache/nifi:latest, but getting the following error:
 
File [/opt/nifi/nifi-current/conf/nifi.properties] replacing 
[nifi.web.https.port]
sed: couldn't open temporary file /opt/nifi/nifi-current/conf/sedjjC5qV: 
Permission denied
 
Any help would be greatly appreciated.
Thanks,
Chris Joseph
 
 

Remote Process Group URL from a variable

2023-09-01 Thread Jeremy Pemberton-Pigott
Can the Remote Process Group reference a Variable/Parameter?  I am trying
to deploy flows to different servers whose RPG destination is not the same
fixed URL, but this setting seems to need a hard-coded list of URLs.

Context: I'm using the NiFi Docker image to deploy NiFi with an included
flow on many standalone servers.  Each server has its own RPG destination
so I want to set an environment variable when the Docker image is launched
such that the RPG can reference that for the destination server. This way I
don't have to manually go and change the setting on each server's flow
after it starts up.
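
For reference, the kind of start-up substitution I have in mind: a minimal
Groovy sketch run before NiFi starts, assuming a hypothetical RPG_TARGET_URL
environment variable and a placeholder URL baked into the image's
flow.xml.gz:

import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream

def flowGz = new File('/opt/nifi/nifi-current/conf/flow.xml.gz')
def targetUrl = System.getenv('RPG_TARGET_URL')
if (!targetUrl) {
    throw new IllegalStateException('RPG_TARGET_URL is not set')
}

// Read the gzipped flow, swap the placeholder RPG URL, and write it back.
def xml = new GZIPInputStream(flowGz.newInputStream()).getText('UTF-8')
xml = xml.replace('https://rpg-placeholder:8443/nifi', targetUrl)

new GZIPOutputStream(flowGz.newOutputStream()).withCloseable { out ->
    out.write(xml.getBytes('UTF-8'))
}

If the image also ships a flow.json.gz, the same substitution would
presumably be needed there as well.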

Jeremy


Re: Broken pipe: InvokeHTTP

2023-03-15 Thread Jeremy Pemberton-Pigott
You may also want to check if there's something in the middle terminating it
for you: nginx, a proxy, an OpenShift route, ...

Regards,

Jeremy

On 15 Mar 2023, at 08:07, Patrick Timmins  wrote:

Hello Richard,

Do you have *any* requests from the InvokeHTTP processor that are actually
getting through to the service running in the docker container?  Can you
access the service that's running in the docker container via another method
(e.g. browser or curl) to verify that you have the docker container
configured properly for ingress from outside the container?

Not knowing anything else, I would guess this is a docker container
networking ingress/egress issue.

Pat

On 3/14/2023 11:26 PM, Richard Beare wrote:

I do. I didn't spot anything that looked related last time, but will check
again.

On Wed, Mar 15, 2023 at 2:00 PM Joe Witt  wrote:

Hello

I believe this is the remote service killing the socket.  Do you have logs
for that service to check?

Thanks

On Tue, Mar 14, 2023 at 7:54 PM Richard Beare  wrote:

Hi Everyone,

I have an InvokeHTTP processor experiencing the error below on a small
proportion of flowfiles. The service it is accessing is running in another
docker container on the same host, and I've adjusted the frequency of
requests to quite low - the processor is running single threaded. The
settings are pretty standard - socket connect timeout 5 s, socket read
timeout 600 sec, socket idle timeout 5 mins, socket idle connections 5. The
processor is on a 1s schedule (I don't want it to be that slow). My nifi
setup is not clustered.

Googling suggests that the error could be caused by lack of disk space, but
that doesn't appear to be the case (all the nifi storage is on a drive with
plenty of space).

What else should I be looking for? The operation does take a while to run,
but nowhere near 10 minutes. I've configured the web service to support
several workers with the intention of processing many flowfiles quickly, but
this error is limiting what I can do. Sending the unsuccessful flowfiles to
the service using curl does work, so it's not a problem with the data.

Any ideas?

2023-03-14 01:48:46,334 ERROR [Timer-Driven Process Thread-36] o.a.nifi.processors.standard.InvokeHTTP InvokeHTTP[id=cb72d2e0-d5c0-36c1-19b6-13a542a56e60] Request Processing failed: StandardFlowFileRecord[uuid=eff900e7-81ce-4312-abe2-218cb78d3ca1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1678758435916-5, container=default, section=5], offset=11133732, length=125544],offset=0,name=eff900e7-81ce-4312-abe2-218cb78d3ca1,size=125544]
org.apache.nifi.processor.exception.ProcessException: IOException thrown from InvokeHTTP[id=cb72d2e0-d5c0-36c1-19b6-13a542a56e60]: java.net.SocketException: Broken pipe (Write failed)
at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2716)
at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2661)
at org.apache.nifi.processors.standard.InvokeHTTP$1.writeTo(InvokeHTTP.java:1170)
at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.kt:59)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)


Re: How do you use container-based NiFi ?

2023-02-09 Thread Jeremy Pemberton-Pigott
Hi Isha,

I agree with what Adam has said.

Our setups are small 4-16 node clusters; we run multiple single large flows
(real-time and batch flows).  For our environment we use OKD (OpenShift)
for container orchestration and to provide the routes/load balancing
through a service to the NiFi cluster.  We bind the conf, work, state,
logs, and 4 repos to local disks on each server.  The yaml files that
launch the containers are parameterized with variables and a customized
nifi start script (built into the NiFi image) that modifies the necessary
properties files so that we can keep the setup dynamic at the initial
cluster creation.  Nothing particularly complex and our clusters are very
compact working in both cloud and on-premise deployments.  The launch
script copies some necessary dynamically changing files to the target
servers, hbase site configs, truststores, and so on.

The hitch with the flow is that, in our setup, it has to be copied out from
the containers/off the running servers back to the launching server to be
reconciled (typically by file date, since some containers can be lost and no
longer syncing); this is done when you tear down the NiFi cluster or after
recovering lost nodes.  Incremental changes are kept in the NiFi Registry and
images are updated to keep things synchronized.  Our image is rebuilt with
the latest updated flow only, which provides the initial flow at launch, and
the images are pulled automatically by the running server when the container
is launched (handled by OKD).

We don't dynamically scale the cluster after it is started; the number of
nodes is chosen before launching.  We run the latest 1.19.1 in this
environment and have dozens of standalone NiFi instances pushing large
volumes of data collected from 100s of PCs and servers, which is large for
our use case (100Ks of logs, 10s of GBs/day): sensor data and logs that are
transformed and published to Kafka and HBase for analysis by Spark streaming
and Spark batch jobs.  This is our real-time analytics flow; we have another
NiFi cluster running in the same grouping of servers for large batch
processing (pulling data from S3 or other disk storage systems) to avoid a
big backlog on the real-time side of things.  Both are pushing data to the
same backend services, also running in the same cluster.

We've been scaling the system since v1.1.0 of NiFi and our setup is a fully
automated deployment with the gotchas previously mentioned.

On Thu, Feb 9, 2023 at 6:17 AM Adam Taft  wrote:

> Isha,
>
> Just some perspective from the field. I have had success with
> containerized NiFi and generally get along with it. That being said, I
> think there are a few caveats and issues you might find going down this road.
>
> Standalone NiFi in a container works pretty much the way you would want
> and expect. You do need to be careful about where you are mounting your
> NiFi configuration directories, though. e.g. content_repository,
> database_repository, flowfile_repository, provenance_repository, state,
> logs and work. All of these directories are actively written by NiFi and
> it's good to have these exported as bind mounts external from the container.
>
> You will definitely want to bind mount the flow.xml.gz and flow.json.gz
> files as well, or you will lose your live dataflow configuration changes as
> you use NiFi. Any change to your nifi canvas gets written into flow.xml.gz,
> which means you need to keep a copy of it outside of your container. And
> there's potentially other files in the conf folder that you also want to
> keep around. NiFi unfortunately doesn't organize the location of all these
> directories into a single location by default, so you kind of have to
> reconfigure and/or bind mount a lot of different paths.
>
> I have found that NiFi clustering with a dockerized environment to be less
> desirable. Primarily the problem is that the definition of cluster nodes is
> mostly hard coded into the nifi.properties file. Usually in a containerized
> environment, you want the ability to dynamically bring nodes up/down as
> needed (with dynamic IP/network configuration), especially in container
> orchestration frameworks like kubernetes. There's been a lot of experiments
> and possibly even some reasonable solutions coming out to help with
> containerized clusters, but generally you're going to find you have to
> crack your knuckles a little bit to get this to work. If you're content
> with a mostly statically defined non-elastic cluster configuration, then a
> clustered NiFi on docker is possible.
>
> As an option, if you stick with standalone deployments, what you can
> do instead is front your individual NiFi node instances with a load
> balancer. This may be a poor-man's approach to load distribution, but it
> works reasonably well and I've seen it in action on large volume flows. If
> you have the option that your data source can deliver to a load balancer,
> then you can have the load balancer 

Re: Unable to start nifi service

2022-12-10 Thread Jeremy Pemberton-Pigott
Instead of the bootstrap log, check nifi-app.log. It might be out of memory,
or the port it needs might already be in use.

Regards,

Jeremy


On 11 Dec 2022, at 11:01, James McMahon  wrote:


I am trying to start nifi on an AWS EC2 instance. My bootstrap.conf says the 
service does not start, but I see no indication why. I am trying to start the 
nifi service using

sudo ../bin/nifi.sh start

How can I debug this? Here is the nifi-bootstrap log:

2022-12-11 02:54:02,932 INFO [main] org.apache.nifi.bootstrap.Command Command: 
java -classpath 
/opt/nifi/releases/nifi-1.18.0/./conf:/opt/nifi/releases/nifi-1.18.0/./lib/javax.servlet-api-3.1.0.jar:/
opt/nifi/releases/nifi-1.18.0/./lib/jetty-schemas-5.2.jar:/opt/nifi/releases/nifi-1.18.0/./lib/logback-classic-1.2.11.jar:/opt/nifi/releases/nifi-1.18.0/./lib/logback-core-1.2.11.jar:/opt/nifi/releas
es/nifi-1.18.0/./lib/jcl-over-slf4j-1.7.36.jar:/opt/nifi/releases/nifi-1.18.0/./lib/jul-to-slf4j-1.7.36.jar:/opt/nifi/releases/nifi-1.18.0/./lib/log4j-over-slf4j-1.7.36.jar:/opt/nifi/releases/nifi-1.
18.0/./lib/nifi-api-1.18.0.jar:/opt/nifi/releases/nifi-1.18.0/./lib/nifi-framework-api-1.18.0.jar:/opt/nifi/releases/nifi-1.18.0/./lib/nifi-server-api-1.18.0.jar:/opt/nifi/releases/nifi-1.18.0/./lib/
nifi-runtime-1.18.0.jar:/opt/nifi/releases/nifi-1.18.0/./lib/nifi-nar-utils-1.18.0.jar:/opt/nifi/releases/nifi-1.18.0/./lib/nifi-properties-1.18.0.jar:/opt/nifi/releases/nifi-1.18.0/./lib/nifi-proper
ty-utils-1.18.0.jar:/opt/nifi/releases/nifi-1.18.0/./lib/nifi-stateless-bootstrap-1.18.0.jar:/opt/nifi/releases/nifi-1.18.0/./lib/nifi-stateless-api-1.18.0.jar:/opt/nifi/releases/nifi-1.18.0/./lib/sl
f4j-api-1.7.36.jar:/opt/nifi/releases/nifi-1.18.0/./lib/java11/jakarta.activation-api-1.2.2.jar:/opt/nifi/releases/nifi-1.18.0/./lib/java11/jakarta.activation-1.2.2.jar:/opt/nifi/releases/nifi-1.18.0
/./lib/java11/jakarta.xml.bind-api-2.3.3.jar:/opt/nifi/releases/nifi-1.18.0/./lib/java11/jaxb-runtime-2.3.5.jar:/opt/nifi/releases/nifi-1.18.0/./lib/java11/txw2-2.3.5.jar:/opt/nifi/releases/nifi-1.18
.0/./lib/java11/istack-commons-runtime-3.0.12.jar:/opt/nifi/releases/nifi-1.18.0/./lib/java11/javax.annotation-api-1.3.2.jar
 -Dorg.apache.jasper.compiler.disablejsr199=true -Xmx512m -Xms512m -Dcurato
r-log-only-first-connection-issue-as-error-level=true 
-Djavax.security.auth.useSubjectCredsOnly=true 
-Djava.security.egd=file:/dev/urandom -Dzookeeper.admin.enableServer=false 
-Dsun.net.http.allowRes
trictedHeaders=true -Djava.net.preferIPv4Stack=true -Djava.awt.headless=true 
-Djava.protocol.handler.pkgs=sun.net.www.protocol 
-Dnifi.properties.file.path=/opt/nifi/releases/nifi-1.18.0/./conf/nifi.p
roperties -Dnifi.bootstrap.listen.port=40671 -Dapp=NiFi 
-Dorg.apache.nifi.bootstrap.config.log.dir=/opt/nifi/releases/nifi-1.18.0/logs 
org.apache.nifi.NiFi
2022-12-11 02:54:02,976 INFO [main] org.apache.nifi.bootstrap.Command Launched 
Apache NiFi with Process ID 26353
2022-12-11 02:54:03,979 INFO [main] org.apache.nifi.bootstrap.RunNiFi NiFi 
never started. Will not restart NiFi
2022-12-11 02:54:10,317 INFO [main] o.a.n.b.NotificationServiceManager 
Successfully loaded the following 0 services: []
2022-12-11 02:54:10,320 INFO [main] org.apache.nifi.bootstrap.RunNiFi 
Registered no Notification Services for Notification Type NIFI_STARTED
2022-12-11 02:54:10,320 INFO [main] org.apache.nifi.bootstrap.RunNiFi 
Registered no Notification Services for Notification Type NIFI_STOPPED
2022-12-11 02:54:10,320 INFO [main] org.apache.nifi.bootstrap.RunNiFi 
Registered no Notification Services for Notification Type NIFI_DIED
2022-12-11 02:54:10,337 INFO [main] org.apache.nifi.bootstrap.Command Apache 
NiFi is not running

ODBC with unixODBC in Linux

2022-11-22 Thread Jeremy Pemberton-Pigott
Hi, has anyone had success using unixODBC to make an ODBC connection to a
DB that supports this mode of connection on Linux (CentOS/Debian)?  We are
trying to connect NiFi to Kx's kdb+, and this is the initial feedback they
gave for connecting to the time series database.  Or has anyone connected
to kdb+ from NiFi on Linux?

Reference page: https://code.kx.com/q/interfaces/q-client-for-odbc/

Jeremy


Re: MiNiFi C2 server 0.5.0 bug or not?

2021-09-21 Thread Jeremy Pemberton-Pigott
Hi Tom,

I think so, I sent a similar email recently to the list about this.  And
raised the issue here: https://issues.apache.org/jira/browse/NIFI-9075

Jeremy

On Tue, Sep 21, 2021 at 8:10 PM Tomislav Novosel <
tomislav.novo...@clearpeaks.com> wrote:

> Hi to all,
>
>
>
> I am using MiNiFi C2 server version 0.5.0 and MiNiFi version 1.14.0.
>
> C2 server is configured to pull templates from localhost NiFi installation
>
> and MiNiFi is configured with configuration change ingestor to pull config
> from C2 server.
>
>
>
> After I created MiNiFi flow on NiFi canvas including Remote Process Group
> pointed to the same
>
> NiFi installation (IP and port configured and the flow tested on
> localhost, flowfiles are coming
>
> on input port) I saved the flow as template.
>
>
>
> The template has the following ID's for the input port:
>
>
>
> 
>
> 
>
> 
>
>
> 1
>
> true
>
> true
>
> f43fd60b-017b-1000-ad38-01d2ac321927
>
> *b4fbf2a8-46d0-3ac7-b833-21533794f7a7*
>
> DataFromSensors
>
>
> *f43dd13c-017b-1000-df9a-acf01964ae4f*
>
>
>
> When the C2 server pulls the template, it apparently sets the wrong ID in
> the config.yml file: instead of putting the input port ,
>
> it sets  into the config.yml file, so the MiNiFi agent is unable to
> accept that configuration and rejects it; it is simply wrong.
>
>
>
> This is the Exception from MiNiFi log:
>
>
>
> Caused by:
> org.apache.nifi.minifi.bootstrap.exception.InvalidConfigurationException:
> Failed to transform config file due to:[Connection with id
> 8f6c9b69-5b31-313e-- has invalid destination id
> b4fbf2a8-46d0-3ac7-b833-21533794f7a7]
>
>at
> org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.throwIfInvalid(ConfigTransformer.java:131)
>
>at
> org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.transformConfigFile(ConfigTransformer.java:94)
>
>at
> org.apache.nifi.minifi.bootstrap.RunMiNiFi.performTransformation(RunMiNiFi.java:1693)
>
>
>
> To fix it, I need to go to the cached config file in the ./cache folder of the C2
> server and set the right ID of the input port manually.
>
>
>
> *NOTE: I tried to convert the template file manually using the minifi toolkit
> 1.14.0 and it sets the correct  of the input port.*
>
>
>
> Is this a bug in the C2 server or intentional behaviour? I found the same
> case for the minifi toolkit, but apparently it was fixed:
>
> https://stackoverflow.com/questions/59214373/failing-in-minifi-tutorial-toolkit-error-connection-with-id-has-invalid-de
>
>
>
> Thanks,
>
> Tom
>


Re: Minifi 1.14.0 exception - sensitive props key

2021-09-20 Thread Jeremy Pemberton-Pigott
Hi Tom,

I recall that I used the NiFi 1.14.0 binary to set the key and then copied
that over to the MiNiFi properties file to get it to work.

Linux only:
https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#updating-the-sensitive-properties-key
./bin/nifi.sh set-sensitive-properties-key 

Jeremy

On Mon, Sep 20, 2021 at 9:14 PM Tomislav Novosel <
tomislav.novo...@clearpeaks.com> wrote:

> According to the migration guidance for NiFi, the property 'Sensitive Properties
> Key' should be generated at startup. I believe this is the same behaviour
> in the case of MiNiFi since they merged the codebase. Why is this happening?
>
>
>
> BR,
>
> Tom
>
> *From:* Tomislav Novosel 
> *Sent:* 20 September 2021 11:05
> *To:* users@nifi.apache.org
> *Subject:* Minifi 1.14.0 exception - sensitive props key
>
>
>
> Hi to all,
>
>
>
> I was using MiNiFi 0.5.0 running on Ubuntu, installed as a service.
> I have now switched to MiNiFi 1.14.0 – I disabled the minifi service, deleted the
> installation folder, and unpacked the MiNiFi 1.14.0 folder in the same place
> where 0.5.0 was installed.
>
> I started the new MiNiFi 1.14.0 and the service doesn't start properly
> with this exception:
>
>
>
> java.lang.Exception: Unable to load flow due to:
> java.lang.IllegalArgumentException: NiFi Sensitive Properties Key
> [nifi.sensitive.props.key] is required
>
>   at
> org.apache.nifi.headless.HeadlessNiFiServer.start(HeadlessNiFiServer.java:166)
>
>   at org.apache.nifi.minifi.MiNiFi.(MiNiFi.java:163)
>
>   at org.apache.nifi.minifi.MiNiFi.(MiNiFi.java:64)
>
>   at org.apache.nifi.minifi.MiNiFi.main(MiNiFi.java:265)
>
> Caused by: java.lang.IllegalArgumentException: NiFi Sensitive Properties
> Key [nifi.sensitive.props.key] is required
>
>   at
> org.apache.nifi.encrypt.PropertyEncryptorFactory.getPropertyEncryptor(PropertyEncryptorFactory.java:42)
>
>   at
> org.apache.nifi.headless.HeadlessNiFiServer.start(HeadlessNiFiServer.java:125)
>
>   ... 3 common frames omitted
>
>
>
> I tried to set the key manually, but the property in nifi.properties gets
> overwritten every time I start the service.
>
>
>
> I could not find anything about this exception; can someone help explain why
> this is happening?
>
>
>
> Thanks, regards,
>
> Tom
>


Problem with 0.5.0 C2 server converting template in NiFi 1.12.1 for 1.14.0 MiNiFi

2021-08-22 Thread Jeremy Pemberton-Pigott
Hi, I'm trying to learn the MiNiFi + C2 NiFiRestConfigurationProvider
setup.  If I download my template and use the minifi-toolkit-1.14.0 to
convert the template, there is no issue with the remote process group target
port id, so the template should be good.  But querying the C2 server config
API produces a yaml file with incorrect ids; it fails the toolkit validation
and is also reported as an error in the MiNiFi log.  Am I missing something?
From the good vs bad yaml I can see that the C2 server transformation picked
up the port id rather than the port target id.

Error from MiNiFi:
2021-08-22 23:46:38,452 ERROR [pool-3-thread-1]
o.apache.nifi.minifi.bootstrap.RunMiNiFi Unable to carry out reloading of
configuration on receipt of notification event
java.io.IOException: Unable to successfully transform the provided
configuration
Caused by:
org.apache.nifi.minifi.bootstrap.exception.InvalidConfigurationException:
Failed to transform config file due to:[Connection with id
caaf52c3-c29f-34d4-- has invalid destination id
5c035c8f-c4d4-3d9b-5a40-210870fced8c]

NiFi input port XML:




1
true
true
4cbe2f91-6d36-3327-3f49-6d7cc6b08340
5c035c8f-c4d4-3d9b-5a40-210870fced8c
MiNiFi Raw Data

2f5a4d17-0175-1000--50346b1e
true
false
false


Bad yaml:
- id: caaf52c3-c29f-34d4--
  name: Set Version Headers/success/5c035c8f-c4d4-3d9b-5a40-210870fced8c
  source id: 9b2797ba-c156-3c13--
  source relationship names:
  - success
  destination id: 5c035c8f-c4d4-3d9b-5a40-210870fced8c

Good yaml:
- id: caaf52c3-c29f-34d4--
  name: Set Version Headers/success/2f5a4d17-0175-1000--50346b1e
  source id: 9b2797ba-c156-3c13--
  source relationship names:
  - success
  destination id: 2f5a4d17-0175-1000--50346b1e

Jeremy


Re: Release from holding queue after timeout

2021-04-14 Thread Jeremy Pemberton-Pigott
Thanks Paul, that was an excellent idea and the line is added to the Groovy
code that was already in the flow.  Works now :)

Jeremy

On Thu, Apr 15, 2021 at 12:44 AM Paul Kelly  wrote:

> Whatever processor is putting it into the queue could set a penalty on
> failed flow files that enter your retry queue.  It's one of the properties
> on the General tab.
>
> If you can't set it there for whatever reason, you could put an
> ExecuteScript processor within the retry loop with the following Groovy
> code in it, and set that ExecuteScript's penalty duration to 20s:
>
> flowFile = session.get()
> if (!flowFile) return
> // penalize() returns an updated FlowFile reference; reassign it before transferring
> flowFile = session.penalize(flowFile)
> session.transfer(flowFile, REL_SUCCESS)
>
> Paul
>
> On Wed, Apr 14, 2021 at 4:22 PM Jeremy Pemberton-Pigott <
> fuzzych...@gmail.com> wrote:
>
>> Yes it can. Is that an update attribute which can set that on the flow
>> file?
>>
>> Regards,
>>
>> Jeremy
>>
>>
>> On 15 Apr 2021, at 00:16, Paul Kelly  wrote:
>>
>> 
>> Are you able to penalize the flow file for 20s within the update retry
>> queue?  Penalizing without yielding would cause a flow file to sit in the
>> queue for 20s before the next processor will try acting on it.
>>
>> Paul
>>
>> On Wed, Apr 14, 2021 at 5:05 AM Jeremy Pemberton-Pigott <
>> fuzzych...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a parallel update process in 2 different flows running in a 3
>>> node cluster running 1.6.0, if the insert side has not completed yet the
>>> update side moves the flow file to a waiting queue for retry.  I want to
>>> retry every 20s all the flow files in the queue that have been waiting that
>>> long.  There may be 10,000s waiting so I don't want to do 1 flow file every
>>> 20s.  Any idea how I can achieve this?  I don't think that I can use the
>>> yield mechanism because after n retries it goes into a notification logging
>>> flow.  And wait/notify won't work because the notification may appear
>>> before the update flow file arrives.
>>>
>>> Jeremy
>>>
>>


Re: Release from holding queue after timeout

2021-04-14 Thread Jeremy Pemberton-Pigott
Yes it can. Is that an update attribute which can set that on the flow file?

Regards,

Jeremy


On 15 Apr 2021, at 00:16, Paul Kelly  wrote:


Are you able to penalize the flow file for 20s within the update retry queue?  
Penalizing without yielding would cause a flow file to sit in the queue for 20s 
before the next processor will try acting on it.

Paul

On Wed, Apr 14, 2021 at 5:05 AM Jeremy Pemberton-Pigott  
wrote:
> Hi,
> 
> I have a parallel update process in 2 different flows running in a 3 node 
> cluster running 1.6.0, if the insert side has not completed yet the update 
> side moves the flow file to a waiting queue for retry.  I want to retry every 
> 20s all the flow files in the queue that have been waiting that long.  There 
> may be 10,000s waiting so I don't want to do 1 flow file every 20s.  Any idea 
> how I can achieve this?  I don't think that I can use the yield mechanism 
> because after n retries it goes into a notification logging flow.  And 
> wait/notify won't work because the notification may appear before the update 
> flow file arrives.
> 
> Jeremy


Release from holding queue after timeout

2021-04-13 Thread Jeremy Pemberton-Pigott
Hi,

I have a parallel update process in 2 different flows running in a 3 node
cluster on 1.6.0.  If the insert side has not completed yet, the update
side moves the flow file to a waiting queue for retry.  I want to retry,
every 20s, all the flow files in the queue that have been waiting that
long.  There may be 10,000s waiting, so I don't want to do 1 flow file every
20s.  Any idea how I can achieve this?  I don't think that I can use the
yield mechanism because after n retries it goes into a notification logging
flow.  And wait/notify won't work because the notification may appear
before the update flow file arrives.

Jeremy


Re: Detect duplicate record reader

2021-02-15 Thread Jeremy Pemberton-Pigott
Thanks for the replies guys.  Yes, NIFI-6047 is pretty much exactly what
I'm looking for.

Jeremy

On Mon, Feb 15, 2021 at 2:37 PM Chris Sampson 
wrote:

> NIFI-6047 [1] is possibly what you're after, but that won't help you just
> now because it appears to remain unfinished.
>
>
> [1] https://issues.apache.org/jira/browse/NIFI-6047
>
> Cheers,
>
> Chris Sampson
>
> On Mon, 15 Feb 2021, 06:27 Jorge Machado,  wrote:
>
>> Hey Jeremy,
>>
>> Something linke this
>> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.5.0/org.apache.nifi.processors.standard.DetectDuplicate/index.html
>>  ?
>>
>>
>> On 15. Feb 2021, at 04:45, Jeremy Pemberton-Pigott 
>> wrote:
>>
>> Hi everyone, I'm wondering if there is a Detect Duplicate processor that
>> can read records from a flow file and output just the
>> non-duplicates (single records, or better, a group of non-duplicates).  I
>> want to use a record reader to avoid splitting the json content
>> into 1000s of flow files to detect the duplicates.  Immediately after this
>> flow is a record reader/writer going to HBase.
>>
>> Jeremy
>>
>>
>>


Detect duplicate record reader

2021-02-14 Thread Jeremy Pemberton-Pigott
Hi everyone, I'm wondering if there is a Detect Duplicate processor that
can read records from a flow file and output just the
non-duplicates (single records, or better, a group of non-duplicates).  I
want to use a record reader to avoid splitting the json content
into 1000s of flow files to detect the duplicates.  Immediately after this
flow is a record reader/writer going to HBase.
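
For reference, a minimal ExecuteScript (Groovy) sketch of the record-level
de-duplication I have in mind, assuming the flow file content is a JSON
array and using a hypothetical "id" field as the key:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback

def flowFile = session.get()
if (!flowFile) return

def seen = new HashSet()
def unique = []

// Collect only the first occurrence of each key.
session.read(flowFile, { inputStream ->
    new JsonSlurper().parse(inputStream).each { record ->
        if (seen.add(record.id as String)) {
            unique << record
        }
    }
} as InputStreamCallback)

// Overwrite the content with the de-duplicated records.
flowFile = session.write(flowFile, { outputStream ->
    outputStream.write(JsonOutput.toJson(unique).getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)

session.transfer(flowFile, REL_SUCCESS)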

Jeremy


Re: TLS ConsumeMQTT processor causes NullPointerException exception

2020-10-07 Thread Jeremy Pemberton-Pigott
Created https://issues.apache.org/jira/browse/NIFI-7895

Jeremy

On Mon, Sep 28, 2020 at 11:17 PM Bryan Bende  wrote:

> Seems like a bug in AbstractMQTTProcessor.transformSSLContextService
> which assumes there will always be a keystore provided by the
> SSLContextService, which is not true.
>
> Can you create a jira for this?
>
> On Mon, Sep 28, 2020 at 10:43 AM Jeremy Pemberton-Pigott
>  wrote:
> >
> > Hi,
> >
> > I'm trying to set up an MQTT TLS consumer in 1.12.0 with ConsumeMQTT.  I
> am initiating the connection to a remote EMQX MQTT TLS broker.  If I
> configure the StandardRestrictedSSLContextService for a JKS truststore only
> (no keystore settings since it isn't required) and start the ConsumeMQTT I
> get the below stack trace in the NiFi logs and the processor fails to
> start.  Unless I fill in the details for a keystore it won't work, so if
> both are filled in then it connects fine.  Is this expected or a bug?
> >
> > I was expecting not to even have to set up the SSL context service
> because I don't need to locally verify the remote server certificates
> (InvokeHTTP works this way), I also don't want to maintain them, so I
> thought that I shouldn't even need a local truststore file but the
> ConsumeMQTT processor with a broker URI of ssl://x:38883 requires me to set
> it up.
> >
> > Processor error:
> > ConsumeMQTT[id=d4f3133f-0174-1000-2d37-2fbf855ebf83] Failed to properly
> initialize Processor. If still scheduled to run, NiFi will attempt to
> initialize and run the Processor again after the 'Administrative Yield
> Duration' has elapsed. Failure is due to java.lang.NullPointerException:
> java.lang.NullPointerException
> >
> > Log:
> > 2020-09-28 21:51:00,665 ERROR [Timer-Driven Process Thread-1]
> o.a.nifi.processors.mqtt.ConsumeMQTT
> ConsumeMQTT[id=d4f3133f-0174-1000-2d37-2fbf855ebf83] Failed to properly
> initialize Processor. If still scheduled to run, NiFi will attempt to
> initialize and run the Processor again after the 'Administrative Yield
> Duration' has elapsed. Failure is due to java.lang.NullPointerException:
> java.lang.NullPointerException
> > java.lang.NullPointerException: null
> > at java.util.Hashtable.put(Hashtable.java:459)
> > at java.util.Properties.setProperty(Properties.java:166)
> > at
> org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.transformSSLContextService(AbstractMQTTProcessor.java:292)
> > at
> org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.onScheduled(AbstractMQTTProcessor.java:317)
> > at
> org.apache.nifi.processors.mqtt.ConsumeMQTT.onScheduled(ConsumeMQTT.java:222)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:142)
> > at
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:130)
> > at
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:75)
> > at
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotation(ReflectionUtils.java:52)
> > at
> org.apache.nifi.controller.StandardProcessorNode.lambda$initiateStart$4(StandardProcessorNode.java:1526)
> > at org.apache.nifi.engine.FlowEngine$3.call(FlowEngine.java:123)
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> > at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> > at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > at java.lang.Thread.run(Thread.java:748)
> > 2020-09-28 21:51:00,669 ERROR [Timer-Driven Process Thread-1]
> org.apache.nifi.util.ReflectionUtils Failed while invoking annotated method
> 'public void
> org.apache.nifi.processors.mqtt.ConsumeMQTT.onUnscheduled(org.apache.nifi.processor.ProcessContext)'
> with arguments '[org.apache.nifi.processor.StandardProcessContext@64f9d3ca
> ]'.
> > java.lang.reflect.InvocationTargetException: null
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMeth

TLS ConsumeMQTT processor causes NullPointerException exception

2020-09-28 Thread Jeremy Pemberton-Pigott
Hi,

I'm trying to set up an MQTT TLS consumer in 1.12.0 with ConsumeMQTT.  I am
initiating the connection to a remote EMQX MQTT TLS broker.  If I configure
the StandardRestrictedSSLContextService for a JKS truststore only (no
keystore settings since it isn't required) and start the ConsumeMQTT I get
the below stack trace in the NiFi logs and the processor fails to start.
Unless I fill in the details for a keystore it won't work, so if both are
filled in then it connects fine.  Is this expected or a bug?

I was expecting not to even have to set up the SSL context service because
I don't need to locally verify the remote server certificates (InvokeHTTP
works this way), I also don't want to maintain them, so I thought that I
shouldn't even need a local truststore file but the ConsumeMQTT processor
with a broker URI of ssl://x:38883 requires me to set it up.

Processor error:
ConsumeMQTT[id=d4f3133f-0174-1000-2d37-2fbf855ebf83] Failed to properly
initialize Processor. If still scheduled to run, NiFi will attempt to
initialize and run the Processor again after the 'Administrative Yield
Duration' has elapsed. Failure is due to java.lang.NullPointerException:
java.lang.NullPointerException

Log:
2020-09-28 21:51:00,665 ERROR [Timer-Driven Process Thread-1]
o.a.nifi.processors.mqtt.ConsumeMQTT
ConsumeMQTT[id=d4f3133f-0174-1000-2d37-2fbf855ebf83] Failed to properly
initialize Processor. If still scheduled to run, NiFi will attempt to
initialize and run the Processor again after the 'Administrative Yield
Duration' has elapsed. Failure is due to java.lang.NullPointerException:
java.lang.NullPointerException
java.lang.NullPointerException: null
at java.util.Hashtable.put(Hashtable.java:459)
at java.util.Properties.setProperty(Properties.java:166)
at
org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.transformSSLContextService(AbstractMQTTProcessor.java:292)
at
org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.onScheduled(AbstractMQTTProcessor.java:317)
at
org.apache.nifi.processors.mqtt.ConsumeMQTT.onScheduled(ConsumeMQTT.java:222)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:142)
at
org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:130)
at
org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:75)
at
org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotation(ReflectionUtils.java:52)
at
org.apache.nifi.controller.StandardProcessorNode.lambda$initiateStart$4(StandardProcessorNode.java:1526)
at org.apache.nifi.engine.FlowEngine$3.call(FlowEngine.java:123)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-09-28 21:51:00,669 ERROR [Timer-Driven Process Thread-1]
org.apache.nifi.util.ReflectionUtils Failed while invoking annotated method
'public void
org.apache.nifi.processors.mqtt.ConsumeMQTT.onUnscheduled(org.apache.nifi.processor.ProcessContext)'
with arguments '[org.apache.nifi.processor.StandardProcessContext@64f9d3ca
]'.
java.lang.reflect.InvocationTargetException: null
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:142)
at
org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:130)
at
org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:268)
at
org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:90)
at
org.apache.nifi.controller.StandardProcessorNode.lambda$initiateStart$4(StandardProcessorNode.java:1569)
at org.apache.nifi.engine.FlowEngine$3.call(FlowEngine.java:123)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at

Calling DLL functions on a Windows PC

2020-08-25 Thread Jeremy Pemberton-Pigott
Hi,

Is there a processor that allows me to call functions in a local DLL?
Jython or maybe a Python script perhaps, or would I have to write a custom jar
with JNI calls?  NodeJS Addons has the ability to do it and it works; now I
want to make the same calls from a NiFi flow without NodeJS.
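
For reference, a hedged sketch of one possible route: ExecuteScript (Groovy)
plus JNA, assuming the JNA jar has been added to the processor's Module
Directory property. The library and function below are only an example.

import com.sun.jna.Library
import com.sun.jna.Native

// Map just the function we want to call from the DLL.
interface Kernel32 extends Library {
    int GetCurrentProcessId()
}

// JNA 5.x API; older JNA versions use Native.loadLibrary instead.
def kernel32 = Native.load('kernel32', Kernel32)

def flowFile = session.create()
flowFile = session.putAttribute(flowFile, 'native.pid',
        String.valueOf(kernel32.GetCurrentProcessId()))
session.transfer(flowFile, REL_SUCCESS)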

Jeremy


Re: MiNiFi version

2020-07-23 Thread Jeremy Pemberton-Pigott
Thanks for the links, good read.

I implemented the C++ version of MiNiFi and got it all working, but found
out that the ExecuteStreamCommand processor doesn't exist.  Do I have to
write my own version into the C++ MiNiFi or is there another way to run OS
commands?  MiNiFi is running on a Windows machine btw.

Jeremy

On Wed, Jul 22, 2020 at 8:45 PM Pierre Villard 
wrote:

> I don't recall the specific details but when I used MQTT processors [1]
> with latest version of MiNiFi Java, I *think* I could not use the latest
> version of NiFi NARs, I had to use code from a previous NiFi version and
> cherry-pick some code (open pull request) I needed for my use case [2].
> Having said that, I'm not 100% sure - I might be mixing things up with an
> even older version of MiNiFi. If there is an issue, it should not be too
> hard to solve though, and happy to give pointers here.
>
> [1]
> https://medium.com/google-cloud/running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi-45282ce7af2d
> [2] https://github.com/apache/nifi/pull/3392
>
> Le mar. 21 juil. 2020 à 19:35, Joe Witt  a écrit :
>
>> ...right nars should generally port well.
>>
>> On Tue, Jul 21, 2020 at 10:25 AM Jeremy Pemberton-Pigott <
>> fuzzych...@gmail.com> wrote:
>>
>>> Thanks Joe, that's really helpful, I have an immediate use for the Java
>>> version.  If I wanted to use the ConsumeMQTT processor from say 1.11.4, so
>>> that I can get the UUID Client ID feature, is that easy to do by just
>>> moving the jar over or something else has to be done or I'm limited to a
>>> specific NiFi version?
>>>
>>> Jeremy
>>>
>>> On Wed, Jul 22, 2020 at 12:52 AM Joe Witt  wrote:
>>>
>>>> Jeremy
>>>>
>>>> MiNiFi Java is basically a headless version of NiFi with some added
>>>> configuration ease of use items appropriate to that model.  It has proven
>>>> too hard to slice time for that versus NiFi so I think what will end up
>>>> happening is that minifi java as a standalone source repository will just
>>>> fade out and instead we'll make 'MiNiFi Java' an assembly/convenience
>>>> binary that builds from the same source repository as NiFi itself.  Matt
>>>> Burgess is doing a lot of work here in this regard.
>>>>
>>>> So in short - yeah this is still very actively developed.  Just that
>>>> 'where it is done' is shifting back into the mothership of NiFi itself so
>>>> to speak.
>>>>
>>>> MiNiFi CPP is another one to look at depending on your needs/scenario.
>>>> Also under very active development.
>>>>
>>>> Thanks
>>>>
>>>> On Tue, Jul 21, 2020 at 9:39 AM Jeremy Pemberton-Pigott <
>>>> fuzzych...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Can someone help out with the version of NiFi that MiNiFi tracks?  The
>>>>> Java version seems to be at 0.5 for a long time while the C++ version is
>>>>> 0.7.  I'm also wondering if MiNiFi is actively developed or is it risky to
>>>>> include it in new projects?
>>>>>
>>>>> Jeremy
>>>>>
>>>>


Re: MiNiFi version

2020-07-21 Thread Jeremy Pemberton-Pigott
Thanks Joe, that's really helpful, I have an immediate use for the Java
version.  If I wanted to use the ConsumeMQTT processor from say 1.11.4, so
that I can get the UUID Client ID feature, is that easy to do by just
moving the jar over or something else has to be done or I'm limited to a
specific NiFi version?

Jeremy

On Wed, Jul 22, 2020 at 12:52 AM Joe Witt  wrote:

> Jeremy
>
> MiNiFi Java is basically a headless version of NiFi with some added
> configuration ease of use items appropriate to that model.  It has proven
> too hard to slice time for that versus NiFi so I think what will end up
> happening is that minifi java as a standalone source repository will just
> fade out and instead we'll make 'MiNiFi Java' an assembly/convenience
> binary that builds from the same source repository as NiFi itself.  Matt
> Burgess is doing a lot of work here in this regard.
>
> So in short - yeah this is still very actively developed.  Just that
> 'where it is done' is shifting back into the mothership of NiFi itself so
> to speak.
>
> MiNiFi CPP is another one to look at depending on your needs/scenario.
> Also under very active development.
>
> Thanks
>
> On Tue, Jul 21, 2020 at 9:39 AM Jeremy Pemberton-Pigott <
> fuzzych...@gmail.com> wrote:
>
>> Hi,
>>
>> Can someone help out with the version of NiFi that MiNiFi tracks?  The
>> Java version seems to be at 0.5 for a long time while the C++ version is
>> 0.7.  I'm also wondering if MiNiFi is actively developed or is it risky to
>> include it in new projects?
>>
>> Jeremy
>>
>


MiNiFi version

2020-07-21 Thread Jeremy Pemberton-Pigott
Hi,

Can someone help out with the version of NiFi that MiNiFi tracks?  The Java
version seems to be at 0.5 for a long time while the C++ version is 0.7.
I'm also wondering if MiNiFi is actively developed or is it risky to
include it in new projects?

Jeremy


Re: Session state in cluster HandleHttpRequest and HandleHttpReponse

2020-06-30 Thread Jeremy Pemberton-Pigott
Thanks Peter, that makes sense. I'll try a wait/notify using an identifier for
the node in the Spark messages being monitored, so that the same node will
receive the reply from Spark and respond to the client that initiated the
connection.
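
For reference, a minimal sketch of the tagging step: an ExecuteScript
(Groovy) right after HandleHttpRequest (the attribute name is an assumption;
an UpdateAttribute with ${hostname()} should work just as well).

def flowFile = session.get()
if (!flowFile) return
// Record which node accepted the request so the Spark reply can be keyed back to it.
flowFile = session.putAttribute(flowFile, 'origin.node',
        InetAddress.getLocalHost().getHostName())
session.transfer(flowFile, REL_SUCCESS)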

Regards,

Jeremy


On 30 Jun 2020, at 22:41, Peter Turcsanyi  wrote:


Hi Jeremy,

I don't think you can accept the request in one node and send back the response 
from another node. 
There is an open HTTP connection between the client and the NiFi node while the 
HandleHttpRequest -> ... -> HandleHttpResponse flow is running.
Even if we passed the request/response context object between the NiFi nodes 
via a distributed cache, it would not be possible to send back the HTTP 
response to a client that originally connected and sent the request to another 
node. At least I don't know any solution to it in/outside NiFi.

Best,
Peter

> On Tue, Jun 30, 2020 at 6:04 AM Jeremy Pemberton-Pigott 
>  wrote:
> Hi,
> 
> I have a cluster of 3 nodes and the incoming request on one node's 
> HandleHttpRequest may be replied to by a different node's HandleHttpResponse, 
> in between there is a Spark streaming job process.  Is there any example how 
> to do that, maybe with DistributeMapCacheService?  So that I can still reply 
> back to the client with the necessary data in the response.
> 
> Jeremy


Session state in cluster HandleHttpRequest and HandleHttpReponse

2020-06-29 Thread Jeremy Pemberton-Pigott
Hi,

I have a cluster of 3 nodes and the incoming request on one node's
HandleHttpRequest may be replied to by a different node's
HandleHttpResponse, in between there is a Spark streaming job process.  Is
there any example how to do that, maybe with DistributeMapCacheService?  So
that I can still reply back to the client with the necessary data in the
response.

Jeremy


Re: Listing a folder with millions of files

2020-03-09 Thread Jeremy Pemberton-Pigott
Thanks for the suggestions guys. The pre-filtered list is possibly one I can 
use. 

Regards,

Jeremy


> On 9 Mar 2020, at 20:16, Shawn Weeks  wrote:
> 


When I’ve had to do this I just skip trying to use ListFile and instead create 
a text file containing a list of all the files that can be used with the 
SplitFile and FetchFile processors to pull things in in batches. Even with 
filtering ListFile will iterate through a lot of files.
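
For illustration, a hedged Groovy sketch of generating such a pre-filtered
list (the paths and filename pattern are assumptions); the flow then splits
the list line by line and fetches each file:

import java.nio.file.Files
import java.nio.file.Paths

def sourceDir   = Paths.get('/data/huge-folder')           // folder with millions of files
def listFile    = new File('/data/lists/batch-001.txt')    // text file consumed by the flow
def namePattern = ~/^event_2020-02-.*\.json$/              // the subset to process

listFile.withWriter('UTF-8') { writer ->
    Files.newDirectoryStream(sourceDir).withCloseable { stream ->
        stream.each { path ->
            if (Files.isRegularFile(path) && path.fileName.toString() ==~ namePattern) {
                writer.write(path.toString() + System.lineSeparator())
            }
        }
    }
}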
 
Thanks
 
From: Edward Armes 
Reply-To: "users@nifi.apache.org" 
Date: Monday, March 9, 2020 at 4:43 AM
To: "users@nifi.apache.org" 
Subject: Re: Listing a folder with millions of files
 
Hi Jeremy,
 
In this case I don't think there is an easy answer here.
 
You may have some luck with adjusting the max runtime of the processor but,
without checking the processor's implementation, I couldn't know for certain
if that would have any effect.
 
Edward

On Mon, 9 Mar 2020, 06:34 Jeremy Pemberton-Pigott,  wrote:
Hi,
 
I need to list a sub-set (few 100,000) of files in a folder with millions of 
files (to do some historical processing).  What's the best way I can do that?  
ListFiles is taking way too long and seems to try to dump the entire list to 
the flow when I test it on a smaller folder list.  It would be good if the 
listing emitted files in smaller chunks so that the flow can start working on 
them.
 
Regards,
 
Jeremy

Listing a folder with millions of files

2020-03-09 Thread Jeremy Pemberton-Pigott
Hi,

I need to list a sub-set (few 100,000) of files in a folder with millions
of files (to do some historical processing).  What's the best way I can do
that?  ListFiles is taking way too long and seems to try to dump the entire
list to the flow when I test it on a smaller folder list.  It would be good
if the listing emitted files in smaller chunks so that the flow can start
working on them.

Regards,

Jeremy


1.6.0 odd queue behavior

2019-11-14 Thread Jeremy Pemberton-Pigott
Hi,

I have an oversized queue due to some unknown problem.  That's not the
issue at the moment.  I am restarting the cluster of 3 nodes (all data is
stuck on 1 node) each time the queues stop processing data.  What I observe
is that from a queue of about 2 million flow files it will proceed through
the first few processors and then eventually no changes will happen.  The
queues show as full, ultimately 0 in and 0 out, the debug log shows that
the processors have stopped from back pressure but the queues which are
causing the back pressure are empty when you list them.  If some queues
have 5000 files in the them they will show as empty and if the processor
actually processes any data it will show say 15000 then drop back to 5000.

Is this a known issue? Can I adjust something to make it able to process
all the backlogged data?  Been restarting this for a week and got it from 9
million down to 2 million but it seems to be getting slower.  Drive and
folder space is no issue.  Heap space is around 24GB/node and only 1 node
with the data is about 50% heap.

Regards,

Jeremy


Re: Oversized queue between process groups

2019-09-17 Thread Jeremy Pemberton-Pigott
I checked the logs that I can find but nothing useful, most of it is gone
because the drive was full of previous logs from it generating errors about
it not being able to write or update any files.  The logs are all gone now
as part of the recovery process removed all the logs.  What I do notice is
that if it received a large number of files from a list processor for
example, say 30 files, it isn't swapping the files back in after
swapping them out of the queue.  The queue will show files to be processed
and the next processor in the flow will show a very high tasks count in the
millions but not processing anything.  I experience the same problem with
1.9.2 and I'm trying to build a replicable flow to help clarify the
problem.  For this issue between groups it could be a similar problem but I've
not been able to replicate it yet as I'm a little busy at the moment.

Regards,

Jeremy

On Sat, Aug 31, 2019 at 10:58 PM Mark Payne  wrote:

> Jeremy,
>
> Thanks for the details & history - I am indeed interested :) Do you see
> any errors in the logs? Particularly around a failure to update the
> FlowFile Repository? I am now thinking that you may be running into
> NIFI-5997 [1]. This appears to have affected at least all 1.x versions
> prior to 1.9. When a queue reaches a certain size (20,000 FlowFiles by
> default), NiFi will swap out the flowfiles to disk to avoid running out of
> memory. It does this in a batch of 10,000 FlowFiles at a time. If there's a
> problem updating the FlowFile repository, before NIFI-5997 was addressed,
> the issue is that the data would be written to a swap file as well as
> staying in the queue. And the next time that a FlowFile came in, this would
> happen again. So you'd quickly see the queue become huge and a lot of data
> written to swap files, with many duplicates of the data.
>
> Not 100% sure that this is what you're hitting, but it's my best hunch at
> the moment. Please do see if you have any errors logged around updating the
> FlowFile Repository.
>
> Thanks
> -Mark
>
>
> [1] https://issues.apache.org/jira/browse/NIFI-5997
>
>
>
> On Aug 30, 2019, at 11:20 PM, Jeremy Pemberton-Pigott <
> fuzzych...@gmail.com> wrote:
>
> Thanks for your reply Mark.
>
> The flow was in sync between nodes, no one edited it as it was started
> from a Docker image and left running. It was running about a month. A
> restart didn't clear the queue. Only 1 node had an issue; the others were
> clear. The flow file repository swap directory was about 630 GB in size on
> the full node. It's running on CentOS 7.5.
>
> Below is just a bit of history if you're interested, otherwise skip it.
>
> The cluster is running CentOS 7.5 on those 3 nodes. Nifi was configured
> with 4GB of heap in the bootstrap. It's run on a partition with 1TB of free
> space (16 thread and 64GB RAM nodes). It had been running for almost a
> month before something happened and then started a backlog for about 1 week
> before someone noticed something was up. The partition was totally full on
> 1 node but Nifi was running, not processing anything of course on the full
> node, the other node was running, and the 3rd had lost its network
> connection on 1 card I think precipitating the problem so that node was not
> connected to the cluster.
>
> I could see the queue was about 210 million in the UI before I shut Nifi
> down to fix things. I cleared out the log folder of the full node (around
> 200 GB of Nifi app logs, for some reason it's not rolling it correctly in
> this case but other nodes are fine) and restarted but the large queue node
> was giving OOM errors on the Jetty start up so I increased the heap to 24
> GB on all nodes to get things started. It could run and the queue showed it
> was correct (I have encountered the queue clearing on restart before with
> small queues).
>
> It began processing the queue so I left it for 2 days to recover while
> clearing out the log folder periodically to keep some drive space available
> (it was generating about 40GB of logs every few hours) and the flow file
> repository swap folder size started off at about 640 GB (normally it's just
> a few MB when it's running). But I noticed that the node would stop
> processing after a short period of time with an update attribute showing a
> partially full queue of 4000 going into a funnel and the whole flow hanging
> with zero in/out everywhere I checked. Each time I restarted Nifi those
> small queues would clear but the same thing would happen.
>
> The large queue is not critical this time so I started clearing the queue
> from the NCM and it's going at a rate of about 75k flow files per minute so
> I'll leave it running over the weekend to see how far it gets while
> everything else is still running to clear other p

Implement heartbeat from remote servers

2019-09-03 Thread Jeremy Pemberton-Pigott
Does anyone have an idea of how to implement a heartbeat message posted
from remote NiFi servers to another server running NiFi? The idea is that if,
within say 1 hour, no heartbeat has been received from a server on a known
list, it can generate an error log message identifying the server
that no heartbeat was received from.
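
For reference, a minimal sketch of the sending side: a scheduled
ExecuteScript (Groovy) that emits one heartbeat flow file per run (attribute
names are illustrative), routed to an InvokeHTTP POST aimed at a listener on
the monitoring NiFi server.

// Emit a heartbeat flow file identifying this server and the current time.
def flowFile = session.create()
flowFile = session.putAttribute(flowFile, 'heartbeat.host',
        InetAddress.getLocalHost().getHostName())
flowFile = session.putAttribute(flowFile, 'heartbeat.epochMillis',
        String.valueOf(System.currentTimeMillis()))
session.transfer(flowFile, REL_SUCCESS)

On the receiving side, the last-seen timestamp per host could be kept in a
DistributedMapCache and compared against the known list on a timer.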

Thanks,

Jeremy


Re: Oversized queue between process groups

2019-08-30 Thread Jeremy Pemberton-Pigott
Thanks for your reply Mark.

The flow was in sync between nodes, no one edited it as it was started from a 
Docker image and left running. It was running about a month. A restart didn't 
clear the queue. Only 1 node had an issue; the others were clear. The flow file 
repository swap directory was about 630 GB in size on the full node. It's 
running on CentOS 7.5.

Below is just a bit of history if you're interested, otherwise skip it.

The cluster is running CentOS 7.5 on those 3 nodes. Nifi was configured with 
4GB of heap in the bootstrap. It's run on a partition with 1TB of free space 
(16 thread and 64GB RAM nodes). It had been running for almost a month before 
something happened and then started a backlog for about 1 week before someone 
noticed something was up. The partition was totally full on 1 node but Nifi was 
running, not processing anything of course on the full node, the other node was 
running, and the 3rd had lost its network connection on 1 card I think 
precipitating the problem so that node was not connected to the cluster.

I could see the queue was about 210 million in the UI before I shut Nifi down 
to fix things. I cleared out the log folder of the full node (around 200 GB of 
Nifi app logs, for some reason it's not rolling it correctly in this case but 
other nodes are fine) and restarted but the large queue node was giving OOM 
errors on the Jetty start up so I increased the heap to 24 GB on all nodes to 
get things started. It could run and the queue showed it was correct (I have 
encountered the queue clearing on restart before with small queues). 

It began processing the queue so I left it for 2 days to recover while clearing 
out the log folder periodically to keep some drive space available (it was 
generating about 40GB of logs every few hours) and the flow file repository 
swap folder size started off at about 640 GB (normally it's just a few MB when 
it's running). But I noticed that the node would stop processing after a short 
period of time with an update attribute showing a partially full queue of 4000 
going into a funnel and the whole flow hanging with zero in/out everywhere I 
checked. Each time I restarted Nifi those small queues would clear but the same 
thing would happen. 

The large queue is not critical this time so I started clearing the queue from 
the NCM and it's going at a rate of about 75k flow files per minute so I'll 
leave it running over the weekend to see how far it gets while everything else 
is still running to clear other parallel queues on that node. 

Other than the one node having a large queue it is still running and the other 
nodes are working fine now. No new data is streaming in until Tuesday so I hope 
to clear the backlog on the one node by then. 

Regards,

Jeremy


On 31 Aug 2019, at 03:19, Mark Payne  wrote:

Jeremy,

I'm not sure of any bugs off the top of my head that would necessarily cause 
this, but version 1.6.0 is getting fairly old, so there may well be something 
that I've forgotten about. That being said, there are two "types of bugs" that 
I think are most probable here: (1) There isn't really that much data queued up 
and NiFi is actually reporting the wrong size for the queue; or (2) perhaps one 
node in the cluster got out of sync in terms of the flow and one node actually 
is configured without backpressure being applied?

So there are two things that I would recommend checking out to help diagnose 
what is going on here. Firstly, is the huge backlog spread across all nodes or 
just on one node in the cluster? To determine this, you can go to the "Global 
menu" / Hamburger menu, and go to the Summary Page. From there, if you go to 
the Connections tab and find the connection in there (should be easy  if you 
sort the table based on queue size), you can click the button on the far-right 
that shows the Cluster view, which will break down the size of the connection 
per-node, so you know if all nodes in the cluster have a huge queue size or 
just one.

Secondly, I would be curious to know what happens if you restart the node(s) 
with the huge backlog? Do the FlowFiles magically disappear on restart, with 
the queue showing a small number (indicative of the queue size just being 
wrong), or are they still there (indicative of the queue size being correct)?

Also, what operating system are you running? There was a bug recently about 
data not being properly swapped back in on Windows but I think that was 
introduced after 1.6.0 and then fixed quickly.

This should help to know where to focus energy on finding the problem.

Thanks
-Mark

> On Aug 30, 2019, at 12:24 PM, Jeremy Pemberton-Pigott  
> wrote:
> 
> Yes, there is one, but not near the output port of the split json processor; 
> it's shortly after the input port of a child PG. The output is actually 
> connected to 3 child PGs, and each of those has an update attribute processor 
> on its output port. The other PG input port on the left is connected to a 
> route on attribute processor inside it.

Re: Oversized queue between process groups

2019-08-30 Thread Jeremy Pemberton-Pigott
Yes, there is one, but not near the output port of the split json processor; 
it's shortly after the input port of a child PG. The output is actually 
connected to 3 child PGs, and each of those has an update attribute processor 
on its output port. The other PG input port on the left is connected to a 
route on attribute processor inside it. 

Queue of PG1 input-> input port to processors -> connection to 3 child PGs -> 
each PG has split json after input port -> processors -> update attribute -> 
queue to output port of child PG -> queue to output port of PG1 -> queue to PG2 
input (100s of millions in queue) -> input port to route on attribute -> ...

Regards,

Jeremy


On 30 Aug 2019, at 20:45, Bryan Bende  wrote:

Can you show what is happening inside the first process group? Is there a 
SplitText processor with line count of 1? 

> On Fri, Aug 30, 2019 at 4:21 AM Jeremy Pemberton-Pigott 
>  wrote:
> Hi Pierre,
> 
> I'm using Nifi version 1.6.0.
> 04/03/2018 08:16:22 UTC
> 
> Tagged nifi-1.6.0-RC3
> 
> From 7c0ee01 on branch NIFI-4995-RC3
> 
> FlowFile expiration = 0
> Back pressure object threshold = 20,000
> Back pressure data size threshold = 1GB
> 
> The connection is just from the output port of 1 PG to the input port of 
> another PG.  Inside the PG all the connections are using the same settings 
> between processors.
> 
> Regards,
> 
> Jeremy
> 
>> On Fri, Aug 30, 2019 at 4:14 PM Pierre Villard  
>> wrote:
>> Hi Jeremy,
>> 
>> It seems very weird that you get 200M flow files in a relationship that 
>> should have backpressure set at 20k flow files. While backpressure is not a 
>> hard limit you should not get to such numbers. Can you give us more details? 
>> What version of NiFi are you using? What's the configuration of your 
>> relationship between your two process groups?
>> 
>> Thanks,
>> Pierre
>> 
>>> On Fri, 30 Aug 2019 at 07:46, Jeremy Pemberton-Pigott 
>>>  wrote:
>>> Hi,
>>> 
>>> I have a 3 node Nifi 1.6.0 cluster.  It ran out of disk space when there 
>>> was a log jam of flow files (from slow HBase lookups).  My queue is 
>>> configured for 20,000 but 1 node has over 206 million flow files stuck in 
>>> the queue.  I managed to clear up some disk space to get things going again 
>>> but it seems that after a few mins of processing all the processors in the 
>>> Log Parser process group will stop processing and show zero in/out.
>>> 
>>> Is this a bug fixed in a later version?
>>> 
>>> Each time I have to tear down the Docker containers running Nifi and 
>>> restart it to process a few 10,000s and repeat every few mins.  Any idea 
>>> what I should do to keep it processing the data (nifi-app.log doesn't show 
>>> my anything unusual about the stop or delay) until the 1 node can clear the 
>>> backlog?
>>> 
>>> 
>>> 
>>> Regards,
>>> 
>>> Jeremy
-- 
Sent from Gmail Mobile

Re: Oversized queue between process groups

2019-08-30 Thread Jeremy Pemberton-Pigott
Hi Pierre,

I'm using Nifi version 1.6.0.

04/03/2018 08:16:22 UTC

Tagged nifi-1.6.0-RC3

From 7c0ee01 on branch NIFI-4995-RC3
FlowFile expiration = 0
Back pressure object threshold = 20,000
Back pressure data size threshold = 1GB

The connection is just from the output port of 1 PG to the input port of
another PG.  Inside the PG all the connections are using the same settings
between processors.

Regards,

Jeremy

On Fri, Aug 30, 2019 at 4:14 PM Pierre Villard 
wrote:

> Hi Jeremy,
>
> It seems very weird that you get 200M flow files in a relationship that
> should have backpressure set at 20k flow files. While backpressure is not a
> hard limit you should not get to such numbers. Can you give us more
> details? What version of NiFi are you using? What's the configuration of
> your relationship between your two process groups?
>
> Thanks,
> Pierre
>
> On Fri, 30 Aug 2019 at 07:46, Jeremy Pemberton-Pigott <
> fuzzych...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a 3 node Nifi 1.6.0 cluster.  It ran out of disk space when there
>> was a log jam of flow files (from slow HBase lookups).  My queue is
>> configured for 20,000 but 1 node has over 206 million flow files stuck in
>> the queue.  I managed to clear up some disk space to get things going again
>> but it seems that after a few mins of processing all the processors in the
>> Log Parser process group will stop processing and show zero in/out.
>>
>> Is this a bug fixed in a later version?
>>
>> Each time I have to tear down the Docker containers running Nifi and
>> restart it to process a few 10,000s and repeat every few mins.  Any idea
>> what I should do to keep it processing the data (nifi-app.log doesn't show
>> my anything unusual about the stop or delay) until the 1 node can clear the
>> backlog?
>>
>> [image: image.png]
>>
>> Regards,
>>
>> Jeremy
>>
>


Oversized queue between process groups

2019-08-29 Thread Jeremy Pemberton-Pigott
Hi,

I have a 3 node Nifi 1.6.0 cluster.  It ran out of disk space when there
was a log jam of flow files (from slow HBase lookups).  My queue is
configured for 20,000 but 1 node has over 206 million flow files stuck in
the queue.  I managed to clear up some disk space to get things going again
but it seems that after a few mins of processing all the processors in the
Log Parser process group will stop processing and show zero in/out.

Is this a bug fixed in a later version?

Each time I have to tear down the Docker containers running Nifi and
restart it to process a few 10,000s and repeat every few mins.  Any idea
what I should do to keep it processing the data (nifi-app.log doesn't show
my anything unusual about the stop or delay) until the 1 node can clear the
backlog?

[image: image.png]

Regards,

Jeremy


Cluster remote process group load balancing under OpenShift

2019-07-31 Thread Jeremy Pemberton-Pigott
I've set up a 3-node NiFi 1.6.0 cluster running in containers on OpenShift
with an external ZooKeeper; it has been running just fine for about a year. I
set up and added Remote Process Groups with HTTP transport for load balancing,
but 100,000s of flow files always end up on only 1 server instead of being
load balanced across the available nodes. I'm not sure where to start
troubleshooting the problem. Any ideas?
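
One thing I can check and share is the peer list each node advertises for
HTTP site-to-site, since I assume that is what the remote process group uses
to decide where to send data. Something like this should show it (the
protocol-version header value is from memory, so treat this as a sketch):

# ask a node which site-to-site peers it is advertising
curl -H 'x-nifi-site-to-site-protocol-version: 1' \
     -H 'Accept: application/json' \
     http://nifi-node-3.nifi-node:30180/nifi-api/site-to-site/peers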

An additional issue I have is that, even though I have extended the timeout,
the NCM frequently times out with a "java.net.SocketTimeoutException: Read
timed out" if I move more than 7 items on the canvas at a time.
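
For reference, the timeouts I have been raising are the cluster ones in
nifi.properties. I believe these are the relevant properties (values are
approximate, from memory):

nifi.cluster.node.connection.timeout=30 sec
nifi.cluster.node.read.timeout=30 sec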

Logs on the servers show messages like this (the XML tags appear to have been
stripped in the mail, so only the element values are showing):

1b81554a-8ef8-4f94-9012-7ba4b9328962
nifi-node-3.nifi-node
30180
nifi-node-3.nifi-node
master1.company.com
9998
30180
false
CONNECTED
3

nifi-node-3.nifi-node
30180
1b81554a-8ef8-4f94-9012-7ba4b9328962
master1.company.com
30180
9998
false
nifi-node-3.nifi-node

xyz

curl -G http://nifi-node-3.nifi-node:30180/nifi-api/controller/cluster


{
  "controller": {
    "id": "1f71f850-015c-1000-f050-706cdf417279",
    "name": "NiFi Flow",
    "comments": "",
    "runningCount": 671,
    "stoppedCount": 1,
    "invalidCount": 0,
    "disabledCount": 14,
    "inputPortCount": 2,
    "outputPortCount": 1,
    "remoteSiteListeningPort": 9998,
    "siteToSiteSecure": false,
    "instanceId": "a6776171-f2f6-4e8e-9204-b596076d3c88",
    "inputPorts": [{
      "id": "97c84ad0-8f43-317b-bd73-da3e8ca860da",
      "name": "Incoming Data",
      "comments": "",
      "state": "RUNNING",
      "validationErrors": []
    }, {
      "id": "4778ff9e-016c-1000--b00ef890",
      "name": "Log Balancer",
      "state": "RUNNING",
      "validationErrors": []
    }],
    "outputPorts": [{
      "id": "47d51740-016c-1000--238edc9b",
      "name": "Logs Out",
      "state": "RUNNING"
    }]
  }
}


Nodes 1 and 3 are busy but node 2 is not.

Cluster info (run inside a container):
curl -G http://nifi-node-3.nifi-node:30180/nifi-api/controller/cluster


{
  "cluster": {
    "nodes": [{
      "nodeId": "611dd5d5-6a71-4175-98a3-197c90d5677a",
      "address": "nifi-node-2.nifi-node",
      "apiPort": 30180,
      "status": "CONNECTED",
      "heartbeat": "07/31/2019 10:40:22 UTC",
      "roles": [],
      "activeThreadCount": 78,
      "queued": "0 / 0 bytes",
      "events": [{
        "timestamp": "07/31/2019 09:16:47 UTC",
        "category": "INFO",
        "message": "Received first heartbeat from connecting node. Node connected."
      }, {
        "timestamp": "07/31/2019 09:16:34 UTC",
        "category": "INFO",
        "message": "Connection requested from existing node. Setting status to connecting."
      }],
      "nodeStartTime": "07/31/2019 09:16:12 UTC"
    }, {
      "nodeId": "1b81554a-8ef8-4f94-9012-7ba4b9328962",
      "address": "nifi-node-3.nifi-node",
      "apiPort": 30180,
      "status": "CONNECTED",
      "heartbeat": "07/31/2019 10:40:20 UTC",
      "roles": ["Primary Node", "Cluster Coordinator"],
      "activeThreadCount": 90,
      "queued": "12,649 / 1.51 GB",
      "events": [{
        "timestamp": "07/31/2019 09:16:37 UTC",
        "category": "INFO",
        "message": "Received first heartbeat from connecting node. Node connected."
      }, {
        "timestamp": "07/31/2019 09:16:24 UTC",
        "category": "INFO",
        "message": "Connection requested from existing node. Setting status to connecting."
      }],
      "nodeStartTime": "07/31/2019 09:16:09 UTC"
    }, {
      "nodeId": "3632f9d3-ecd1-4f6b-89aa-bf071f596de8",
      "address": "nifi-node-1.nifi-node",
      "apiPort": 30180,
      "status": "CONNECTED",
      "heartbeat": "07/31/2019 10:40:20 UTC",
      "roles": [],
      "activeThreadCount": 124,
      "queued": "68,612 / 24.57 MB",
      "events": [{
        "timestamp": "07/31/2019 09:17:07 UTC",
        "category": "INFO",
        "message": "Received first heartbeat from connecting node. Node connected."
      }, {
        "timestamp": "07/31/2019 09:16:53 UTC",
        "category": "INFO",
        "message": "Connection requested from existing node. Setting status to connecting."
      }],
      "nodeStartTime": "07/31/2019 09:16:38 UTC"
    }],
    "generated": "10:40:24 UTC"
  }
}