how to execute code when processor is stopping
Hi guys, I'm trying to execute some code in my processor when the user asks the processor to stop in the NiFi UI. I checked the developer guide and only found that OnUnscheduled will be called when the processor is no longer scheduled to run. I've tested OnUnscheduled, and it is also called after an OnScheduled task times out. So is there a way to execute some code only when the processor is stopping? Regards, Ben
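The thread below converges on a flag-based pattern: a method the framework calls on stop (such as one annotated with @OnUnscheduled in NiFi) flips a flag, and any retry loop checks the flag so the processor can actually be stopped. A minimal, framework-free sketch of that pattern; the class and method names are illustrative, not NiFi API:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the stop-flag pattern discussed in this thread. In a real NiFi
// processor, connectWithRetry() would be the @OnScheduled method and stop()
// would be the @OnUnscheduled method; names here are illustrative only.
public class StopFlagRetry {
    private final AtomicBoolean scheduled = new AtomicBoolean(false);

    // Retries a connection until it succeeds, the attempt budget is spent,
    // or the stop flag is cleared. Returns the attempt count (negative if
    // it gave up without connecting).
    public int connectWithRetry(int maxAttempts) {
        scheduled.set(true);
        int attempts = 0;
        while (scheduled.get() && attempts < maxAttempts) {
            attempts++;
            if (tryConnect()) {
                return attempts; // connected
            }
        }
        return -attempts; // stopped or attempts exhausted
    }

    // Called when the user stops the processor; lets the loop exit promptly.
    public void stop() {
        scheduled.set(false);
    }

    // Stand-in for a real connection attempt (e.g. DBCP getConnection()).
    protected boolean tryConnect() {
        return false; // simulate an offline database
    }
}
```

The key point is that the loop re-reads the flag on every iteration, so a stop request is honored within one attempt rather than blocking until the loop finishes.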
Re: get controller service's configuration
Thanks for your explanation Bryan, it seems that I could only get the DBCP controller service's driver name after something like getDriverName() is added to the current DBCPService interface. Regards, Ben 2017-08-10 22:01 GMT+08:00 Bryan Bende <bbe...@gmail.com>: > The way controller services are set up you have the following... > > - DBCPService interface (provides getConnection()) extends > ControllerService interface (empty interface to indicate it is a CS) > - DBCPConnectionPool extends AbstractControllerService implements > DBCPService > - Processor XYZ depends on DBCPService interface > > The DBCPService interface is the common point between the processor > and the implementations. The processor XYZ classpath only knows about > the DBCPService interface, it doesn't know anything about the classes > that implement it... there could actually be several implementations > in different NARs, but it is up to the framework to provide access to > these. > > Since the processor only depends on the interface, which in this case > only exposes getConnection(), you can't really assume the service has > certain properties because DBCPConnectionPool.DB_DRIVERNAME is > specific to the DBCPConnectionPool implementation... another > implementation may not have that property, or may call it something > different. The interface would have to provide getDriverName() so that > each implementation could provide that. > > -Bryan > > > On Thu, Aug 10, 2017 at 4:33 AM, 尹文才 <batman...@gmail.com> wrote: > > Thanks Andy, I've tried your approach, in my case the controller service is > > a DBCPConnectionPool, and when I tried to get the driver class name property > > through context.getProperty(DBCPConnectionPool.DB_DRIVERNAME).getValue(), > > the value was null. The AbstractControllerService class does have a > > method getConfigurationContext() to get the configuration context, but the > > method is protected.
So I still didn't find a feasible way to get the > > controller service's properties. > > > > Regards, > > Ben > > > > 2017-08-10 12:18 GMT+08:00 Andy LoPresto <alopre...@apache.org>: > > > >> You can get the current property values of a controller service from the > >> processor by using the ProcessContext object. For example, in GetHTTP [1], > >> in the @OnScheduled method, you could do: > >> > >> context.getControllerServiceLookup().getControllerService("my-controller-service-id"); > >> > >> context.getProperty("controller-service-property-name"); > >> context.getProperty(SomeControllerService.CONSTANT_PROPERTY_DESCRIPTOR); > >> > >> I forget if context.getProperty() will give the controller service > >> properties as well as the processor properties. If it doesn't, you can cast > >> the retrieved ControllerService into AbstractControllerService or the > >> concrete class and access available properties directly from the > >> encapsulated ConfigurationContext. > >> > >> [1] https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java#L295 > >> > >> Andy LoPresto > >> alopre...@apache.org > >> *alopresto.apa...@gmail.com <alopresto.apa...@gmail.com>* > >> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69 > >> > >> On Aug 9, 2017, at 6:57 PM, 尹文才 <batman...@gmail.com> wrote: > >> > >> Thanks Koji, I checked the link you provided and I think getting a > >> DataSource is no different than getting the DBCP service (they could just > >> get the connection). Actually I was trying to get the configured driver > >> class to check the database type. > >> > >> Regards, > >> Ben > >> > >> 2017-08-10 9:29 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > >> > >> Hi Ben, > >> > >> I'm not aware of a way to obtain the configuration of a controller service from a > >> processor. Those should be encapsulated inside a controller service.
> >> If you'd like to create a DataSource instance instead of just obtaining > >> a connection, this discussion might be helpful: > >> https://github.com/apache/nifi/pull/1417 > >> > >> Although I would not recommend it, if you really need to obtain all > >> configurations, you can do so by calling the NiFi REST API from your > >> processor. > >> > >> Thanks, > >> Koji > >> > >> On Thu, Aug 10, 2017 at 10:09 AM, 尹文才 <batman...@gmail.com> wrote: > >> > >> Hi guys, I have a customized processor with a DBCP controller service as > >> a property. I could get the DBCP controller service in my code, but does > >> anyone know how to obtain all the configuration of the DBCP controller > >> service in Java code (e.g. Database Connection URL, Database Driver > >> Location, etc.)? Thanks. > >> > >> Regards, > >> Ben
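Bryan's conclusion above is that exposing the driver name requires changing the service interface itself, since the processor only sees DBCPService. A simplified sketch of that idea; the types here are stand-ins for the NiFi classes, and getDriverName() is the hypothetical addition discussed, not existing API:

```java
import java.sql.Connection;

// Sketch of the interface change Bryan describes: the processor depends only
// on the service interface, so the driver name must be exposed there. These
// are simplified stand-ins for org.apache.nifi.dbcp.DBCPService and
// DBCPConnectionPool; getDriverName() is hypothetical.
public class DriverNameSketch {
    public interface DBCPService { // in NiFi this extends ControllerService
        Connection getConnection();
        String getDriverName(); // hypothetical addition from the thread
    }

    // Minimal implementation standing in for DBCPConnectionPool.
    public static class FakeConnectionPool implements DBCPService {
        private final String driverName;
        public FakeConnectionPool(String driverName) { this.driverName = driverName; }
        public Connection getConnection() { return null; } // stub
        public String getDriverName() { return driverName; }
    }

    // Processor-side code depends only on the interface, as Bryan notes.
    public static boolean isSqlServer(DBCPService service) {
        return service.getDriverName().contains("sqlserver");
    }
}
```

This keeps the processor decoupled from any particular implementation: another pool implementation in a different NAR just has to return its own configured driver class.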
Re: how to execute code when processor is stopping
Hi Pierre, I checked the developer guide before I sent the email, and according to it the method annotated with OnUnscheduled will be called in two cases as I understand it (please correct me if I'm wrong): 1. when the user stops the processor in the NiFi UI, so that the processor is no longer scheduled to run, the method will be called; 2. when the method annotated with OnScheduled throws an exception, for example a timed-out execution exception, the OnUnscheduled method will also be called. My question is how to tell the first scenario from the second one? Thanks. Regards, Ben 2017-08-11 15:51 GMT+08:00 Pierre Villard <pierre.villard...@gmail.com>: > Hi Ben, > > You might want to have a look here: > https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#component-lifecycle > > Pierre
Re: get controller service's configuration
Thanks Andy, I've tried your approach; in my case the controller service is a DBCPConnectionPool, and when I tried to get the driver class name property through context.getProperty(DBCPConnectionPool.DB_DRIVERNAME).getValue(), the value was null. The AbstractControllerService class does have a method getConfigurationContext() to get the configuration context, but the method is protected. So I still didn't find a feasible way to get the controller service's properties. Regards, Ben
Re: Assigning JIRAs to myself
Hi Aldrin, I would also like to be added to the list of NiFi contributors to start working on some easy JIRAs, thanks. Regards, Ben 2017-08-11 20:03 GMT+08:00 Aldrin Piri: > Hi Arun, > > I've added you to the list of contributors and you should be able to self-assign issues. Please let us know if you are having any issues. > > Thanks and look forward to the contributions! > > On Fri, Aug 11, 2017 at 7:59 AM, Arun Manivannan wrote: > > Hi, > > > > Good morning. > > > > I would like to try to solve some beginner-level issues. > > > > I have read the developer's guide and the contributor guide. > > > > How can I assign an issue to myself on the JIRA? Great work on the > > "beginner" tag. Thanks! > > > > > > Cheers, > > Arun > > >
Re: how to execute code when processor is stopping
Hi Bryan and Matt, thanks for all your suggestions. I was trying to make sure that the OnUnscheduled method was not called too frequently when the connection is offline. You were right, this kind of logic should not be placed inside the scheduling methods; I need to refactor my code to move it into onTrigger. Regards, Ben 2017-08-12 0:53 GMT+08:00 Matt Burgess <mattyb...@apache.org>: > I'm a fan of Bryan's last suggestion. For dynamic/automatic retry > (such as database connection retries), I recommend putting the > connection logic in the onTrigger() method. If you can check > connectivity, then your onTrigger() would know whether it needs to try > to reconnect before it does any work. If it tries to reconnect and is > unsuccessful, you can yield the processor if you want, so as not to > hammer the DB with connection attempts. The CaptureChangeMySQL > processor does this, it has a retry loop for trying various nodes in a > MySQL cluster, but once it's connected, it goes on about its work, and > if a connection fails, it will retry the connection loop before it > does any more work. It only uses onTrigger and none of the scheduling > stuff. > > Regards, > Matt > > On Fri, Aug 11, 2017 at 11:06 AM, Bryan Bende <bbe...@gmail.com> wrote: > > Ben, > > > > I apologize if I am not understanding the situation, but... > > > > In the case where your OnScheduled code is in a retry loop, if someone > > stops the processor it will call your OnUnscheduled code which will > > set the flag to bounce out of the loop. This sounds like what you > > want, right? > > > > In the case where OnScheduled times out, the framework is calling > > OnUnscheduled which would call your code to set the flag, but wouldn't > > that not matter at this point because you aren't looping anymore > > anyway? > > > > If the framework calls OnScheduled again, your code should set the > > flag back to whatever it needs to be to start looping again right? > > > > An alternative that might avoid some of this would be to lazily > > initialize the connection in the onTrigger method of the processor. > > > > -Bryan > > > > > > On Fri, Aug 11, 2017 at 9:16 AM, 尹文才 <batman...@gmail.com> wrote: > >> thanks Pierre, my case is that I need to implement a database connection > >> retry logic inside my OnScheduled method, when the database is not > >> available I will retry until the connection is back online. > >> The problem is when the database is offline it will throw a timed-out > >> execution exception inside OnScheduled and then call OnUnscheduled. But > >> when I manually stop the processor the OnUnscheduled > >> will also get called. I know my logic sounds a little weird but I need to > >> set some flag in the OnUnscheduled method to stop the retry logic inside > >> OnScheduled in order to be able to stop the processor, > >> otherwise the processor is not able to be stopped unless I restart the > >> whole NiFi. > >> > >> Regards, > >> Ben > >> > >> 2017-08-11 17:18 GMT+08:00 Pierre Villard <pierre.villard...@gmail.com>: > >> > >>> Oh OK, got it now! > >>> > >>> Not sure what your use case is, but I don't think you can do that unless you > >>> set some information when the processor actually executes onTrigger for the > >>> first time and you then check this value in your OnUnscheduled annotated > >>> method. > >>> > >>> Pierre
Re: how to execute code when processor is stopping
Thanks Koji, this is exactly what I'm looking for. Regards, Ben 2017-08-14 12:21 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > Hi Ben, > > AbstractSessionFactoryProcessor has a protected isScheduled() method > that can be used by a processor implementation class to check whether > it is still scheduled (not being stopped). > For an example, ConsumeKafka_0_10.onTrigger uses it with a while loop: > https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java#L316 > > Thanks, > Koji
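Koji's suggestion boils down to re-checking isScheduled() on every pass of the retry loop inside onTrigger, so that stopping the processor in the UI breaks the loop. A framework-free sketch of that shape, with isScheduled modeled as a BooleanSupplier; in a real processor it would be AbstractSessionFactoryProcessor.isScheduled():

```java
import java.util.function.BooleanSupplier;

// Retry loop that stays responsive to a stop request: it re-checks the
// scheduled state on every iteration instead of looping unconditionally.
public class RetryWhileScheduled {
    /**
     * Attempts to connect until either a connection succeeds or the
     * component is no longer scheduled. Returns the number of attempts
     * made, negated if it gave up because it was unscheduled.
     */
    public static int retryUntilConnected(BooleanSupplier isScheduled,
                                          BooleanSupplier tryConnect) {
        int attempts = 0;
        while (isScheduled.getAsBoolean()) {
            attempts++;
            if (tryConnect.getAsBoolean()) {
                return attempts; // connected
            }
            // In a real processor you would context.yield() or sleep here
            // so the loop doesn't hammer the database between attempts.
        }
        return -attempts; // unscheduled before a connection succeeded
    }
}
```

This is the same structure ConsumeKafka_0_10 uses in its onTrigger while loop, just reduced to the control flow.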
Re: how to execute code when processor is stopping
Hi guys, a follow-up question about my case: if I implement the retry logic inside the onTrigger method and retry until the database connection is back online, and the user needs to stop the processor in the NiFi UI while the database is still offline, then as I understand it onTrigger will keep executing the retry logic and the processor couldn't be stopped even if the user tries to stop it. Is there any way to solve this problem? Thanks. Regards, Ben
Trigger timer-driven processor to run from outside NIFI
Hi guys, is it possible for a Java application outside the NiFi environment to trigger a timer-driven processor to do its work (I mean, have its onTrigger method called) before the processor is next due to run? The reason I'm asking is that I have a Java application with a UI outside NiFi, and there is some configuration data that can be updated in a database; my processor in NiFi needs to pick up the updated configuration data from that database as soon as possible, but the processor is configured as timer driven with a 5-minute interval. I would like the processor to be triggered to run right after the configuration is updated by the Java application, even if its next scheduled run hasn't arrived yet. Thanks. Regards, Ben
Re: Trigger timer-driven processor to run from outside NIFI
Thanks Koji, I checked the NiFi REST API and it seems that I would need to use Groovy to do it (I don't know Groovy); are there any Java examples that interact with NiFi via the REST API? Thanks. Regards, Ben 2017-07-14 13:49 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > Hi Ben, > > Just an idea: using ListenHTTP or HandleHTTPRequest (or whatever > listener-type processor you can use) in front of your processor might > be helpful. You would also need to change your processor to support incoming > FlowFiles if it doesn't currently. This way, the outside > application can send a simple HTTP request to make your processor do its > job. > > Another possible way would be using the NiFi REST API to stop the processor > and then restart it. When the processor is restarted, its onTrigger > will be called, and it will then wait for the next time it is scheduled (the next > 5 min in your case). > > Thanks, > Koji
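Interacting with the NiFi REST API needs nothing beyond plain Java HTTP calls, no Groovy required. A hedged sketch of the stop/start approach Koji describes: the endpoint and payload shape follow NiFi 1.x conventions (PUT /nifi-api/processors/{id} with the component's current revision); check your NiFi version's REST API docs, and a secured instance also needs authentication headers. The base URL and processor id are assumptions for illustration.

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Minimal Java client sketch for changing a processor's run state via the
// NiFi REST API. Payload shape per NiFi 1.x ProcessorEntity; verify against
// your version's REST API documentation before relying on it.
public class NiFiProcessorStateClient {

    // Builds the minimal JSON entity for a state change ("RUNNING" or
    // "STOPPED"). revisionVersion must match what a prior
    // GET /processors/{id} returned, or NiFi rejects the update.
    public static String buildStateJson(String processorId, long revisionVersion, String state) {
        return "{\"revision\":{\"version\":" + revisionVersion + "},"
             + "\"component\":{\"id\":\"" + processorId + "\",\"state\":\"" + state + "\"}}";
    }

    // Sends the PUT request. baseUrl is e.g. "http://localhost:8080/nifi-api".
    public static int putState(String baseUrl, String processorId,
                               long revisionVersion, String state) throws Exception {
        URL url = new URL(baseUrl + "/processors/" + processorId);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        byte[] body = buildStateJson(processorId, revisionVersion, state)
                .getBytes(StandardCharsets.UTF_8);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body);
        }
        return conn.getResponseCode(); // 200 on success
    }
}
```

To restart a processor you would send "STOPPED" and then "RUNNING", refreshing the revision version between the two calls.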
Re: Trigger timer-driven processor to run from outside NIFI
Thanks very much Koji for your quick response and your example, I will look into it. Regards, Ben 2017-07-14 17:27 GMT+08:00 Koji Kawamura <ijokaruma...@apache.org>: > Hi Ben, > > If the processor is running, stop will wait for the thread to complete. > Please see the stop() method here: > https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java > > To stop or start a processor, you just need to update its state. I have an > example here; please refer to the updateProcessorState function. > https://github.com/ijokarumawak/nifi-api-client-js/blob/master/nifi-api-client.js > > Trying to reply from mobile at best effort. Excuse me for typos if any.. > > Thanks, > Koji > > > On Jul 14, 2017 5:29 PM, "尹文才" <batman...@gmail.com> wrote: > > Hi Koji, I tried to play with the NiFi REST API using the Chrome Postman app. When you mentioned restarting the processor via the REST API, did you mean first sending a stop command and then a start command via the API "PUT /processors/{id}"? By the way, when I send the stop command to the processor, will it finish its current work and then stop, or will it simply kill the task directly? Thanks. > > Regards, > Ben
FlowFile position when transferred to Relationship.SELF
Hi guys, I have written a customized processor whose functionality is similar to NiFi's Wait processor; the difference is that my processor needs to wait for a batch of data, and when the batch-end flag is found, it transfers the batch to its destinations. I checked the source code of the Wait processor and likewise transferred the FlowFiles to Relationship.SELF (i.e. back to the incoming queue) when the batch of data is not yet complete. The problem I found was that sometimes the FlowFiles transferred from my processor to the destinations were not in order. I then added a sequence attribute (numbers starting from 1) to all FlowFiles coming into my processor and could verify that this problem happens from time to time, but I couldn't find a reliable way to reproduce it. My question is: when a FlowFile is transferred to Relationship.SELF, does NiFi put it back in its original position in the incoming queue? Thanks. Regards, Ben
Re: FlowFile position when transferred to Relationship.SELF
Hi Koji, thanks for the explanation, I checked the NIFI documentation you provided, do you mean I should use the FIFO prioritizer in my case? Because as you mentioned the FlowFiles would be put back into their original positions, so as I understand using FIFO should make the FlowFiles in consistent order. Regards, Ben 2017-07-10 17:06 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > Hi, > > I think it puts back a FlowFile to its original position but update > queued date as implemented here: > https://github.com/apache/nifi/blob/master/nifi-nar- > bundles/nifi-framework-bundle/nifi-framework/nifi-framework- > core/src/main/java/org/apache/nifi/controller/repository/ > StandardProcessSession.java#L1851 > > In order to pull FlowFiles from a queue in consistent order, you need > to specify a prioritizer. > https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#prioritization > > I'm just curious about the functionality you added. Wait processor has > 'Releasable FlowFile Count' and it could be used to make a batch of > FlowFiles wait and go. Or Notify's 'Signal Counter Delta' could be > useful, too. > > Regards, > Koji > > On Mon, Jul 10, 2017 at 4:43 PM, 尹文才 <batman...@gmail.com> wrote: > > Hi guys, I have written a customized processor whose functionality is > > similar to the NIFI's Wait processor, the difference is my processor > needs > > to wait a batch of data and when the batch end flag is found, it will > > transfer the batch of data to destinations. > > > > I checked the source code of Wait processor and also transferred the > > flowfiles to Relationship.SELF which is the incoming queue when the batch > > of data is not yet complete. The problem I found was sometimes I could > see > > the sequence of the FlowFiles transferred from my processor to > destinations > > were not in order. 
> > I then added sequence attribute(number starting from 1) to all FlowFiles > > coming into my processor and I could verify that this problem happen from > > time to time, but I couldn't find the stable way to reproduce it. > > > > My question is how does NIFI handle the FlowFile when it's being > > transferred to Relationship.SELF, does it put back to its original > position > > in the incoming queue? Thanks. > > > > Regards, > > Ben >
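To make the ordering question above concrete, here is a toy Python sketch (not NiFi code; the sequence numbers and queued timestamps are made up): a FlowFile put back to the incoming queue gets a fresh queued date, so pulling by queued date can drift behind newer arrivals, while sorting on an explicit sequence attribute restores the original order.

```python
# Toy model: each dict stands for a FlowFile. A re-queued FlowFile keeps
# its slot in the queue but gets a fresh "queued" timestamp, so ordering
# by queued date can pull it after files that arrived later.
flowfiles = [
    {"sequence": 3, "queued": 105},
    {"sequence": 1, "queued": 110},  # re-queued: newest timestamp
    {"sequence": 2, "queued": 102},
]

by_queued_date = sorted(flowfiles, key=lambda f: f["queued"])
by_sequence = sorted(flowfiles, key=lambda f: f["sequence"])

print([f["sequence"] for f in by_queued_date])  # [2, 3, 1]: out of order
print([f["sequence"] for f in by_sequence])     # [1, 2, 3]: restored
```

The same idea carries over to NiFi: a prioritizer on the incoming connection decides the pull order, and a sequence attribute gives the destination a deterministic key to reorder by if needed.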
how to submit bug found in NIFI code
Hi guys, does anyone know the correct way to submit bugs found in the current NiFi code? I checked NiFi's JIRA page and it seems that I need to log in to be able to submit an issue. Thanks. Regards, Ben
Re: how to submit bug found in NIFI code
Thanks Janosch, I managed to submit a NiFi processor bug after creating a JIRA account. The bug pertains to a new NiFi processor called PutDatabaseRecord, and the link to the issue is below. Please correct me if anything is wrong, since this is the first time I've submitted a bug for NiFi: https://issues.apache.org/jira/browse/NIFI-4228 Regards, Ben 2017-07-25 19:29 GMT+08:00 Woschitz, Janosch < janosch.wosch...@thinkbiganalytics.com>: > Hi Ben, > > A submission via JIRA would be the best way to submit a bug. All Apache > projects using the same JIRA installation but you need to sign up in order > to use them. > > You can create an account via https://issues.apache.org/jira/secure/Signup!default.jspa > > If this should not be possible you can still share you bug report on the > dev list but I would highly recommend to file it via JIRA. > > Regards, > Janosch > > > > On 25.07.17, 11:30, "尹文才" <batman...@gmail.com> wrote: > > >Hi guys, I wonder if any of you knows the correct way to submit bugs found > >in current NIFI's code, I checked the JIRA page of NIFI and it seems that > I > >need to login to be able to submit an issue. Thanks. > > > >Regards, > >Ben >
get ControllerService inside Advanced custom UI
Hi guys, I'm currently creating a custom UI for my processor, and I need to get the DBCP Connection ControllerService in the backend code for the UI; the ControllerService is a property defined in the processor's properties. I saw there's a method in the class NiFiWebConfigurationContext as below: ControllerService getControllerService(String serviceIdentifier, String componentId); However, this method requires the serviceIdentifier, which I don't know how to obtain. How exactly should I get the ControllerService? Thanks. Regards, Ben
Re: get ControllerService inside Advanced custom UI
Thanks Matt, how should I get the service identifier with the method getComponentDetails? Do you mean I should add a service identifier property in my processor so the user can specify the controller service? In that case, how would the configured service identifier be associated with the configured controller service? Regards, Ben 2017-07-01 21:35 GMT+08:00 Matt Gilman <matt.c.gil...@gmail.com>: > Ben, > > That is the correct method to invoke. The serviceIdentifier will be the > value of the Property that identifies the service in question. The > componentId will be the identifier of Processor. You'll need to invoke > > ComponentDetails getComponentDetails(NiFiWebRequestContext requestContext) > > in order to get the service identifier from the configured properties. > > Thanks > > Matt > > On Sat, Jul 1, 2017 at 4:17 AM, 尹文才 <batman...@gmail.com> wrote: > > Hi guys, I' m currently creating a custom UI for my processor and I need > to > > get the DBCP Connection ControllerService in the backend code for the UI, > > the ControllerService is a property defined in the processor properties. > I > > saw there's one method in class NiFiWebConfigurationContext as below: > > > > ControllerService getControllerService(String serviceIdentifier, String > > componentId); > > > > > > However this method requires the serviceIdentifier which I don't know > > > > how to obtain. How exactly should I get the ControllerService? > > > > Thanks. > > > > > > Regards, > > > > Ben > > >
Re: get ControllerService inside Advanced custom UI
Thanks Matt, I was having difficulty getting the serviceIdentifier from the controller service property defined in my processor; now I've got it. Regards, Ben 2017-07-02 21:40 GMT+08:00 Matt Gilman <matt.c.gil...@gmail.com>: > Ben, > > Yes, configuring the Processor with a Controller Service is what I meant. > If you define a Property that identifies a Controller Service (for instance > here [1]) the value of that property when accessed through > getComponentDetails will be the serviceIdentifier. > > Matt > > [1] > https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java#L205 > > On Sun, Jul 2, 2017 at 12:15 AM, 尹文才 <batman...@gmail.com> wrote: > > > Thanks Matt, how should I get the service identifier with the method > > getComponentDetails, do you mean I should add a service identifier > property > > in my processor to let user to specify it for the controller service? In > > that case how could the service identifier configured associate with the > > configured controller service? > > > > Regards, > > Ben > > > > 2017-07-01 21:35 GMT+08:00 Matt Gilman <matt.c.gil...@gmail.com>: > > > > > Ben, > > > > > > That is the correct method to invoke. The serviceIdentifier will be the > > > value of the Property that identifies the service in question. The > > > componentId will be the identifier of Processor. You'll need to invoke > > > > > > ComponentDetails getComponentDetails(NiFiWebRequestContext > > requestContext) > > > > > > in order to get the service identifier from the configured properties. 
> > > > > > Thanks > > > > > > Matt > > > > > > On Sat, Jul 1, 2017 at 4:17 AM, 尹文才 <batman...@gmail.com> wrote: > > > > > > > Hi guys, I' m currently creating a custom UI for my processor and I > > need > > > to > > > > get the DBCP Connection ControllerService in the backend code for the > > UI, > > > > the ControllerService is a property defined in the processor > > properties. > > > I > > > > saw there's one method in class NiFiWebConfigurationContext as below: > > > > > > > > ControllerService getControllerService(String serviceIdentifier, > String > > > > componentId); > > > > > > > > > > > > However this method requires the serviceIdentifier which I don't know > > > > > > > > how to obtain. How exactly should I get the ControllerService? > > > > > > > > Thanks. > > > > > > > > > > > > Regards, > > > > > > > > Ben > > > > > > > > > >
get controller service's configuration
Hi guys, I have a custom processor with a DBCP controller service as a property. I can get the DBCP controller service in my code, but does anyone know how to obtain all the configurations of the DBCP controller service in Java code (e.g. Database Connection URL, Database Driver Location, etc.)? Thanks. Regards, Ben
Re: get controller service's configuration
Thanks Koji, I checked the link you provided, and I think getting a DataSource is no different from getting the DBCP service (either way I can only get a connection). Actually I was trying to get the configured driver class to determine the database type. Regards, Ben 2017-08-10 9:29 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > Hi Ben, > > I'm not aware of ways to obtain configurations of a controller from a > processor. Those should be encapsulated inside a controller service. > If you'd like to create DataSource instance instead of just obtaining > a connection, this discussion might be helpful: > https://github.com/apache/nifi/pull/1417 > > Although I would not recommend, if you really need to obtain all > configurations, you can do so by calling NiFi REST API from your > processor. > > Thanks, > Koji > > On Thu, Aug 10, 2017 at 10:09 AM, 尹文才 <batman...@gmail.com> wrote: > > Hi guys, I have a customized processor with a DBCP controller service as > a > > property. I could get the DBCP controller service in my code, but does > > anyone know how to obtain all the configurations of the DBCP controller > > service in java code(e.g. Database Connection URL, Database Driver > > Location, etc) Thanks. > > > > Regards, > > Ben >
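Koji's REST-API fallback could look roughly like the Python sketch below. It uses the GET /nifi-api/controller-services/{id} endpoint, which returns the service entity including its configured properties; the base URL and service id are placeholders, and a secured NiFi instance would additionally need authentication.

```python
import json
import urllib.request

def controller_service_url(base_url, service_id):
    """Build the REST endpoint for a controller service's details."""
    return f"{base_url}/nifi-api/controller-services/{service_id}"

def fetch_service_properties(base_url, service_id):
    """Fetch a controller service entity and return its configured
    properties (e.g. connection URL, driver class for DBCPConnectionPool)
    as a dict of property name -> value."""
    with urllib.request.urlopen(controller_service_url(base_url, service_id)) as resp:
        entity = json.load(resp)
    return entity["component"]["properties"]

# Example (placeholders): the service id comes from the processor's
# configured controller-service property value.
# props = fetch_service_properties("http://localhost:8080", "1234-abcd")
```

This keeps the processor code itself free of any dependency on the DBCPConnectionPool implementation class, at the cost of the REST round trip Koji cautions about.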
Re: NIFI templates in template folder in sync with templates inside NIFI UI
Thanks Matt, I could make use of the rest api to automate the template sync work. 2017-09-15 1:00 GMT+08:00 Matt Gilman <matt.c.gil...@gmail.com>: > Ben, > > In the 0.x baseline, the templates were stored in the templates directory > that you're referring to. Starting in the 1.x baseline, the templates were > migrated to become part of the flow.xml.gz. In order to support users > upgrading from 0.x to 1.x, templates in the directory are automatically > moved to the flow.xml.gz. If you're looking to automate template > importing/removal, I would recommend using the REST API. If you open up > your browser you should be able to see these requests in action. > Additionally, you can find documentation for them here [1]. The > import/upload endpoints are under Process Groups and the removal endpoints > are under Templates. > > Thanks > > Matt > > [1] https://nifi.apache.org/docs/nifi-docs/rest-api/index.html > > On Thu, Sep 14, 2017 at 2:40 AM, 尹文才 <batman...@gmail.com> wrote: > > > Hi guys, I put all my flow template xml files inside the nifi template > > directory specified inside the nifi.properties config file so that nifi > > could read in all my templates. But sometimes I need to clear all the > > templates currently inside nifi and then place in a new template, I would > > remove all the template xml files inside the template folder and put the > > new template xml file into it. But when I restart nifi, there're 5 > > templates available, which are 4 old templates and 1 new template. > > > > I know I could manually remove all templates inside nifi, but I wish to > do > > this all programmatically. I also know nifi keeps the templates inside > the > > flow.xml.gz file. > > So does nifi simply adds the new templates in the template folder into > the > > flow.xml.gz file without removing the old ones? 
> > > > I have used beyond compare trying to compare the template xml file > content > > with the template section inside the flow.xml extracted from flow.xml.gz, > > they're slightly differently and most content are the same, does anyone > > know the relationship between these two? > > > > What I want to achieve is to keep the templates in the template folder > and > > the templates in nifi in sync without having to manually do anything in > > nifi UI, does > > anyone know if it's possible and if so how? Thanks. > > > > /Ben > > >
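The REST calls Matt recommends can be sketched as shell functions like these. The endpoint paths are from the NiFi REST API (template upload under Process Groups, removal under Templates); NIFI_URL, the process group id, and the template id are placeholders for your environment.

```shell
# Placeholder base URL; adjust host/port for your instance.
NIFI_URL="http://localhost:8080/nifi-api"

# Upload a template XML file into a process group
# (multipart form field "template").
upload_template() {
  local pg_id="$1" xml_file="$2"
  curl -s -X POST -F "template=@${xml_file}" \
    "${NIFI_URL}/process-groups/${pg_id}/templates/upload"
}

# Remove a template by its id (visible in the UI or via GET /flow/templates).
delete_template() {
  local template_id="$1"
  curl -s -X DELETE "${NIFI_URL}/templates/${template_id}"
}

# Example usage (placeholders):
# upload_template "root-group-id" "my-flow.xml"
# delete_template "old-template-id"
```

Driving these from a script avoids editing flow.xml.gz or the templates directory by hand, which is exactly the sync problem described above.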
NIFI templates in template folder in sync with templates inside NIFI UI
Hi guys, I put all my flow template XML files inside the NiFi template directory specified in the nifi.properties config file so that NiFi can read in all my templates. But sometimes I need to clear all the templates currently inside NiFi and then place in a new template: I remove all the template XML files from the template folder and put the new template XML file into it. But when I restart NiFi, there are 5 templates available, namely the 4 old templates plus the 1 new template. I know I could manually remove all templates inside NiFi, but I wish to do this all programmatically. I also know NiFi keeps the templates inside the flow.xml.gz file. So does NiFi simply add the new templates from the template folder to the flow.xml.gz file without removing the old ones? I have used Beyond Compare to compare the template XML file content with the template section inside the flow.xml extracted from flow.xml.gz; they're slightly different, but most of the content is the same. Does anyone know the relationship between these two? What I want to achieve is to keep the templates in the template folder and the templates in NiFi in sync without having to do anything manually in the NiFi UI. Does anyone know if that's possible, and if so, how? Thanks. /Ben
Re: route flow based on variable
Thanks Bryan, I tried this after reading your reply; I can use the variable directly in RouteOnAttribute. I had thought I could only use a FlowFile's attributes in RouteOnAttribute. Thanks. /Ben 2017-10-09 19:24 GMT+08:00 Bryan Bende <bbe...@gmail.com>: > Ben, > > 1) Yes, the variables are hierarchical, so a variable at the root group > would be visible to all components, unless there is a variable with the > same name at a lower level which would override it. > > 2) I haven’t tried this, but I would expect that you should still be able > to use RouteOnAttribute to route on a variable… Lets say that your root > group has a variable “env” and in one environment you have this set to > “dev” and in another environment you have it set to “prod”. You might have > a part of your flow that you only run in “prod” so you put a > RouteOnAttribute with something like ${env:equals(“prod”)} which would only > enable this path in prod. > > Variables on their own are not associated with flow files, so if you > wanted to do something more specific per-flow file, then the only way would > be what you described with using UpdateAttribute. > > Thanks, > > Bryan > > > On Oct 8, 2017, at 10:44 PM, 尹文才 <batman...@gmail.com> wrote: > > > > Hi guys, I've played around with the latest NIFI 1.4.0 release for a > while > > and I think the new variable registry feature is great, however I have 2 > > questions about this feature: > > > > 1. It seems that I could only add variables to a processor group, could I > > add a global variable in the NIFI root processor group so it could be > used > > anywhere inside NIFI? > > > > 2. 
I want to route to different flows based on the variables I added, but > > currently the only way I know that could make this work is like this: > > myprocessor->updateAttribute(add the variable into FlowFile attribute)-> > > routeOnAttribute->different flows based on the variable value > > > > I didn't find any routeOnVariable processor, is there any easier way > that I > > could use to implement conditional flow in NIFI? Thanks > > > > /Ben > >
route flow based on variable
Hi guys, I've played around with the latest NiFi 1.4.0 release for a while and I think the new variable registry feature is great; however, I have 2 questions about it: 1. It seems that I can only add variables to a process group; can I add a global variable to the NiFi root process group so that it can be used anywhere inside NiFi? 2. I want to route to different flows based on the variables I added, but currently the only way I know to make this work is: myprocessor -> UpdateAttribute (add the variable into a FlowFile attribute) -> RouteOnAttribute -> different flows based on the variable value. I didn't find any RouteOnVariable processor; is there an easier way to implement a conditional flow in NiFi? Thanks /Ben
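Since RouteOnAttribute evaluates Expression Language against process-group variables directly, the UpdateAttribute step in question 2 can be skipped. A hypothetical configuration (the variable name "env" and its values are made up for illustration):

```
# RouteOnAttribute, assuming a process-group variable named "env":
Routing Strategy : Route to Property name
is-prod          : ${env:equals('prod')}
is-dev           : ${env:equals('dev')}
# FlowFiles matching neither expression go to the "unmatched" relationship.
```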
Re: save variables to template when creating a template
Thanks Matt, how should I transfer the variable registry, are these variables saved in a file somewhere? /Ben 2017-10-12 0:58 GMT+08:00 Matt Burgess <mattyb...@apache.org>: > Ben, > > The variables are separate from the template by design, this is so you > can provide a variable registry in dev/test and a different one for > production, for example. If you want to keep the variable you would > likely want to transfer the variable registry along with the template > to the other NiFi instance. > > Regards, > Matt > > On Tue, Oct 10, 2017 at 11:45 PM, 尹文才 <batman...@gmail.com> wrote: > > Hi guys, I've been using the new variable registry feature in 1.4.0 and I > > like it very much. But when I define a variable in a processor group and > > try to create a template out out it, I noticed that the variable I > defined > > is not included in the template after I import the template to another > NIFI > > instance. Does anyone know if it's possible to keep the variable with the > > template? Thanks. > > > > /Ben >
save variables to template when creating a template
Hi guys, I've been using the new variable registry feature in 1.4.0 and I like it very much. But when I define a variable in a process group and try to create a template out of it, I noticed that the variable I defined is not included in the template after I import the template into another NiFi instance. Does anyone know if it's possible to keep the variable with the template? Thanks. /Ben
Re: Unable to checkpoint FlowFile Repository (No space left on device)
Thanks Michael, I could try turning off the content repository archive feature. One thing I'm curious about, though: when I watched the content repository folder while my NiFi flow was still actively running, it kept writing new empty files to the archive folder, and some of them were old FlowFiles' content after I checked (only a very small portion; most were empty files; I used the default archive options of 12 hours and 50%). The speed of creating these files varied slightly when I tuned the interval of my data-extracting processor. The most important thing is that my custom processor that extracts data has no more data to extract at the moment, so I don't know what's going on inside NiFi that keeps increasing the inode usage. /Ben 2017-12-15 0:27 GMT+08:00 Michael Moser <moser...@gmail.com>: > Maximum number of inodes is defined when you build a file system on a > device. You would have to backup your data, rebuild your file system and > tell it to allocate more inodes than the default (an mkfs option, I > think?), then restore your data. > > You can turn off the NiFi content_repository archive, if you don't need > that feature, by setting nifi.content.repository.archive.enabled=false in > your nifi.properties. > > -- Mike > > > On Thu, Dec 14, 2017 at 2:22 AM, 尹文才 <batman...@gmail.com> wrote: > > > One Strange thing one of our testers found was that we're using the > default > > 12hours archive and 50% disk space configuration, he noticed that when > nifi > > removed the archived files inside the content_repository, he checked the > > inodes ' count didn't go down, then he tried to remove some of the > archived > > files inside using the rm command, the inodes's count did go down. 
> > > > /Ben > > > > 2017-12-14 9:49 GMT+08:00 尹文才 <batman...@gmail.com>: > > > > > Hi Michael, the no space left on device occurred again and I checked > the > > > inodes at the time and found it was indeed full, why would the inodes > > > become full and are there any solutions to get around this problem? > > Thanks. > > > > > > /Ben > > > > > > 2017-12-13 13:36 GMT+08:00 尹文才 <batman...@gmail.com>: > > > > > >> Hi Michael, I checked the system available inodes by running df -i > > >> command and there're quite enough inodes in the system. I then removed > > all > > >> the files in all repository folders and restarted > > >> the system, I couldn't see the error again. I will continue to track > the > > >> problem to see what's causing it, but it seems not relevant to the > inode > > >> use-up reason you mentioned. Thanks. > > >> > > >> /Ben > > >> > > >> 2017-12-12 23:45 GMT+08:00 Michael Moser <moser...@gmail.com>: > > >> > > >>> Greetings Ben, > > >>> > > >>> The "No space left on device" error can also be caused by running out > > of > > >>> inodes on your device. You can check this with "df -i". > > >>> > > >>> -- Mike > > >>> > > >>> > > >>> On Tue, Dec 12, 2017 at 1:36 AM, 尹文才 <batman...@gmail.com> wrote: > > >>> > > >>> > sorry that I forgot to mention the environment that caused this > > >>> problem, > > >>> > I'm using the latest nifi 1.4.0 release and installed it on centos > 7. > > >>> > > > >>> > 2017-12-12 14:35 GMT+08:00 尹文才 <batman...@gmail.com>: > > >>> > > > >>> > > Hi guys, I'm running into a very weird problem, I wrote a > processor > > >>> > > specifically to extract some data > > >>> > > and I found starting from yesterday it kept showing errors in the > > >>> log, as > > >>> > > below: > > >>> > > > > >>> > > 2017-12-12 14:01:04,661 INFO [pool-10-thread-1] o.a.n.c.r. > > >>> > WriteAheadFlowFileRepository > > >>> > > Initiating checkpoint of FlowFile Repository > > >>> > > 2017-12-12 14:01:04,676 ERROR [pool-10-thread-1] o.a.n.c.r. 
> > >>> > WriteAheadFlowFileRepository > > >>> > > Unable to checkpoint FlowFile Repository due to > > >>> > > java.io.FileNotFoundException: ../flowfile_repository/ > > >>> > partition-5/96.journal > > >>> > > (No space left on device) > > >>> > > java.io.FileNotFoundException: ../flowfile_reposito
Re: Unable to checkpoint FlowFile Repository (No space left on device)
One strange thing one of our testers found: we're using the default 12-hour archive and 50% disk-space configuration, and he noticed that when NiFi removed the archived files inside the content_repository, the inode count didn't go down; but when he removed some of the archived files himself using the rm command, the inode count did go down. /Ben 2017-12-14 9:49 GMT+08:00 尹文才 <batman...@gmail.com>: > Hi Michael, the no space left on device occurred again and I checked the > inodes at the time and found it was indeed full, why would the inodes > become full and are there any solutions to get around this problem? Thanks. > > /Ben > > 2017-12-13 13:36 GMT+08:00 尹文才 <batman...@gmail.com>: > >> Hi Michael, I checked the system available inodes by running df -i >> command and there're quite enough inodes in the system. I then removed all >> the files in all repository folders and restarted >> the system, I couldn't see the error again. I will continue to track the >> problem to see what's causing it, but it seems not relevant to the inode >> use-up reason you mentioned. Thanks. >> >> /Ben >> >> 2017-12-12 23:45 GMT+08:00 Michael Moser <moser...@gmail.com>: >> >>> Greetings Ben, >>> >>> The "No space left on device" error can also be caused by running out of >>> inodes on your device. You can check this with "df -i". >>> >>> -- Mike >>> >>> >>> On Tue, Dec 12, 2017 at 1:36 AM, 尹文才 <batman...@gmail.com> wrote: >>> >>> > sorry that I forgot to mention the environment that caused this >>> problem, >>> > I'm using the latest nifi 1.4.0 release and installed it on centos 7. >>> > >>> > 2017-12-12 14:35 GMT+08:00 尹文才 <batman...@gmail.com>: >>> > >>> > > Hi guys, I'm running into a very weird problem, I wrote a processor >>> > > specifically to extract some data >>> > > and I found starting from yesterday it kept showing errors in the >>> log, as >>> > > below: >>> > > >>> > > 2017-12-12 14:01:04,661 INFO [pool-10-thread-1] o.a.n.c.r. >>> > WriteAheadFlowFileRepository >>> > > Initiating checkpoint of FlowFile Repository >>> > > 2017-12-12 14:01:04,676 ERROR [pool-10-thread-1] o.a.n.c.r. 
>>> > WriteAheadFlowFileRepository >>> > > Initiating checkpoint of FlowFile Repository >>> > > 2017-12-12 14:01:04,676 ERROR [pool-10-thread-1] o.a.n.c.r. >>> > WriteAheadFlowFileRepository >>> > > Unable to checkpoint FlowFile Repository due to >>> > > java.io.FileNotFoundException: ../flowfile_repository/ >>> > partition-5/96.journal >>> > > (No space left on device) >>> > > java.io.FileNotFoundException: ../flowfile_repository/ >>> > partition-5/96.journal >>> > > (No space left on device) >>> > > at java.io.FileOutputStream.open0(Native Method) >>> > > at java.io.FileOutputStream.open(FileOutputStream.java:270) >>> > > at java.io.FileOutputStream.>> >(FileOutputStream.java:213) >>> > > at java.io.FileOutputStream.>> >(FileOutputStream.java:162) >>> > > at org.wali.MinimalLockingWriteAheadLog$Partition.rollover( >>> > > MinimalLockingWriteAheadLog.java:779) >>> > > at org.wali.MinimalLockingWriteAheadLog.checkpoint( >>> > > MinimalLockingWriteAheadLog.java:528) >>> > > at org.apache.nifi.controller.repository. >>> > > WriteAheadFlowFileRepository.checkpoint(WriteAheadFlowFileRe >>> pository. >>> > > java:451) >>> > > at org.apache.nifi.controller.repository. >>> > > WriteAheadFlowFileRepository$1.run(WriteAheadFlowFileRepository. >>> > java:423) >>> > > at java.util.concurrent.Executors$RunnableAdapter. 
>>> > > call(Executors.java:511) >>> > > at java.util.concurrent.FutureTask.runAndReset( >>> > > FutureTask.java:308) >>> > > at java.util.concurrent.ScheduledThreadPoolExecutor$ >>> > > ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) >>> > > at java.util.concurrent.ScheduledThreadPoolExecutor$ >>> > > ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) >>> > > at java.util.concurrent.ThreadPoolExecutor.runWorker( >>> > > ThreadPoolExecutor.java:1142) >>> > > at java.util.concurrent.ThreadPoolExecutor$Worker.run( >>> > > ThreadPoolExecutor.java:617) >>> > > at java.lang.Thread.run(Thread.java:745) >>> > > >>> > > >>> > > I noticed the log mentioned no space left on device and I went to >>> check >>> > > the available space and found 33G left. Does anyone know what could >>> > > possibly cause this and how to resolve this problem, thanks >>> > > >>> > > /Ben >>> > > >>> > >>> >> >> >
Re: Unable to checkpoint FlowFile Repository (No space left on device)
Hi Michael, the "no space left on device" error occurred again, and I checked the inodes at the time and found they were indeed full. Why would the inodes become full, and are there any solutions to work around this problem? Thanks. /Ben 2017-12-13 13:36 GMT+08:00 尹文才 <batman...@gmail.com>: > Hi Michael, I checked the system available inodes by running df -i command > and there're quite enough inodes in the system. I then removed all the > files in all repository folders and restarted > the system, I couldn't see the error again. I will continue to track the > problem to see what's causing it, but it seems not relevant to the inode > use-up reason you mentioned. Thanks. > > /Ben > > 2017-12-12 23:45 GMT+08:00 Michael Moser <moser...@gmail.com>: > >> Greetings Ben, >> >> The "No space left on device" error can also be caused by running out of >> inodes on your device. You can check this with "df -i". >> >> -- Mike >> >> >> On Tue, Dec 12, 2017 at 1:36 AM, 尹文才 <batman...@gmail.com> wrote: >> >> > sorry that I forgot to mention the environment that caused this problem, >> > I'm using the latest nifi 1.4.0 release and installed it on centos 7. >> > >> > 2017-12-12 14:35 GMT+08:00 尹文才 <batman...@gmail.com>: >> > >> > > Hi guys, I'm running into a very weird problem, I wrote a processor >> > > specifically to extract some data >> > > and I found starting from yesterday it kept showing errors in the >> log, as >> > > below: >> > > >> > > 2017-12-12 14:01:04,661 INFO [pool-10-thread-1] o.a.n.c.r. >> > WriteAheadFlowFileRepository >> > > Initiating checkpoint of FlowFile Repository >> > > 2017-12-12 14:01:04,676 ERROR [pool-10-thread-1] o.a.n.c.r. 
>> > WriteAheadFlowFileRepository >> > > Unable to checkpoint FlowFile Repository due to >> > > java.io.FileNotFoundException: ../flowfile_repository/ >> > partition-5/96.journal >> > > (No space left on device) >> > > java.io.FileNotFoundException: ../flowfile_repository/ >> > partition-5/96.journal >> > > (No space left on device) >> > > at java.io.FileOutputStream.open0(Native Method) >> > > at java.io.FileOutputStream.open(FileOutputStream.java:270) >> > > at java.io.FileOutputStream.(FileOutputStream.java:213) >> > > at java.io.FileOutputStream.(FileOutputStream.java:162) >> > > at org.wali.MinimalLockingWriteAheadLog$Partition.rollover( >> > > MinimalLockingWriteAheadLog.java:779) >> > > at org.wali.MinimalLockingWriteAheadLog.checkpoint( >> > > MinimalLockingWriteAheadLog.java:528) >> > > at org.apache.nifi.controller.repository. >> > > WriteAheadFlowFileRepository.checkpoint(WriteAheadFlowFileRepository. >> > > java:451) >> > > at org.apache.nifi.controller.repository. >> > > WriteAheadFlowFileRepository$1.run(WriteAheadFlowFileRepository. >> > java:423) >> > > at java.util.concurrent.Executors$RunnableAdapter. >> > > call(Executors.java:511) >> > > at java.util.concurrent.FutureTask.runAndReset( >> > > FutureTask.java:308) >> > > at java.util.concurrent.ScheduledThreadPoolExecutor$ >> > > ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) >> > > at java.util.concurrent.ScheduledThreadPoolExecutor$ >> > > ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) >> > > at java.util.concurrent.ThreadPoolExecutor.runWorker( >> > > ThreadPoolExecutor.java:1142) >> > > at java.util.concurrent.ThreadPoolExecutor$Worker.run( >> > > ThreadPoolExecutor.java:617) >> > > at java.lang.Thread.run(Thread.java:745) >> > > >> > > >> > > I noticed the log mentioned no space left on device and I went to >> check >> > > the available space and found 33G left. 
Does anyone know what could >> > > possibly cause this and how to resolve this problem, thanks >> > > >> > > /Ben >> > > >> > >> > >
Re: Unable to checkpoint FlowFile Repository (No space left on device)
Hi Michael, I checked the system available inodes by running df -i command and there're quite enough inodes in the system. I then removed all the files in all repository folders and restarted the system, I couldn't see the error again. I will continue to track the problem to see what's causing it, but it seems not relevant to the inode use-up reason you mentioned. Thanks. /Ben 2017-12-12 23:45 GMT+08:00 Michael Moser <moser...@gmail.com>: > Greetings Ben, > > The "No space left on device" error can also be caused by running out of > inodes on your device. You can check this with "df -i". > > -- Mike > > > On Tue, Dec 12, 2017 at 1:36 AM, 尹文才 <batman...@gmail.com> wrote: > > > sorry that I forgot to mention the environment that caused this problem, > > I'm using the latest nifi 1.4.0 release and installed it on centos 7. > > > > 2017-12-12 14:35 GMT+08:00 尹文才 <batman...@gmail.com>: > > > > > Hi guys, I'm running into a very weird problem, I wrote a processor > > > specifically to extract some data > > > and I found starting from yesterday it kept showing errors in the log, > as > > > below: > > > > > > 2017-12-12 14:01:04,661 INFO [pool-10-thread-1] o.a.n.c.r. > > WriteAheadFlowFileRepository > > > Initiating checkpoint of FlowFile Repository > > > 2017-12-12 14:01:04,676 ERROR [pool-10-thread-1] o.a.n.c.r. 
> > WriteAheadFlowFileRepository > > > Unable to checkpoint FlowFile Repository due to > > > java.io.FileNotFoundException: ../flowfile_repository/ > > partition-5/96.journal > > > (No space left on device) > > > java.io.FileNotFoundException: ../flowfile_repository/ > > partition-5/96.journal > > > (No space left on device) > > > at java.io.FileOutputStream.open0(Native Method) > > > at java.io.FileOutputStream.open(FileOutputStream.java:270) > > > at java.io.FileOutputStream.(FileOutputStream.java:213) > > > at java.io.FileOutputStream.(FileOutputStream.java:162) > > > at org.wali.MinimalLockingWriteAheadLog$Partition.rollover( > > > MinimalLockingWriteAheadLog.java:779) > > > at org.wali.MinimalLockingWriteAheadLog.checkpoint( > > > MinimalLockingWriteAheadLog.java:528) > > > at org.apache.nifi.controller.repository. > > > WriteAheadFlowFileRepository.checkpoint(WriteAheadFlowFileRepository. > > > java:451) > > > at org.apache.nifi.controller.repository. > > > WriteAheadFlowFileRepository$1.run(WriteAheadFlowFileRepository. > > java:423) > > > at java.util.concurrent.Executors$RunnableAdapter. > > > call(Executors.java:511) > > > at java.util.concurrent.FutureTask.runAndReset( > > > FutureTask.java:308) > > > at java.util.concurrent.ScheduledThreadPoolExecutor$ > > > ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > > > at java.util.concurrent.ScheduledThreadPoolExecutor$ > > > ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > > > at java.util.concurrent.ThreadPoolExecutor.runWorker( > > > ThreadPoolExecutor.java:1142) > > > at java.util.concurrent.ThreadPoolExecutor$Worker.run( > > > ThreadPoolExecutor.java:617) > > > at java.lang.Thread.run(Thread.java:745) > > > > > > > > > I noticed the log mentioned no space left on device and I went to check > > > the available space and found 33G left. Does anyone know what could > > > possibly cause this and how to resolve this problem, thanks > > > > > > /Ben > > > > > >
Unable to checkpoint FlowFile Repository (No space left on device)
Hi guys, I'm running into a very weird problem. I wrote a processor specifically to extract some data, and starting from yesterday it kept showing errors in the log, as below: 2017-12-12 14:01:04,661 INFO [pool-10-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository 2017-12-12 14:01:04,676 ERROR [pool-10-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Unable to checkpoint FlowFile Repository due to java.io.FileNotFoundException: ../flowfile_repository/partition-5/96.journal (No space left on device) java.io.FileNotFoundException: ../flowfile_repository/partition-5/96.journal (No space left on device) at java.io.FileOutputStream.open0(Native Method) at java.io.FileOutputStream.open(FileOutputStream.java:270) at java.io.FileOutputStream.<init>(FileOutputStream.java:213) at java.io.FileOutputStream.<init>(FileOutputStream.java:162) at org.wali.MinimalLockingWriteAheadLog$Partition.rollover(MinimalLockingWriteAheadLog.java:779) at org.wali.MinimalLockingWriteAheadLog.checkpoint(MinimalLockingWriteAheadLog.java:528) at org.apache.nifi.controller.repository.WriteAheadFlowFileRepository.checkpoint(WriteAheadFlowFileRepository.java:451) at org.apache.nifi.controller.repository.WriteAheadFlowFileRepository$1.run(WriteAheadFlowFileRepository.java:423) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) I noticed the log mentioned no space left on device, so I went to check the available space and found 33 GB free.
Does anyone know what could possibly cause this and how to resolve it? Thanks. /Ben
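One common explanation for ENOSPC with gigabytes apparently free is that the filesystem has run out of inodes, or that the repository lives on a different (smaller) partition than the one that was checked. A quick diagnostic sketch (the NIFI_HOME default here is an illustrative assumption; point it at your actual install directory):

```shell
# Diagnose "No space left on device" when `df -h` seems to show free space.
# NIFI_HOME defaults to the current directory for illustration only.
NIFI_HOME="${NIFI_HOME:-.}"

# Which partition actually holds the NiFi install, and how full is it?
df -h "$NIFI_HOME"

# Inode usage: 100% IUse% raises ENOSPC even with gigabytes of space free.
df -i "$NIFI_HOME"

# Per-repository disk usage, to see which repository is growing.
du -sh "$NIFI_HOME"/*_repository 2>/dev/null || true
```

If `df -i` shows the inode column at 100%, deleting many small files (e.g. old journals) frees the filesystem even though byte-level space was never exhausted.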
Re: Unable to checkpoint FlowFile Repository (No space left on device)
Sorry, I forgot to mention the environment where this problem occurred: I'm using the latest NiFi 1.4.0 release, installed on CentOS 7. 2017-12-12 14:35 GMT+08:00 尹文才 <batman...@gmail.com>: > Hi guys, I'm running into a very weird problem, I wrote a processor specifically to extract some data and I found starting from yesterday it kept showing errors in the log, as below: > > 2017-12-12 14:01:04,661 INFO [pool-10-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository > 2017-12-12 14:01:04,676 ERROR [pool-10-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Unable to checkpoint FlowFile Repository due to java.io.FileNotFoundException: ../flowfile_repository/partition-5/96.journal (No space left on device) > java.io.FileNotFoundException: ../flowfile_repository/partition-5/96.journal (No space left on device) > at java.io.FileOutputStream.open0(Native Method) > at java.io.FileOutputStream.open(FileOutputStream.java:270) > at java.io.FileOutputStream.<init>(FileOutputStream.java:213) > at java.io.FileOutputStream.<init>(FileOutputStream.java:162) > at org.wali.MinimalLockingWriteAheadLog$Partition.rollover(MinimalLockingWriteAheadLog.java:779) > at org.wali.MinimalLockingWriteAheadLog.checkpoint(MinimalLockingWriteAheadLog.java:528) > at org.apache.nifi.controller.repository.WriteAheadFlowFileRepository.checkpoint(WriteAheadFlowFileRepository.java:451) > at org.apache.nifi.controller.repository.WriteAheadFlowFileRepository$1.run(WriteAheadFlowFileRepository.java:423) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > I noticed the log mentioned no space left on device and I went to check the available space and found 33G left. Does anyone know what could possibly cause this and how to resolve this problem, thanks > > /Ben
Re: Is there a configuration to limit the size of nifi's flowfile repository
Hi guys, thanks for all your answers. I have seen the flowfile repo on one of our OpenStack CentOS 7 machines grow to about 30 GB, which as a result used up all the disk space allocated to the virtual machine; the flow inside NiFi couldn't proceed and many errors started to appear, such as failing to checkpoint, etc. We use NiFi as an ETL tool to extract some data from SQL Server for data analysis. I have no idea why the flowfile repo would grow like this; as I understand it, it only stores FlowFile attributes. It would be great if there were some option to limit the flowfile repo size. Thanks. Regards, Ben 2018-04-26 2:08 GMT+08:00 Brandon DeVries <b...@jhu.edu>: > All, > > This is something I think we shouldn't dismiss so easily. While the > FlowFile repo is lighter than the content repo, allowing it to grow too > large can cause major problems. > > Specifically, an "overgrown" FlowFile repo may prevent a NiFi instance from > coming back up after a restart due to the way in which records are held in > memory. If there is more memory available to give to the JVM, this can > sometimes be worked around... but if there isn't you may just be out of > luck. For that matter, allowing the FlowFile repo to grow so large that it > consumes all the heap isn't going to be good for system health in general > (OOM is probably never where you want to be...). > > To Pierre's point "you don't want to limit that repository in size since it > would prevent the workflows to create new flow files"... that's exactly why > I would want to limit the size of the repo. You do then get into questions > of how exactly to do this. For example, you may not want to simply block > all transactions that create a FlowFile, because it may remove even more > (e.g. MergeContent). Additionally, you have to be concerned about > deadlocks (e.g. a "Wait" that hangs forever because its "Notify" is being > starved). Or, perhaps that's all you can do... 
freeze everything at some > threshold prior to actual damage being done, and alert operators that > manual intervention is necessary (e.g. bring up the graph with > autoResume=false, and bleed off data in a controlled fashion). > > In summary, I believe this is a problem. Even if it doesn't come up often, > when it does it is significant. While the solution likely isn't simple, > it's worth putting some thought towards. > > Brandon > > On Wed, Apr 25, 2018 at 9:43 AM Sivaprasanna <sivaprasanna...@gmail.com> > wrote: > > > No, he actually had mentioned “like content repository”. The answer is, > > there aren’t any properties that support this, AFAIK. Pierre’s response > > pretty much sums up why there aren’t any properties. > > > > Thanks, > > Sivaprasanna > > > > On Wed, 25 Apr 2018 at 7:10 PM, Mike Thomsen <mikerthom...@gmail.com> > > wrote: > > > > > I have a feeling that what Ben meant was how to limit the content > > > repository size. > > > > > > On Wed, Apr 25, 2018 at 8:26 AM Pierre Villard < > > > pierre.villard...@gmail.com> > > > wrote: > > > > > > > Hi Ben, > > > > > > > > Since the flow file repository contains the information of the flow > > files > > > > currently being processed by NiFi, you don't want to limit that > > > repository > > > > in size since it would prevent the workflows to create new flow > files. > > > > > > > > Besides this repository is very lightweight, I'm not sure it'd need > to > > be > > > > limited in size. > > > > Do you have a specific use case in mind? > > > > > > > > Pierre > > > > > > > > > > > > 2018-04-25 9:15 GMT+02:00 尹文才 <batman...@gmail.com>: > > > > > > > > > Hi guys, I checked NIFI's system administrator guide trying to > find a > > > > > configuration item so that the size of the flowfile repository > could > > be > > > > > limited similar to the other repositories(e.g. 
content repository), > > > but I > > > > > didn't find such configuration items, is there currently any > > > > configuration > > > > > to limit the flowfile repository's size? thanks. > > > > > > > > > > Regards, > > > > > Ben > > > > > > > > > > > > > > >
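Since the thread concludes that NiFi (as of these 1.x releases) exposes no property capping the flowfile repository's size, one stopgap is an external watchdog that warns operators before the disk fills. This is a sketch under assumptions, not a NiFi feature; the repository path and 20 GB threshold are illustrative:

```shell
#!/bin/sh
# Hedged sketch: warn when the flowfile repository exceeds a size threshold,
# since NiFi itself offers no cap. REPO and the 20 GB limit are assumptions;
# run it from cron and route the WARNING line to your alerting system.
REPO="${REPO:-./flowfile_repository}"
LIMIT_KB=$((20 * 1024 * 1024))   # 20 GB expressed in kilobytes

# du -sk prints "<kilobytes>\t<path>"; take the first field.
used_kb=$(du -sk "$REPO" 2>/dev/null | cut -f1)
used_kb=${used_kb:-0}

if [ "$used_kb" -gt "$LIMIT_KB" ]; then
    echo "WARNING: $REPO is ${used_kb} KB, over the ${LIMIT_KB} KB limit"
else
    echo "OK: $REPO is ${used_kb} KB"
fi
```

Alerting early leaves room to stop source processors and bleed off queued data in a controlled fashion, which is roughly the manual-intervention approach Brandon describes above.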
The rate of the dataflow is exceeding the provenance recording rate. Slowing down flow to accommodate
Hi guys, I'm using nifi 1.4.0 to do some ETL work in my team and I have encountered 2 problems during my testing. The first problem is I found the nifi bulletin board was showing the following warning to me: 2017-12-25 01:31:00,460 WARN [Provenance Maintenance Thread-1] o.a.n.p.PersistentProvenanceRepository The rate of the dataflow is exceeding the provenance recording rate. Slowing down flow to accommodate. Currently, there are 96 journal files (158278228 bytes) and threshold for blocking is 80 (1181116006 bytes) I don't quite understand what this means, and I also found inside the bootstrap log that nifi restarted itself: 2017-12-25 01:31:19,249 WARN [main] org.apache.nifi.bootstrap.RunNiFi Apache NiFi appears to have died. Restarting... Is there anything I could do to solve this problem? The second problem is about the FlowFiles inside my flow; I implemented a few custom processors to do the ETL work. One is to extract multiple tables from sql server, and each flowfile out of it contains an attribute specifying the name of the temp ods table to create; the second processor is to get all flowfiles from the first processor and create all the temp ods tables specified in the flowfiles' attribute. I found inside the app log that one of the temp table names already existed when trying to create the temp table, and it caused a SQL exception.
After taking some time investigating the log, I found the sql query was executed twice in the second processor: once before the nifi restart, and the second time right after the restart: 2017-12-25 01:32:35,639 ERROR [Timer-Driven Process Thread-7] c.z.nifi.processors.ExecuteSqlCommand ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] 执行sql语句失败 (failed to execute the SQL statement): SELECT TOP 0 * INTO tmp.ods_bd_e_reason_20171225013007005_5567 FROM dbo.ods_bd_e_reason; I have read the NiFi in-depth document but I'm still not very familiar with nifi's internal mechanisms. My suspicion is that nifi didn't manage to checkpoint the flowfile's state (which queue it was in) from memory into the flowfile repository before it died; after restarting, it recovered the flowfile's state from the flowfile repository and the flowfile went through the second processor again, and thus the sql was executed twice. Is this correct? I've attached the relevant part of the app log, thanks. Regards, Ben
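As Koji's reply later in this thread explains, a FlowFile whose session was never committed can be re-delivered after a crash, so any side effect performed in onTrigger should be idempotent. One option for this temp-table case is to generate SQL that drops the table if it already exists before re-creating it. This is a hedged sketch, not the poster's actual ExecuteSqlCommand code; the class, method, and table names are illustrative:

```java
// Hedged sketch: NiFi may replay a FlowFile whose process session was not
// committed (e.g. the JVM died between the database call and session.commit()),
// so the SQL a processor issues should be safe to run twice.
// Builds a T-SQL batch that drops the temp table first if it exists.
public class IdempotentSql {

    /** Returns a T-SQL batch that can be executed repeatedly without error. */
    static String createTempTableSql(String tempTable, String sourceTable) {
        return "IF OBJECT_ID('" + tempTable + "', 'U') IS NOT NULL DROP TABLE " + tempTable + "; "
             + "SELECT TOP 0 * INTO " + tempTable + " FROM " + sourceTable + ";";
    }

    public static void main(String[] args) {
        // Table names taken from the error log above, purely for illustration.
        String sql = createTempTableSql(
                "tmp.ods_bd_e_reason_20171225013007005_5567",
                "dbo.ods_bd_e_reason");
        System.out.println(sql);
    }
}
```

With this shape, the replayed FlowFile simply re-creates the table instead of failing with "table already exists"; the alternative is to detect the duplicate and route the FlowFile to a success/ignore relationship.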
Re: The rate of the dataflow is exceeding the provenance recording rate. Slowing down flow to accommodate
Hi Koji, I also didn't find anything related to the unexpected shutdown in my logs, is there anything I could do to make NIFI log more verbose information to the logs? Regards, Ben 2017-12-25 14:56 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > Hi Ben, > > I looked at the log and I expected to see some indication for the > cause of shutdown, but couldn't find any. > The PersistentProvenanceRepository rate warning is just a warning, and > it shouldn't be the trigger of an unexpected shutdown. I suspect other > reasons such as OOM killer, but I can't do any further investigation > with only these logs. > > Thanks, > Koji > > On Mon, Dec 25, 2017 at 3:46 PM, 尹文才 <batman...@gmail.com> wrote: > > Hi Koji, one more thing, do you have any idea why my first issue leads to > > the unexpected shutdown of NIFI? according to the words, it will just > slow > > down the flow. thanks. > > > > Regards, > > Ben > > > > 2017-12-25 14:31 GMT+08:00 尹文才 <batman...@gmail.com>: > > > >> Hi Koji, thanks for your help, for the first issue, I will switch to use > >> the WriteAheadProvenanceReopsitory implementation. > >> > >> For the second issue, I have uploaded the relevant part of my log file > >> onto my google drive, the link is: > >> https://drive.google.com/open?id=1oxAkSUyYZFy6IWZSeWqHI8e9Utnw1XAj > >> > >> You mean a custom processor could possibly process a flowfile twice only > >> when it's trying to commit the session but it's interrupted so the > flowfile > >> still remains inside the original queue(like NIFI went down)? > >> > >> If you need to see the full log file, please let me know, thanks. > >> > >> Regards, > >> Ben > >> > >> 2017-12-25 13:51 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > >> > >>> Hi Ben, > >>> > >>> For your 2nd issue, NiFi commits a process session in Processor > >>> onTrigger when it's executed by NiFi flow engine by calling > >>> session.commit(). 
> >>> https://github.com/apache/nifi/blob/master/nifi-api/src/main > >>> /java/org/apache/nifi/processor/AbstractProcessor.java#L28 > >>> Once a process session is committed, the FlowFile state (including > >>> which queue it is in) is persisted to disk. > >>> > >>> It's possible for a Processor to process the same FlowFile more than > >>> once, if it has done its job, but failed to commit the session. > >>> For example, if your custom processor created a temp table from a > >>> FlowFile. Then before the process session is committed, something > >>> happened and NiFi process session was rollback. In this case, the > >>> target database is already updated (the temp table is created), but > >>> NiFi FlowFile stays in the incoming queue. If the FlowFile is > >>> processed again, the processor will get an error indicating the table > >>> already exists. > >>> > >>> I tried to look at the logs you attached, but attachments do not seem > >>> to be delivered to this ML. I don't see anything attached. > >>> > >>> Thanks, > >>> Koji > >>> > >>> > >>> On Mon, Dec 25, 2017 at 1:43 PM, Koji Kawamura <ijokaruma...@gmail.com > > > >>> wrote: > >>> > Hi Ben, > >>> > > >>> > Just a quick recommendation for your first issue, 'The rate of the > >>> > dataflow is exceeding the provenance recording rate' warning message. > >>> > I'd recommend using WriteAheadProvenanceRepository instead of > >>> > PersistentProvenanceRepository. WriteAheadProvenanceRepository > >>> > provides better performance. > >>> > Please take a look at the documentation here. > >>> > https://nifi.apache.org/docs/nifi-docs/html/administration-g > >>> uide.html#provenance-repository > >>> > > >>> > Thanks, > >>> > Koji > >>> > > >>> > On Mon, Dec 25, 2017 at 12:56 PM, 尹文才 <batman...@gmail.com> wrote: > >>> >> Hi guys, I'm using nifi 1.4.0 to do some ETL work in my team and I > have > >>> >> encountered 2 problems during my testing. 
> >>> >> > >>> >> The first problem is I found the nifi bulletin board was showing the > >>> >> following warning to me: > >>> >> > >>> >> 2017-12-25 01:31:00,46
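Regarding Ben's question about getting more verbose logs before the next unexpected shutdown: NiFi's logging is driven by conf/logback.xml. A minimal sketch, assuming the stock layout of that file (raise levels back to INFO once the investigation is done, since DEBUG is noisy):

```xml
<!-- conf/logback.xml sketch: raise these loggers from their defaults to
     DEBUG to capture more detail around a crash. Logger names follow the
     stock configuration; the DEBUG level here is illustrative. -->
<logger name="org.apache.nifi" level="DEBUG"/>
<logger name="org.apache.nifi.bootstrap" level="DEBUG"/>
```

Note that if the JVM is killed from outside (e.g. the Linux OOM killer Koji mentions), the evidence usually lands in the operating system's logs (`dmesg`, `/var/log/messages`) rather than NiFi's own logs, however verbose they are.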
Re: The rate of the dataflow is exceeding the provenance recording rate. Slowing down flow to accommodate
Hi Koji, one more thing, do you have any idea why my first issue leads to the unexpected shutdown of NIFI? according to the words, it will just slow down the flow. thanks. Regards, Ben 2017-12-25 14:31 GMT+08:00 尹文才 <batman...@gmail.com>: > Hi Koji, thanks for your help, for the first issue, I will switch to use > the WriteAheadProvenanceReopsitory implementation. > > For the second issue, I have uploaded the relevant part of my log file > onto my google drive, the link is: > https://drive.google.com/open?id=1oxAkSUyYZFy6IWZSeWqHI8e9Utnw1XAj > > You mean a custom processor could possibly process a flowfile twice only > when it's trying to commit the session but it's interrupted so the flowfile > still remains inside the original queue(like NIFI went down)? > > If you need to see the full log file, please let me know, thanks. > > Regards, > Ben > > 2017-12-25 13:51 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > >> Hi Ben, >> >> For your 2nd issue, NiFi commits a process session in Processor >> onTrigger when it's executed by NiFi flow engine by calling >> session.commit(). >> https://github.com/apache/nifi/blob/master/nifi-api/src/main >> /java/org/apache/nifi/processor/AbstractProcessor.java#L28 >> Once a process session is committed, the FlowFile state (including >> which queue it is in) is persisted to disk. >> >> It's possible for a Processor to process the same FlowFile more than >> once, if it has done its job, but failed to commit the session. >> For example, if your custom processor created a temp table from a >> FlowFile. Then before the process session is committed, something >> happened and NiFi process session was rollback. In this case, the >> target database is already updated (the temp table is created), but >> NiFi FlowFile stays in the incoming queue. If the FlowFile is >> processed again, the processor will get an error indicating the table >> already exists. 
>> >> I tried to look at the logs you attached, but attachments do not seem >> to be delivered to this ML. I don't see anything attached. >> >> Thanks, >> Koji >> >> >> On Mon, Dec 25, 2017 at 1:43 PM, Koji Kawamura <ijokaruma...@gmail.com> >> wrote: >> > Hi Ben, >> > >> > Just a quick recommendation for your first issue, 'The rate of the >> > dataflow is exceeding the provenance recording rate' warning message. >> > I'd recommend using WriteAheadProvenanceRepository instead of >> > PersistentProvenanceRepository. WriteAheadProvenanceRepository >> > provides better performance. >> > Please take a look at the documentation here. >> > https://nifi.apache.org/docs/nifi-docs/html/administration-g >> uide.html#provenance-repository >> > >> > Thanks, >> > Koji >> > >> > On Mon, Dec 25, 2017 at 12:56 PM, 尹文才 <batman...@gmail.com> wrote: >> >> Hi guys, I'm using nifi 1.4.0 to do some ETL work in my team and I have >> >> encountered 2 problems during my testing. >> >> >> >> The first problem is I found the nifi bulletin board was showing the >> >> following warning to me: >> >> >> >> 2017-12-25 01:31:00,460 WARN [Provenance Maintenance Thread-1] >> >> o.a.n.p.PersistentProvenanceRepository The rate of the dataflow is >> exceeding >> >> the provenance recording rate. Slowing down flow to accommodate. >> Currently, >> >> there are 96 journal files (158278228 bytes) and threshold for >> blocking is >> >> 80 (1181116006 bytes) >> >> >> >> I don't quite understand what this means, and I found also inside the >> >> bootstrap log that nifi restarted itself: >> >> >> >> 2017-12-25 01:31:19,249 WARN [main] org.apache.nifi.bootstrap.RunNiFi >> Apache >> >> NiFi appears to have died. Restarting... >> >> >> >> Is there anything I could do so solve this problem? >> >> >> >> The second problem is about the FlowFiles inside my flow, I actually >> >> implemented a few custom processors to do the ETL work. 
one is to >> extract >> >> multiple tables from sql server and for each flowfile out of it, it >> contains >> >> an attribute >> >> specifying the name of the temp ods table to create, and the second >> >> processor is to get all flowfiles from the first processor and create >> all >> >> the temp ods tables specified in the flowfiles' attribute. >> >> I found inside the app log that one of the te
Re: The rate of the dataflow is exceeding the provenance recording rate. Slowing down flow to accommodate
Hi Koji, thanks for your help. For the first issue, I will switch to the WriteAheadProvenanceRepository implementation. For the second issue, I have uploaded the relevant part of my log file to my Google Drive; the link is: https://drive.google.com/open?id=1oxAkSUyYZFy6IWZSeWqHI8e9Utnw1XAj You mean a custom processor could possibly process a flowfile twice only when it's trying to commit the session but is interrupted, so the flowfile still remains in the original queue (like when NiFi went down)? If you need to see the full log file, please let me know, thanks. Regards, Ben 2017-12-25 13:51 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > Hi Ben, > > For your 2nd issue, NiFi commits a process session in Processor > onTrigger when it's executed by NiFi flow engine by calling > session.commit(). > https://github.com/apache/nifi/blob/master/nifi-api/src/ > main/java/org/apache/nifi/processor/AbstractProcessor.java#L28 > Once a process session is committed, the FlowFile state (including > which queue it is in) is persisted to disk. > > It's possible for a Processor to process the same FlowFile more than > once, if it has done its job, but failed to commit the session. > For example, if your custom processor created a temp table from a > FlowFile. Then before the process session is committed, something > happened and NiFi process session was rollback. In this case, the > target database is already updated (the temp table is created), but > NiFi FlowFile stays in the incoming queue. If the FlowFile is > processed again, the processor will get an error indicating the table > already exists. 
> > Thanks, > Koji > > > On Mon, Dec 25, 2017 at 1:43 PM, Koji Kawamura <ijokaruma...@gmail.com> > wrote: > > Hi Ben, > > > > Just a quick recommendation for your first issue, 'The rate of the > > dataflow is exceeding the provenance recording rate' warning message. > > I'd recommend using WriteAheadProvenanceRepository instead of > > PersistentProvenanceRepository. WriteAheadProvenanceRepository > > provides better performance. > > Please take a look at the documentation here. > > https://nifi.apache.org/docs/nifi-docs/html/administration- > guide.html#provenance-repository > > > > Thanks, > > Koji > > > > On Mon, Dec 25, 2017 at 12:56 PM, 尹文才 <batman...@gmail.com> wrote: > >> Hi guys, I'm using nifi 1.4.0 to do some ETL work in my team and I have > >> encountered 2 problems during my testing. > >> > >> The first problem is I found the nifi bulletin board was showing the > >> following warning to me: > >> > >> 2017-12-25 01:31:00,460 WARN [Provenance Maintenance Thread-1] > >> o.a.n.p.PersistentProvenanceRepository The rate of the dataflow is > exceeding > >> the provenance recording rate. Slowing down flow to accommodate. > Currently, > >> there are 96 journal files (158278228 bytes) and threshold for blocking > is > >> 80 (1181116006 bytes) > >> > >> I don't quite understand what this means, and I found also inside the > >> bootstrap log that nifi restarted itself: > >> > >> 2017-12-25 01:31:19,249 WARN [main] org.apache.nifi.bootstrap.RunNiFi > Apache > >> NiFi appears to have died. Restarting... > >> > >> Is there anything I could do so solve this problem? > >> > >> The second problem is about the FlowFiles inside my flow, I actually > >> implemented a few custom processors to do the ETL work. 
one is to > extract > >> multiple tables from sql server and for each flowfile out of it, it > contains > >> an attribute > >> specifying the name of the temp ods table to create, and the second > >> processor is to get all flowfiles from the first processor and create > all > >> the temp ods tables specified in the flowfiles' attribute. > >> I found inside the app log that one of the temp table name already > existed > >> when trying to create the temp table, and it caused sql exception. > >> After taking some time investigating in the log, I found the sql query > was > >> executed twice in the second processor, once before nifi restart, the > second > >> execution was done right after nifi restart: > >> > >> 2017-12-25 01:32:35,639 ERROR [Timer-Driven Process Thread-7] > >> c.z.nifi.processors.ExecuteSqlCommand > >> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] > 执行sql语句失败:SELECT > >> TOP 0 * INTO tmp.ods_bd_e_reason_201
Re: NiFi data HA in cluster mode
Thanks Joe, I will try to avoid setting processors to primary node only. By the way, I've seen someone post a suggestion about data HA on NiFi's wiki (HDFSContentRepository); is there a plan for that feature to be implemented and included in NiFi? Regards, Ben 2018-01-09 14:25 GMT+08:00 Joe Witt <joe.w...@gmail.com>: > I'd avoid setting any processor to primary node only unless it is a > source processor (something that brings data into the system). > > But, yes, I believe your description is accurate as of now. > > Thanks > > On Mon, Jan 8, 2018 at 11:21 PM, 尹文才 <batman...@gmail.com> wrote: > > Thanks Joe, so you mean for example, if I set one processor to run only > on > > primary node in the cluster and there're 100 FlowFiles in the incoming > > queue of the processor > > waiting to be processed by this processor, and the processor suddenly > goes > > down and then another node is elected as the primary node, those 100 > > FlowFiles will be kept locally > > in the node that went down and will continue to be processed by the node > > when it goes back online, these FlowFiles will not be available to the > new > > primary node and other nodes, > > am I correct? > > > > Regards, > > Ben > > > > > > 2018-01-09 14:08 GMT+08:00 Joe Witt <joe.w...@gmail.com>: > > > >> Ben, > >> > >> Data already mid-flow within a node will be kept on the node and > >> processed when the node is back on-line. All other data coming into > >> the cluster can fail-over to other nodes provided you're sourcing data > >> with queuing semantics or automated load balancing or fail-over as-is > >> present in the Apache NiFi Site to Site protocol. > >> > >> Thanks > >> Joe > >> > >> On Mon, Jan 8, 2018 at 11:05 PM, 尹文才 <batman...@gmail.com> wrote: > >> > Hi guys, I have a question about data HA when NiFi is run in clustered > >> > mode, if one node goes down, will the flowfiles owned by this node > taken > >> > over and processed by another node? 
> >> > Or will the flowfiles be kept locally to that node and will only be > >> > processed when that node is back online? Thanks. > >> > > >> > Regards, > >> > Ben > >> >
NiFi data HA in cluster mode
Hi guys, I have a question about data HA when NiFi is run in clustered mode, if one node goes down, will the flowfiles owned by this node taken over and processed by another node? Or will the flowfiles be kept locally to that node and will only be processed when that node is back online? Thanks. Regards, Ben
Re: NiFi data HA in cluster mode
Thanks Joe, so you mean for example, if I set one processor to run only on primary node in the cluster and there're 100 FlowFiles in the incoming queue of the processor waiting to be processed by this processor, and the processor suddenly goes down and then another node is elected as the primary node, those 100 FlowFiles will be kept locally in the node that went down and will continue to be processed by the node when it goes back online, these FlowFiles will not be available to the new primary node and other nodes, am I correct? Regards, Ben 2018-01-09 14:08 GMT+08:00 Joe Witt <joe.w...@gmail.com>: > Ben, > > Data already mid-flow within a node will be kept on the node and > processed when the node is back on-line. All other data coming into > the cluster can fail-over to other nodes provided you're sourcing data > with queuing semantics or automated load balancing or fail-over as-is > present in the Apache NiFi Site to Site protocol. > > Thanks > Joe > > On Mon, Jan 8, 2018 at 11:05 PM, 尹文才 <batman...@gmail.com> wrote: > > Hi guys, I have a question about data HA when NiFi is run in clustered > > mode, if one node goes down, will the flowfiles owned by this node taken > > over and processed by another node? > > Or will the flowfiles be kept locally to that node and will only be > > processed when that node is back online? Thanks. > > > > Regards, > > Ben >
Re: DBCP connection pool problem - connection already closed
Thanks Brett, I will switch to use jTDS and see how it goes. Regards, Ben 2018-01-11 16:22 GMT+08:00 Brett Ryan <brett.r...@gmail.com>: > Hi Ben. It’s often recommended to use the jTDS driver [1] as the MS > provided driver is considered buggy. > > I don’t make this claim, however; I’ve always used this driver for ms sql > server and never encountered issues. > > [1]: http://jtds.sourceforge.net/faq.html > > > On 11 Jan 2018, at 18:56, 尹文才 <batman...@gmail.com> wrote: > > > > Hi guys, I'm using the PutDatabaseRecord processor to write some data > into > > sql server and the processor is using the DBCP controller service as its > > connection pool. > > Sometimes I could see the following exception inside my log file: > > org.apache.nifi.processor.exception.ProcessException: Failed to commit > > database connection due to com.microsoft.sqlserver.jdbc. > SQLServerException: > > The connection is closed > > > > I know that DBCPConnectionPool is using the validation query property to > > set test-on-borrow to true and also set the validation query accordingly, > > so I set the validation query to "select 1", > > but this doesn't seem to solve the problem. I tried to find the possible > > reason and solution for this problem, the only thread I could find is > that > > someone talked about one possible reason on > > StackOverFlow: > > > > "In case of IOException, the sqlserver jdbc driver marks the connection > as > > closed, but this is not detected by the pool. So the connection is > returned > > in the pool, while unusable." > > > > I'm not sure if the reason he mentioned above is the root cause of my > > problem. Has anyone came across this kind of problem and how to work > around > > this issue? Thanks. > > > > Regards, > > Ben >
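For reference, switching the DBCPConnectionPool controller service to jTDS mainly means changing the URL and driver class. The settings below are an illustrative sketch (host, database, and jar path are hypothetical; 1.3.1 was a current jTDS release at the time):

```properties
# Illustrative DBCPConnectionPool settings for the jTDS driver:
Database Connection URL      = jdbc:jtds:sqlserver://dbhost:1433/mydb
Database Driver Class Name   = net.sourceforge.jtds.jdbc.Driver
Database Driver Location(s)  = /opt/nifi/lib/jtds-1.3.1.jar
Validation query             = SELECT 1
```

Keeping the validation query set means borrowed connections are still tested, so a connection closed server-side should be evicted from the pool rather than handed to PutDatabaseRecord.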
Unable to start NiFi
Hi guys, one of my colleagues was working on a remote deploy tool trying to start NiFi remotely on a windows system(NiFi package is on the windows system). I don't know how exactly the deployment tool works, but the basic flow is to launch Ansible locally which uses winrm to remotely launch NiFi through powershell. Unfortunately NiFi couldn't be started. But when he remotely logged into the windows system and manually ran NiFi on the system(through double clicking the run-nifi.bat file), NiFi started without any problem. I checked the logs written by NiFi about the start failure and it appears to me to be a memory problem, but I'm not sure if it is the root cause and don't know how to solve the problem. Could anyone help me out of this problem? Thanks. The 2 created logs are as below: 1. nifi-bootstrap.log: 2018-01-19 17:12:48,657 INFO [main] o.a.n.b.NotificationServiceManager Successfully loaded the following 0 services: [] 2018-01-19 17:12:48,673 INFO [main] org.apache.nifi.bootstrap.RunNiFi Registered no Notification Services for Notification Type NIFI_STARTED 2018-01-19 17:12:48,673 INFO [main] org.apache.nifi.bootstrap.RunNiFi Registered no Notification Services for Notification Type NIFI_STOPPED 2018-01-19 17:12:48,673 INFO [main] org.apache.nifi.bootstrap.RunNiFi Registered no Notification Services for Notification Type NIFI_DIED 2018-01-19 17:12:48,688 INFO [main] org.apache.nifi.bootstrap.Command Starting Apache NiFi... 
2018-01-19 17:12:48,688 INFO [main] org.apache.nifi.bootstrap.Command Working Directory: C:\ETL\Nifi\NIFI-1~1.0 2018-01-19 17:12:48,688 INFO [main] org.apache.nifi.bootstrap.Command Command: C:\Program Files\Java\jdk1.8.0_152\bin\java.exe -classpath C:\ETL\Nifi\NIFI-1~1.0\.\conf;C:\ETL\Nifi\NIFI-1~1.0\.\lib\javax.servlet-api-3.1.0.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\jcl-over-slf4j-1.7.25.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\jetty-schemas-3.1.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\jul-to-slf4j-1.7.25.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\log4j-over-slf4j-1.7.25.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\logback-classic-1.2.3.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\logback-core-1.2.3.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\nifi-api-1.4.0.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\nifi-framework-api-1.4.0.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\nifi-nar-utils-1.4.0.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\nifi-properties-1.4.0.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\nifi-runtime-1.4.0.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\slf4j-api-1.7.25.jar -Dorg.apache.jasper.compiler.disablejsr199=true -Xms2048m -Xms2048m -Djava.security.egd=file:/dev/urandom -Dsun.net.http.allowRestrictedHeaders=true -Djava.net.preferIPv4Stack=true -Djava.awt.headless=true -XX:+UseG1GC -Djava.protocol.handler.pkgs=sun.net.www.protocol -Dnifi.properties.file.path=C:\ETL\Nifi\NIFI-1~1.0\.\conf\nifi.properties -Dnifi.bootstrap.listen.port=49572 -Dapp=NiFi -Dorg.apache.nifi.bootstrap.config.log.dir=C:\ETL\Nifi\NIFI-1~1.0\bin\..\\..\logs org.apache.nifi.NiFi 2018-01-19 17:12:49,657 INFO [NiFi logging handler] org.apache.nifi.StdOut # 2018-01-19 17:12:49,657 INFO [NiFi logging handler] org.apache.nifi.StdOut # There is insufficient memory for the Java Runtime Environment to continue. 2018-01-19 17:12:49,657 INFO [NiFi logging handler] org.apache.nifi.StdOut # Native memory allocation (mmap) failed to map 2147483648 bytes for Failed to commit area from 0x0006c000 to 0x00074000 of length 2147483648. 
2018-01-19 17:12:49,657 INFO [NiFi logging handler] org.apache.nifi.StdOut # An error report file with more information is saved as: 2018-01-19 17:12:49,657 INFO [NiFi logging handler] org.apache.nifi.StdOut # C:\ETL\Nifi\NIFI-1~1.0\hs_err_pid3448.log 2018-01-19 17:12:49,657 ERROR [NiFi logging handler] org.apache.nifi.StdErr Picked up JAVA_TOOL_OPTIONS: "-Dfile.encoding=UTF8" 2018-01-19 17:12:49,657 ERROR [NiFi logging handler] org.apache.nifi.StdErr Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x0006c000, 2147483648, 0) failed; error='The paging file is too small for this operation to complete' (DOS error/errno=1455) 2018-01-19 17:12:50,876 WARN [main] org.apache.nifi.bootstrap.Command Failed to set permissions so that only the owner can read pid file C:\ETL\Nifi\NIFI-1~1.0\bin\..\run\nifi.pid; this may allows others to have access to the key needed to communicate with NiFi. Permissions should be changed so that only the owner can read this file 2018-01-19 17:12:53,816 WARN [main] org.apache.nifi.bootstrap.Command Failed to set permissions so that only the owner can read status file C:\ETL\Nifi\NIFI-1~1.0\bin\..\run\nifi.status; this may allows others to have access to the key needed to communicate with NiFi. Permissions should be changed so that only the owner can read this file 2018-01-19 17:12:55,798 INFO [main] org.apache.nifi.bootstrap.Command Launched Apache NiFi with Process ID 3448 2018-01-19 17:12:55,798 INFO [main] org.apache.nifi.bootstrap.RunNiFi NiFi never started. Will not restart NiFi 2. hs_err_pid3448.log: # # There is insufficient memory for the Java Runtime Environment
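[Editor's note] A plausible culprit, given that the same command works in an interactive session but fails under WinRM, is WinRM's per-shell memory quota (MaxMemoryPerShellMB), which defaults to a low value on many Windows versions and can cap a JVM launched through the remote shell well below the 2 GB heap NiFi requests here. This is an inference from the symptoms, not something confirmed in the thread; a sketch of raising the quota on the Windows host (the 4096 MB value is an assumption — size it to the NiFi heap plus headroom):

```shell
# Assumption: WinRM's per-shell memory quota is starving the remotely
# launched JVM. Run from an elevated cmd.exe on the Windows host, then
# retry the Ansible deploy.
winrm set winrm/config/winrs @{MaxMemoryPerShellMB="4096"}

# Verify the new value:
winrm get winrm/config/winrs
```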
Re: clear all flowfiles in all queues upon NiFi restart
Thanks Mark, Andrew and Russell, I think using the Volatile-implementations repositories mentioned by Mark should be sufficient for me. Regards, Ben 2018-01-12 22:59 GMT+08:00 Russell Bateman <r...@windofkeltia.com>: > Andrew just meant that if you smoke the contents of all the repository > subdirectories under ${NIFI_ROOT}, it will result in what you seem to be > asking for. > > Hope this helps. > > > On 01/11/2018 09:48 PM, 尹文才 wrote: > >> Hi Andrew, sorry I didn't follow your idea, could you please elaborate >> with >> more details? >> What I want to do is to be able to clear all the FlowFiles when NiFi dies >> unexpectedly and restarts itself. >> >> Regards, >> Ben >> >> 2018-01-12 12:44 GMT+08:00 Andrew Grande <apere...@gmail.com>: >> >> Perhaps you could delete the repository directories when you need to >>> restart with no data? >>> >>> On Thu, Jan 11, 2018, 9:16 PM 尹文才 <batman...@gmail.com> wrote: >>> >>> Hi Mark, forgot to ask about VolatileFlowFileRepository you mentioned, >>>> >>> if I >>> >>>> switch to use VolatileFlowFileRepository, will NiFi swap out all the >>>> >>> other >>> >>>> FlowFiles to disk if a queue is already full? >>>> Is it just simply keeping all FlowFiles in memory? >>>> >>>> Regards, >>>> Ben >>>> >>>> 2018-01-12 12:07 GMT+08:00 尹文才 <batman...@gmail.com>: >>>> >>>> Thanks Mark, my case is that I'm using NiFi to do some ETL work and >>>>> >>>> it's >>> >>>> possible that NiFi dies unexpectedly due to lack of system resources. >>>>> >>>> After >>>> >>>>> NiFi restarts itself, >>>>> I will re-extract all the data from database and re-perform all the >>>>> operations, so I need to clear all possible FlowFiles that might exist >>>>> >>>> in >>> >>>> any queue. >>>>> >>>>> Regards, >>>>> Ben >>>>> >>>>> 2018-01-12 11:49 GMT+08:00 Mark Payne <marka...@hotmail.com>: >>>>> >>>>> Ben, >>>>>> >>>>>> I have to admit - that’s kind of an odd request :) I’m curious what >>>>>> >>>>> the >>> >>>> use case is, if you can share? 
>>>>>> >>>>>> Regardless, the easiest way would be to update nifi.properties so that >>>>>> the FlowFile repo that is used is the VolatileFlowFileRepository. This >>>>>> would avoid writing the FlowFile state to disk, so ok restart you will >>>>>> >>>>> lose >>>> >>>>> all FlowFiles. The content will still be present, but nifi will delete >>>>>> >>>>> it >>>> >>>>> all on startup because there is no FlowFile associated with it. >>>>>> >>>>>> I’m on my phone right now so can’t easily tell you the exact name of >>>>>> >>>>> the >>> >>>> property to change but you’ll probably find it pretty quickly. The >>>>>> >>>>> Admin >>> >>>> Guide may well explain the different repositories as well. >>>>>> >>>>>> Thanks >>>>>> -Mark >>>>>> >>>>>> Sent from my iPhone >>>>>> >>>>>> On Jan 11, 2018, at 10:31 PM, 尹文才 <batman...@gmail.com> wrote: >>>>>>> >>>>>>> Hi guys, I'm trying to clear all FlowFIles in all queues when NiFi >>>>>>> >>>>>> is >>> >>>> restarted, but I don't know the correct way to do this. I checked >>>>>>> >>>>>> all >>> >>>> NiFi's guide documentation, >>>>>>> it seems there're 2 possible solutions: >>>>>>> 1. write a custom notification service: a notification service could >>>>>>> >>>>>> be >>>> >>>>> notified when NiFi is restarted and then inside the service, delete >>>>>>> >>>>>> all >>>> >>>>> the >>>>>> >>>>>>> files inside content_repository, flowfile_repository and >>>>>>> provenance_repository. >>>>>>>I know there're now 2 existing services: email and http. But I'm >>>>>>> >>>>>> not >>> >>>> quite sure how to correctly write one and deploy it into my NiFi >>>>>>> environment, is there a tutorial on writing one notification >>>>>>> >>>>>> service? >>> >>>> 2. I know from the developer guide that by using the annotation >>>>>>> >>>>>> @Shutdown >>>>>> >>>>>>> in a custom processor, the method could be called when NiFi is >>>>>>> >>>>>> successfully >>>>>> >>>>>>> shut down. 
The problem with this approach is the method could >>>>>>>not be guaranteed to be called when NiFi dies unexpectedly. >>>>>>> >>>>>>> Does anyone know what is the correct way to implement it? Thanks. >>>>>>> >>>>>>> Regards, >>>>>>> Ben >>>>>>> >>>>>> >>>>> >
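[Editor's note] For reference, the nifi.properties setting Mark couldn't recall from his phone is the FlowFile repository implementation property. The name below matches NiFi 1.x; verify it against the Admin Guide for your version:

```properties
# nifi.properties: swap the default write-ahead FlowFile repository for the
# volatile (in-memory) one, so queued FlowFiles do not survive a restart.
# The default is org.apache.nifi.controller.repository.WriteAheadFlowFileRepository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.VolatileFlowFileRepository
```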
Re: clear all flowfiles in all queues upon NiFi restart
Hi Mark, forgot to ask about VolatileFlowFileRepository you mentioned, if I switch to use VolatileFlowFileRepository, will NiFi swap out all the other FlowFiles to disk if a queue is already full? Is it just simply keeping all FlowFiles in memory? Regards, Ben 2018-01-12 12:07 GMT+08:00 尹文才 <batman...@gmail.com>: > Thanks Mark, my case is that I'm using NiFi to do some ETL work and it's > possible that NiFi dies unexpectedly due to lack of system resources. After > NiFi restarts itself, > I will re-extract all the data from database and re-perform all the > operations, so I need to clear all possible FlowFiles that might exist in > any queue. > > Regards, > Ben > > 2018-01-12 11:49 GMT+08:00 Mark Payne <marka...@hotmail.com>: > >> Ben, >> >> I have to admit - that’s kind of an odd request :) I’m curious what the >> use case is, if you can share? >> >> Regardless, the easiest way would be to update nifi.properties so that >> the FlowFile repo that is used is the VolatileFlowFileRepository. This >> would avoid writing the FlowFile state to disk, so ok restart you will lose >> all FlowFiles. The content will still be present, but nifi will delete it >> all on startup because there is no FlowFile associated with it. >> >> I’m on my phone right now so can’t easily tell you the exact name of the >> property to change but you’ll probably find it pretty quickly. The Admin >> Guide may well explain the different repositories as well. >> >> Thanks >> -Mark >> >> Sent from my iPhone >> >> > On Jan 11, 2018, at 10:31 PM, 尹文才 <batman...@gmail.com> wrote: >> > >> > Hi guys, I'm trying to clear all FlowFIles in all queues when NiFi is >> > restarted, but I don't know the correct way to do this. I checked all >> > NiFi's guide documentation, >> > it seems there're 2 possible solutions: >> > 1. 
write a custom notification service: a notification service could be >> > notified when NiFi is restarted and then inside the service, delete all >> the >> > files inside content_repository, flowfile_repository and >> > provenance_repository. >> > I know there're now 2 existing services: email and http. But I'm not >> > quite sure how to correctly write one and deploy it into my NiFi >> > environment, is there a tutorial on writing one notification service? >> > >> > 2. I know from the developer guide that by using the annotation >> @Shutdown >> > in a custom processor, the method could be called when NiFi is >> successfully >> > shut down. The problem with this approach is the method could >> > not be guaranteed to be called when NiFi dies unexpectedly. >> > >> > Does anyone know what is the correct way to implement it? Thanks. >> > >> > Regards, >> > Ben >> > >
clear all flowfiles in all queues upon NiFi restart
Hi guys, I'm trying to clear all FlowFiles in all queues when NiFi is restarted, but I don't know the correct way to do this. I checked all of NiFi's guide documentation, and it seems there are 2 possible solutions: 1. Write a custom notification service: a notification service could be notified when NiFi is restarted, and then inside the service, delete all the files inside content_repository, flowfile_repository and provenance_repository. I know there are currently 2 existing services: email and http. But I'm not quite sure how to correctly write one and deploy it into my NiFi environment; is there a tutorial on writing a notification service? 2. I know from the developer guide that by using the annotation @Shutdown in a custom processor, the method can be called when NiFi is successfully shut down. The problem with this approach is that the method is not guaranteed to be called when NiFi dies unexpectedly. Does anyone know the correct way to implement this? Thanks. Regards, Ben
Re: clear all flowfiles in all queues upon NiFi restart
Thanks Mark, my case is that I'm using NiFi to do some ETL work and it's possible that NiFi dies unexpectedly due to lack of system resources. After NiFi restarts itself, I will re-extract all the data from database and re-perform all the operations, so I need to clear all possible FlowFiles that might exist in any queue. Regards, Ben 2018-01-12 11:49 GMT+08:00 Mark Payne <marka...@hotmail.com>: > Ben, > > I have to admit - that’s kind of an odd request :) I’m curious what the > use case is, if you can share? > > Regardless, the easiest way would be to update nifi.properties so that the > FlowFile repo that is used is the VolatileFlowFileRepository. This would > avoid writing the FlowFile state to disk, so ok restart you will lose all > FlowFiles. The content will still be present, but nifi will delete it all > on startup because there is no FlowFile associated with it. > > I’m on my phone right now so can’t easily tell you the exact name of the > property to change but you’ll probably find it pretty quickly. The Admin > Guide may well explain the different repositories as well. > > Thanks > -Mark > > Sent from my iPhone > > > On Jan 11, 2018, at 10:31 PM, 尹文才 <batman...@gmail.com> wrote: > > > > Hi guys, I'm trying to clear all FlowFIles in all queues when NiFi is > > restarted, but I don't know the correct way to do this. I checked all > > NiFi's guide documentation, > > it seems there're 2 possible solutions: > > 1. write a custom notification service: a notification service could be > > notified when NiFi is restarted and then inside the service, delete all > the > > files inside content_repository, flowfile_repository and > > provenance_repository. > > I know there're now 2 existing services: email and http. But I'm not > > quite sure how to correctly write one and deploy it into my NiFi > > environment, is there a tutorial on writing one notification service? > > > > 2. 
I know from the developer guide that by using the annotation @Shutdown > > in a custom processor, the method could be called when NiFi is > successfully > > shut down. The problem with this approach is the method could > > not be guaranteed to be called when NiFi dies unexpectedly. > > > > Does anyone know what is the correct way to implement it? Thanks. > > > > Regards, > > Ben >
Re: clear all flowfiles in all queues upon NiFi restart
Hi Andrew, sorry I didn't follow your idea, could you please elaborate with more details? What I want to do is to be able to clear all the FlowFiles when NiFi dies unexpectedly and restarts itself. Regards, Ben 2018-01-12 12:44 GMT+08:00 Andrew Grande <apere...@gmail.com>: > Perhaps you could delete the repository directories when you need to > restart with no data? > > On Thu, Jan 11, 2018, 9:16 PM 尹文才 <batman...@gmail.com> wrote: > > > Hi Mark, forgot to ask about VolatileFlowFileRepository you mentioned, > if I > > switch to use VolatileFlowFileRepository, will NiFi swap out all the > other > > FlowFiles to disk if a queue is already full? > > Is it just simply keeping all FlowFiles in memory? > > > > Regards, > > Ben > > > > 2018-01-12 12:07 GMT+08:00 尹文才 <batman...@gmail.com>: > > > > > Thanks Mark, my case is that I'm using NiFi to do some ETL work and > it's > > > possible that NiFi dies unexpectedly due to lack of system resources. > > After > > > NiFi restarts itself, > > > I will re-extract all the data from database and re-perform all the > > > operations, so I need to clear all possible FlowFiles that might exist > in > > > any queue. > > > > > > Regards, > > > Ben > > > > > > 2018-01-12 11:49 GMT+08:00 Mark Payne <marka...@hotmail.com>: > > > > > >> Ben, > > >> > > >> I have to admit - that’s kind of an odd request :) I’m curious what > the > > >> use case is, if you can share? > > >> > > >> Regardless, the easiest way would be to update nifi.properties so that > > >> the FlowFile repo that is used is the VolatileFlowFileRepository. This > > >> would avoid writing the FlowFile state to disk, so ok restart you will > > lose > > >> all FlowFiles. The content will still be present, but nifi will delete > > it > > >> all on startup because there is no FlowFile associated with it. > > >> > > >> I’m on my phone right now so can’t easily tell you the exact name of > the > > >> property to change but you’ll probably find it pretty quickly. 
The > Admin > > >> Guide may well explain the different repositories as well. > > >> > > >> Thanks > > >> -Mark > > >> > > >> Sent from my iPhone > > >> > > >> > On Jan 11, 2018, at 10:31 PM, 尹文才 <batman...@gmail.com> wrote: > > >> > > > >> > Hi guys, I'm trying to clear all FlowFIles in all queues when NiFi > is > > >> > restarted, but I don't know the correct way to do this. I checked > all > > >> > NiFi's guide documentation, > > >> > it seems there're 2 possible solutions: > > >> > 1. write a custom notification service: a notification service could > > be > > >> > notified when NiFi is restarted and then inside the service, delete > > all > > >> the > > >> > files inside content_repository, flowfile_repository and > > >> > provenance_repository. > > >> > I know there're now 2 existing services: email and http. But I'm > not > > >> > quite sure how to correctly write one and deploy it into my NiFi > > >> > environment, is there a tutorial on writing one notification > service? > > >> > > > >> > 2. I know from the developer guide that by using the annotation > > >> @Shutdown > > >> > in a custom processor, the method could be called when NiFi is > > >> successfully > > >> > shut down. The problem with this approach is the method could > > >> > not be guaranteed to be called when NiFi dies unexpectedly. > > >> > > > >> > Does anyone know what is the correct way to implement it? Thanks. > > >> > > > >> > Regards, > > >> > Ben > > >> > > > > > > > > >
Re: The rate of the dataflow is exceeding the provenance recording rate. Slowing down flow to accommodate
Thanks Koji, I have already updated the logback configuration to produce more verbose logs. I was trying to reply to you with the verbose nifi logs but since I switched to use the WriteAheadProvenanceRepository implementation, up till now I haven't seen the error again. I will continue to check when the error might occur and post the logs here if needed. Once again thanks very much for your help. Regards, Ben 2017-12-25 15:37 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > Hi Ben, > > You can make NiFi log more verbose by editing: > NIFI_HOME/conf/logback.xml > > For example, adding following entry will reveal how NiFi repositories run: > > > > > > > Thanks, > Koji > > On Mon, Dec 25, 2017 at 4:30 PM, 尹文才 <batman...@gmail.com> wrote: > > Hi Koji, I also didn't find anything related to the unexpected shutdown > in > > my logs, is there anything I could do to make NIFI log more verbose > > information to the logs? > > > > Regards, > > Ben > > > > 2017-12-25 14:56 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > > > >> Hi Ben, > >> > >> I looked at the log and I expected to see some indication for the > >> cause of shutdown, but couldn't find any. > >> The PersistentProvenanceRepository rate warning is just a warning, and > >> it shouldn't be the trigger of an unexpected shutdown. I suspect other > >> reasons such as OOM killer, but I can't do any further investigation > >> with only these logs. > >> > >> Thanks, > >> Koji > >> > >> On Mon, Dec 25, 2017 at 3:46 PM, 尹文才 <batman...@gmail.com> wrote: > >> > Hi Koji, one more thing, do you have any idea why my first issue > leads to > >> > the unexpected shutdown of NIFI? according to the words, it will just > >> slow > >> > down the flow. thanks. > >> > > >> > Regards, > >> > Ben > >> > > >> > 2017-12-25 14:31 GMT+08:00 尹文才 <batman...@gmail.com>: > >> > > >> >> Hi Koji, thanks for your help, for the first issue, I will switch to > use > >> >> the WriteAheadProvenanceReopsitory implementation. 
> >> >> > >> >> For the second issue, I have uploaded the relevant part of my log > file > >> >> onto my google drive, the link is: > >> >> https://drive.google.com/open?id=1oxAkSUyYZFy6IWZSeWqHI8e9Utnw1XAj > >> >> > >> >> You mean a custom processor could possibly process a flowfile twice > only > >> >> when it's trying to commit the session but it's interrupted so the > >> flowfile > >> >> still remains inside the original queue(like NIFI went down)? > >> >> > >> >> If you need to see the full log file, please let me know, thanks. > >> >> > >> >> Regards, > >> >> Ben > >> >> > >> >> 2017-12-25 13:51 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > >> >> > >> >>> Hi Ben, > >> >>> > >> >>> For your 2nd issue, NiFi commits a process session in Processor > >> >>> onTrigger when it's executed by NiFi flow engine by calling > >> >>> session.commit(). > >> >>> https://github.com/apache/nifi/blob/master/nifi-api/src/main > >> >>> /java/org/apache/nifi/processor/AbstractProcessor.java#L28 > >> >>> Once a process session is committed, the FlowFile state (including > >> >>> which queue it is in) is persisted to disk. > >> >>> > >> >>> It's possible for a Processor to process the same FlowFile more than > >> >>> once, if it has done its job, but failed to commit the session. > >> >>> For example, if your custom processor created a temp table from a > >> >>> FlowFile. Then before the process session is committed, something > >> >>> happened and NiFi process session was rollback. In this case, the > >> >>> target database is already updated (the temp table is created), but > >> >>> NiFi FlowFile stays in the incoming queue. If the FlowFile is > >> >>> processed again, the processor will get an error indicating the > table > >> >>> already exists. > >> >>> > >> >>> I tried to look at the logs you attached, but attachments do not > seem > >> >>> to be delivered to this ML. I don't see anything attach
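[Editor's note] The XML entries in Koji's quoted logback advice above were stripped by the mail archive. A hypothetical reconstruction of the kind of entry he likely meant — the logger names and level here are assumptions, not recovered from the original message:

```xml
<!-- conf/logback.xml: example verbose logger entries; names and levels
     are guesses standing in for the stripped XML -->
<logger name="org.apache.nifi.controller.repository" level="DEBUG"/>
<logger name="org.apache.nifi.provenance" level="DEBUG"/>
```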
proper way in nifi to sync status between custom processors
Hi guys, I'm currently trying to find a proper way in NiFi to sync status between my custom processors. Our requirement is like this: we're doing some ETL work using NiFi, and I'm extracting the data from a DB into batches of FlowFiles (each batch of FlowFiles has a flag FlowFile indicating the end of the batch). There are some groups of custom processors downstream that need to process these FlowFiles to do some business logic work, and we expect these processors to process one batch of FlowFiles at a time. Therefore we need to implement a custom Wait processor (let's call it WaitBatch here) to hold all the other batches of FlowFiles while the business processors handle the batch of FlowFiles whose creation time is earlier.

In order to implement this, all the WaitBatch processors placed in the flow need to read/update records in a shared map so that each set of business-logic processors processes one batch at a time. The entries are keyed by the batch number of the FlowFiles, and the value of each entry is a batch release counter that counts the number of times the batch of FlowFiles has passed through a WaitBatch processor. When a batch is released by a WaitBatch, it tries to increment the batch number entry's value by 1, and the released batch number and counter value are also saved locally at the WaitBatch with the StateManager; when the next batch reaches the WaitBatch, it checks whether the counter value of the previously released batch number in the shared map is greater than the one saved locally. If the entry for that batch number doesn't exist (already removed) or the value in the shared map is greater, the next batch is released, and the local state and the entry in the shared map are updated similarly. At the end of the flow, a custom processor gets the batch number from each batch and removes the entry from the shared map.

So this implementation requires a shared map that can be read/updated frequently and atomically.
I checked the Wait/Notify processors in NiFi and saw that they use the DistributedMapCacheClientService and DistributedMapCacheServer to sync status, so I'm wondering if I could use the DistributedMapCacheClientService to implement my logic. I also saw another implementation called RedisDistributedMapCacheClientService, which seems to require Redis (I haven't used Redis). Thanks in advance for any suggestions. Regards, Ben
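[Editor's note] The shared-map protocol described above can be sketched with a plain ConcurrentHashMap standing in for the distributed cache; in a real NiFi cluster the same read-modify-write would need an atomic cache client to stay safe across nodes. All names here (BatchGate, release, mayProceed) are hypothetical, not NiFi APIs:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the WaitBatch bookkeeping; a ConcurrentHashMap
// stands in for the shared distributed map cache.
public class BatchGate {
    private final ConcurrentMap<String, Integer> shared = new ConcurrentHashMap<>();

    // A WaitBatch releases a batch: atomically bump its pass-through counter.
    public int release(String batchNo) {
        return shared.merge(batchNo, 1, Integer::sum);
    }

    // The next batch may proceed if the previous batch's entry was already
    // removed at the end of the flow, or its shared counter has advanced
    // past the value this WaitBatch saved locally via the StateManager.
    public boolean mayProceed(String prevBatchNo, int locallySavedCounter) {
        Integer counter = shared.get(prevBatchNo);
        return counter == null || counter > locallySavedCounter;
    }

    // The terminal processor removes the finished batch's entry.
    public void remove(String batchNo) {
        shared.remove(batchNo);
    }
}
```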
Re: proper way in nifi to sync status between custom processors
Thanks for your quick response, Koji. I hadn't heard of or seen anything about the NiFi Record data model when I was reading the NiFi documentation; could you tell me where this model is documented? Thanks. By the way, to my knowledge, when you need to use the DistributedMapCacheServer from DistributedMapCacheClientService, you need to specify the host URL for the server. This means that inside a NiFi cluster, if I specify the cache server and that node suddenly goes down, I can't use it until the node comes back up, right? Is there currently a cache server in NiFi that supports HA? Thanks. Regards, Ben 2017-12-26 18:34 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > Hi Ben, > > As you found from existing code, DistributedMapCache is used to share > state among different processors, and it can be used by your custom > processors, too. > However, I'd recommend to avoid such tight dependencies between > FlowFiles if possible, or minimize the part in flow that requires that > constraint at least for better performance and simplicity. > For example, since a FlowFile can hold fairly large amount of data, > you could merge all FlowFiles in a single FlowFile, instead of batches > of FlowFiles. If you need logical boundaries, you can use NiFi Record > data model to embed multiple records within a FlowFile, Record should > perform better. > > Hope this helps. > > Thanks, > Koji > > > On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman...@gmail.com> wrote: > > Hi guys, I'm currently trying to find a proper way in nifi which could > sync > > status between my custom processors. > > our requirement is like this, we're doing some ETL work using nifi and > I'm > > extracting the data from DB into batches of FlowFiles(each batch of > > FlowFile has a flag FlowFile indicating the end of the batch). > > There're some groups of custom processors downstream that need to process > > these FlowFiles to do some business logic work.
And we expect these > > processors to process one batch of FlowFiles at a time. > > Therefore we need to implement a custom Wait processor(let's just call it > > WaitBatch here) to hold all the other batches of FlowFiles while the > > business processors were handling the batch of FlowFiles whose creation > > time is earlier. > > > > In order to implement this, all the WaitBatch processors placed in the > flow > > need to read/update records in a shared map so that each set of > > business-logic processors process one batch at a time. > > The entries are keyed using the batch number of the FlowFiles and the > value > > of each entry is a batch release counter number which counts the number > of > > times the batch of FlowFiles has passed through > > a WaitBatch processor. > > When a batch is released by WaitBatch, it will try to increment the batch > > number entry's value by 1 and then the released batch number and counter > > number will also be saved locally at the WaitBatch with StateManager; > > when the next batch reaches the WaitBatch, it will check if the counter > > value of the previous released batch number in the shared map is greater > > than the one saved locally, if the entry for the batch number does't > > exist(already removed) or the value in the shared map is greater, the > next > > batch will be released and the local state and the entry on the shared > map > > will be updated similarly. > > In the end of the flow, a custom processor will get the batch number from > > each batch and remove the entry from the shared map . > > > > So this implementation requires a shared map that could read/update > > frequently and atomically. I checked the Wait/Notify processors in NIFI > and > > saw it is using the DistributedMapCacheClientService and > > DistributedMapCacheServer to sync status, so I'm wondering if I could use > > the DistributedMapCacheClientService to implement my logic. 
I also saw > > another implementation called RedisDistributedMapCacheClientService > > which seems to require Redis(I haven't used Redis). Thanks in advance > for > > any suggestions. > > > > Regards, > > Ben >
Re: proper way in nifi to sync status between custom processors
Hi Koji, no problem. You could check the code of the WaitBatch processor at the link: https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ I also uploaded a snapshot of part of the NiFi flow which includes ExecuteSqlCommand and WaitBatch; you could check the picture at the link: https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view You mentioned above that a FlowFile repository checkpointing failure can cause other processors to process the same FlowFile again, but as you can see from my snapshot image, ExecuteSqlCommand is the second processor and comes before the WaitBatch processor; even if the FlowFile repository checkpointing failure is caused by WaitBatch, could it lead the processors before it to process a FlowFile multiple times? Thanks. Regards, Ben 2017-12-27 12:36 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > Hi Ben, > > I was referring these two log messages in your previous email. > These two messages are both written by ExecuteSqlCommand, it does not > mean 'it was executed again'. > > ``` > 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1] > c.z.nifi.processors.ExecuteSqlCommand > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM > dbo.ods_extractDataDebug; > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column > _id; > > and it was executed again later: > > 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1] > c.z.nifi.processors.ExecuteSqlCommand > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] > Failed to execute SQL statement: SELECT > ``` > > As you written, the case where FlowFile repository fails checkpointing > will affect other processors to process same FlowFiles again. However > there won't be a simple solution to every processor to rollback its > job as different processors do different things. Creating a temp table > if not exist seems right approach to me.
> > At the same time, the root cause of the FlowFile repository > failures should be investigated. Is it possible to share the WaitBatch code? > The reason I ask is that all the 'FlowFile Repository failed to update' > errors are related to the WaitBatch processor in the log that you shared earlier. > > Thanks, > Koji > > On Wed, Dec 27, 2017 at 1:19 PM, 尹文才 <batman...@gmail.com> wrote: > > Hi Koji, I will print the sql before actually executing it, but I checked > > the error log line you mentioned in your reply, this error was thrown by > > NiFi from within another processor called WaitBatch. > > I didn't find similar errors as the one from the ExecuteSqlCommand > > processor, I think it's because only the ExecuteSqlCommand is used to > > create temp database tables. > > You could check my ExecuteSqlCommand code via the link: > > https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P > > > > If the error is really caused by FlowFile repository checkpoint failure > and > > the flowfile was executed twice, I may have to create the temp table only > > if it doesn't exist; the reason I didn't fix this bug in this way > > right away is that I was afraid this fix could mask some other problems. > > > > Thanks. > > > > Regards, > > Ben > > > > 2017-12-27 11:38 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > > > >> Hi Ben, > >> > >> The following two log messages are very close in terms of written > >> timestamp, but have different log level. > >> 2017-12-26 07:00:01,312 INFO > >> 2017-12-26 07:00:01,315 ERROR > >> > >> I guess those are logged within a single onTrigger of your > >> ExecuteSqlCommand custom processor, one is before executing, the other > >> is when it caught an exception. Just guessing as I don't have access > >> to the code. > >> > >> Does the same issue happen with other processors bundled with Apache > >> NiFi without your custom processor running?
> >> > >> If NiFi fails to update/checkpoint FlowFile repository, then the same > >> FlowFile can be processed again after restarting NiFi. > >> > >> Thanks, > >> Koji > >> > >> > >> > >> On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman...@gmail.com> wrote: > >> > Thanks Koji, I will look into this article about the record model. > >> > > >> > By the way, that error I previously mentioned to you occurred again, I > >> > could see the sql query was executed twice in the log, this time I had > >> > turned on the verbose NiFi logging, the sql query is as below: > >> > > &
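[Editor's note] One way to make the temp-table creation idempotent, as Ben considers above, is to guard the SELECT INTO with an OBJECT_ID existence check (T-SQL, matching the SQL Server-style statements in the log). The helper below only assembles the guarded statement; the class and method names are illustrative:

```java
// Illustrative helper: wrap a "SELECT TOP 0 * INTO ..." in a T-SQL
// existence guard so re-running it after a replayed FlowFile is harmless.
public class IdempotentDdl {
    public static String guardedSelectInto(String tempTable, String sourceTable) {
        return "IF OBJECT_ID(N'" + tempTable + "', N'U') IS NULL\n"
             + "    SELECT TOP 0 * INTO " + tempTable + " FROM " + sourceTable + ";";
    }
}
```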
Re: proper way in nifi to sync status between custom processors
Thanks Koji. For the ExecuteSqlCommand issue, I was trying to re-execute the SQL query if the connection is lost (the connection could be unstable); my idea is to transfer the FlowFile to the success relationship only after successfully executing the SQL query. You can see the do-while loop in the code: the transaction is rolled back if the execution fails, and if the connection is lost, it retries the SQL. Will this logic cause my SQL to be executed twice? For the WaitBatch processor, I will take your approach and test it individually to see if the WaitBatch processor could cause the FlowFile repository checkpointing failure. Regards, Ben 2017-12-27 16:10 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > Hi Ben, > > Excuse me, I'm trying, but probably I don't fully understand what you > want to achieve with the flow. > > It looks weird that WaitBatch is failing with such FlowFile repository > error, while other processor such as ReplaceText succeeds. > I recommend to test WaitBatch alone first without combining the > database related processors, by feeding a test FlowFile having > expected FlowFile attributes. > Such input FlowFiles can be created by GenerateFlowFile processor. > If the same error happens with only WaitBatch processor, then it > should be easier to debug. > > Thanks, > Koji > > On Wed, Dec 27, 2017 at 4:49 PM, Koji Kawamura <ijokaruma...@gmail.com> > wrote: > > Hi Ben, > > > > The one thing that looks strange in the screenshot is the > > ExecuteSqlCommand having FlowFiles queued in its incoming connection. > > Those should be transferred to 'failure' relationship. > > > > Following executeSql() method, shouldn't it re-throw the caught > exception?
> >
> > try (Connection con = dbcpService.getConnection()) {
> >     logger.debug("setting autoCommit to false");
> >     con.setAutoCommit(false);
> >
> >     try (Statement stmt = con.createStatement()) {
> >         logger.info("executing sql statement: {}", new Object[]{sql});
> >         stmt.execute(sql);
> >
> >         // all sql statements run within a single transaction
> >         logger.debug("committing transaction");
> >         con.commit();
> >     } catch (Exception ex) {
> >         logger.error("failed to execute sql statement: {}", new Object[]{sql, ex});
> >         con.rollback();
> >         // rethrow the exception to the outer layer
> >         throw ex;
> >     } finally {
> >         logger.debug("resetting autoCommit to true");
> >         con.setAutoCommit(true);
> >     }
> > } catch (Exception ex) {
> >     // HERE, the exception is swallowed, that's why the FlowFiles stay in
> >     // the incoming connection.
> >     logger.error("will retry executing sql statement: {}", new Object[]{sql, ex});
> >     retryOnFail = true;
> > }
> >
> > Thanks,
> > Koji
> >
> > On Wed, Dec 27, 2017 at 2:38 PM, 尹文才 <batman...@gmail.com> wrote:
> >> Hi Koji, no problem. You could check the code of the processor WaitBatch at the link: https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ
> >>
> >> I also uploaded a snapshot of part of the NiFi flow which includes the ExecuteSqlCommand and WaitBatch; you could check the picture at the link: https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view
> >>
> >> You mentioned above that a FlowFile repository checkpointing failure will cause other processors to process the same FlowFile again, but as you could see from my snapshot image, the ExecuteSqlCommand is the second processor and comes before the WaitBatch processor. Even if the FlowFile repository checkpointing failure is caused by WaitBatch, could it lead the processors before it to process a FlowFile multiple times? Thanks.
> >>
> >> Regards,
> >> Ben
> >>
> >> 2017-12-27 12:36 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >>
> >>> Hi Ben,
> >>>
> >>> I was referring to these two log messages in your previous email.
> >>> These two messages are both written by ExecuteSqlCommand, it does not > >>> mean 'it was executed again'. > >>> > >>> ``` > >>> 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1] > >>> c.z.nifi.processors.ExecuteSqlCommand > >>>
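[Editorial sketch] The retry logic discussed in this thread — re-execute on failure, but don't swallow the exception forever — can be sketched independently of JDBC. In this illustrative stand-in (all names hypothetical, not the actual processor code), the Callable represents the execute-and-commit work, and the last exception is rethrown once the retry budget is exhausted so onTrigger() can route the FlowFile to 'failure':

```java
import java.util.concurrent.Callable;

public class RetrySketch {

    // Hypothetical stand-in for the executeSql() flow quoted above: the
    // Callable represents stmt.execute(sql) + con.commit(). The last
    // failure is rethrown once retries are exhausted, instead of being
    // swallowed, so the caller can route the FlowFile to 'failure'.
    // Assumes maxAttempts >= 1.
    static void runWithRetry(Callable<Void> sqlWork, int maxAttempts) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sqlWork.call();  // execute + commit
                return;          // success: transfer the FlowFile to 'success'
            } catch (Exception ex) {
                last = ex;       // the real code would con.rollback() here
            }
        }
        throw last;              // retries exhausted: caller routes to 'failure'
    }
}
```

With a bounded retry like this, a persistently failing statement eventually surfaces as an error instead of leaving FlowFiles queued on the incoming connection.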
Re: proper way in nifi to sync status between custom processors
Thanks Koji, I will look into this article about the record model. By the way, that error I previously mentioned to you occurred again; I could see the sql query was executed twice in the log. This time I had turned on the verbose NiFi logging, and the sql query is as below: 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1] c.z.nifi.processors.ExecuteSqlCommand ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] executing sql statement: SELECT TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM dbo.ods_extractDataDebug; alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id; and it was executed again later: 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1] c.z.nifi.processors.ExecuteSqlCommand ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] failed to execute sql statement: SELECT TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM dbo.ods_extractDataDebug; alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;: com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object named 'ods_extractDataDebug_20171226031801926_9195' in the database. com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object named 'ods_extractDataDebug_20171226031801926_9195' in the database. at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217) at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655) at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885) at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778) at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505) at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445) at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191) at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166) at 
com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751) at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264) at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264) at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194) at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164) at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119) at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147) at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) I also saw a lot of NiFi's exception like "ProcessException: FlowFile Repository failed to update", not sure if this is the reason the FlowFile got processed twice. Could you help to take a look at my log file? Thanks. 
You could get the log file via the link: https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view Best Regards, Ben 2017-12-27 10:00 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > Hi Ben, > > This blog post written by Mark would be a good starting point to get > familiar with the NiFi Record model. > https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi > > HA for the DistributedMapCacheClientService and DistributedMapCacheServer > pair is not supported at the moment. If you need high availability, > RedisDistributedMapCacheClientService with Redis replication will > provide that, though I haven't tried that myself. > https://redis.io/topics/replication > > Thanks, > Koji > > On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <batman...@gmail.com> wrote: > > Thanks for your quick response, Koji. I hadn't heard or seen anything > > about the NiFi record data model when I was reading the NiFi > > documentation; could you tell me where this model is documented? Thanks. > > > > By the way, to my knowledge, when you need to use the > DistributedMapCacheServer > > from DistributedMapCacheClientService, you need to specify the hos
Re: proper way in nifi to sync status between custom processors
Hi Koji, I will print the sql before actually executing it, but I checked the error log line you mentioned in your reply, and this error was thrown by NiFi from within another processor called WaitBatch. I didn't find errors similar to the one from the ExecuteSqlCommand processor; I think it's because only the ExecuteSqlCommand is used to create temp database tables. You could check my ExecuteSqlCommand code via the link: https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P If the error is really caused by a FlowFile repository checkpoint failure and the flowfile was executed twice, I may have to create the temp table only if it doesn't exist. The reason I didn't fix the bug this way right away is that I was afraid the fix could mask some other problem. Thanks. Regards, Ben 2017-12-27 11:38 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > Hi Ben, > > The following two log messages are very close in terms of written > timestamp, but have different log levels. > 2017-12-26 07:00:01,312 INFO > 2017-12-26 07:00:01,315 ERROR > > I guess those are logged within a single onTrigger of your > ExecuteSqlCommand custom processor: one is before executing, the other > is when it caught an exception. Just guessing, as I don't have access > to the code. > > Does the same issue happen with other processors bundled with Apache > NiFi, without your custom processor running? > > If NiFi fails to update/checkpoint the FlowFile repository, then the same > FlowFile can be processed again after restarting NiFi. > > Thanks, > Koji > > > > On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman...@gmail.com> wrote: > > Thanks Koji, I will look into this article about the record model. 
> > > > By the way, that error I previously mentioned to you occurred again, I > > could see the sql query was executed twice in the log, this time I had > > turned on the verbose NiFi logging, the sql query is as below: > > > > 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1] > > c.z.nifi.processors.ExecuteSqlCommand > > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] executing sql statement: > SELECT > > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM > > dbo.ods_extractDataDebug; > > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column > _id; > > > > and it was executed again later: > > > > 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1] > > c.z.nifi.processors.ExecuteSqlCommand > > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] > failed to execute sql statement: SELECT > > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM > > dbo.ods_extractDataDebug; > > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column > > _id;: com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object named > > 'ods_extractDataDebug_20171226031801926_9195' in the database. > > com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object named > > 'ods_extractDataDebug_20171226031801926_9195' in the database. > > at > > com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError( SQLServerException.java:217) > > at > > com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult( SQLServerStatement.java:1655) > > at > > com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement( SQLServerStatement.java:885) > > at > > com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute( SQLServerStatement.java:778) > > at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505) > > at > > com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand( SQLServerConnection.java:2445) > > at > > com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand( SQLServerStatement.java:191) > > at > > 
com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement( > SQLServerStatement.java:166) > > at > > com.microsoft.sqlserver.jdbc.SQLServerStatement.execute( > SQLServerStatement.java:751) > > at > > org.apache.commons.dbcp.DelegatingStatement.execute( > DelegatingStatement.java:264) > > at > > org.apache.commons.dbcp.DelegatingStatement.execute( > DelegatingStatement.java:264) > > at > > com.zjrealtech.nifi.processors.ExecuteSqlCommand. > executeSql(ExecuteSqlCommand.java:194) > > at > > com.zjrealtech.nifi.processors.ExecuteSqlCommand. > onTrigger(ExecuteSqlCommand.java:164) > > at > > org.apache.nifi.processor.AbstractProcessor.onTrigger( > AbstractProcessor.java:27) > > at > > org.apache.nifi.controller.StandardProcessorNode.onTrigger( > StandardProcessorNode.java:1119) > > at > > org.apache.nifi.controller.
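[Editorial sketch] The "create the temp table only if it doesn't exist" idea mentioned above can be expressed on SQL Server with an OBJECT_ID guard. A hypothetical helper (not the actual processor code; table names are illustrative) that builds such a guarded statement:

```java
public class GuardedDdl {

    // Hypothetical helper: wraps the SELECT ... INTO in a T-SQL OBJECT_ID
    // existence check ('U' = user table) so that a re-delivered FlowFile
    // does not fail with "There is already an object named ... in the
    // database".
    static String guardedSelectInto(String tmpTable, String srcTable) {
        return "IF OBJECT_ID(N'" + tmpTable + "', N'U') IS NULL\n"
             + "    SELECT TOP 0 * INTO " + tmpTable + " FROM " + srcTable + ";";
    }
}
```

As Ben notes, this trades visibility for idempotence: a re-delivered FlowFile no longer fails loudly, so the underlying duplicate processing could go unnoticed.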
Re: proper way in nifi to sync status between custom processors
Hi Koji, I saw it was only showing the 1000 events, so I couldn't see the event when the FlowFile was created. Regards, Ben 2017-12-27 17:21 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > I see, thanks. The easiest way to look at provenance events would be > by right clicking a processor instance you are interested in, then > select the 'View data provenance' context menu. This way, NiFi displays > provenance events for the selected processor. > > Koji > > On Wed, Dec 27, 2017 at 6:17 PM, 尹文才 <batman...@gmail.com> wrote: > > Hi Koji, sorry about the provenance exception, it was because there's no > > space left on the machine (it filled up with logs) > > > > Regards, > > Ben > > > > 2017-12-27 17:11 GMT+08:00 尹文才 <batman...@gmail.com>: > > > >> Hi Koji, thanks, the names of the temp tables are created with the format > >> "yyyyMMddHHmmssSSS-", where the first part indicates the time and the > second > >> part is a random number of length 4. > >> So I think it's not possible to have 2 duplicate table names; the only > >> possibility I can think of is that the flowfile is passed into the processor > twice. > >> > >> About the provenance, I had updated to use the > >> WriteAheadProvenanceRepository implementation, but when I tried to check > >> the data provenance, it showed me the following exception message: > >> HTTP ERROR 500 > >> > >> Problem accessing /nifi/provenance. Reason: > >> > >> Server Error > >> > >> Caused by: > >> > >> javax.servlet.ServletException: org.eclipse.jetty.servlet.ServletHolder$1: > java.lang.NullPointerException > >> at org.eclipse.jetty.server.handler.HandlerCollection. > handle(HandlerCollection.java:138) > >> at org.eclipse.jetty.server.handler.gzip.GzipHandler. 
> handle(GzipHandler.java:561) > >> at org.eclipse.jetty.server.handler.HandlerWrapper.handle( > HandlerWrapper.java:132) > >> at org.eclipse.jetty.server.Server.handle(Server.java:564) > >> at org.eclipse.jetty.server.HttpChannel.handle( > HttpChannel.java:320) > >> at org.eclipse.jetty.server.HttpConnection.onFillable( > HttpConnection.java:251) > >> at org.eclipse.jetty.io.AbstractConnection$ > ReadCallback.succeeded(AbstractConnection.java:279) > >> at org.eclipse.jetty.io.FillInterest.fillable( > FillInterest.java:110) > >> at org.eclipse.jetty.io.ssl.SslConnection.onFillable( > SslConnection.java:258) > >> at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded( > SslConnection.java:147) > >> at org.eclipse.jetty.io.FillInterest.fillable( > FillInterest.java:110) > >> at org.eclipse.jetty.io.ChannelEndPoint$2.run( > ChannelEndPoint.java:124) > >> at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob( > QueuedThreadPool.java:672) > >> at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run( > QueuedThreadPool.java:590) > >> at java.lang.Thread.run(Thread.java:745) > >> Caused by: org.eclipse.jetty.servlet.ServletHolder$1: > java.lang.NullPointerException > >> at org.eclipse.jetty.servlet.ServletHolder.makeUnavailable( > ServletHolder.java:596) > >> at org.eclipse.jetty.servlet.ServletHolder.initServlet( > ServletHolder.java:655) > >> at org.eclipse.jetty.servlet.ServletHolder.getServlet( > ServletHolder.java:498) > >> at org.eclipse.jetty.servlet.ServletHolder.ensureInstance( > ServletHolder.java:785) > >> at org.eclipse.jetty.servlet.ServletHolder.prepare( > ServletHolder.java:770) > >> at org.eclipse.jetty.servlet.ServletHandler.doHandle( > ServletHandler.java:538) > >> at org.eclipse.jetty.server.handler.ScopedHandler.handle( > ScopedHandler.java:143) > >> at org.eclipse.jetty.security.SecurityHandler.handle( > SecurityHandler.java:548) > >> at org.eclipse.jetty.server.handler.HandlerWrapper.handle( > HandlerWrapper.java:132) > >> at 
org.eclipse.jetty.server.handler.ScopedHandler. > nextHandle(ScopedHandler.java:190) > >> at org.eclipse.jetty.server.session.SessionHandler. > doHandle(SessionHandler.java:1593) > >> at org.eclipse.jetty.server.handler.ScopedHandler. > nextHandle(ScopedHandler.java:188) > >> at org.eclipse.jetty.server.handler.ContextHandler. > doHandle(ContextHandler.java:1239) > >> at org.eclipse.jetty.server.handler.ScopedHandler. > nextScope(ScopedHandler.java:168) > >> at org.eclipse.jetty.servlet.S
Re: proper way in nifi to sync status between custom processors
Hi Koji, sorry about the provenance exception, it was because there's no space left on the machine (it filled up with logs). Regards, Ben 2017-12-27 17:11 GMT+08:00 尹文才 <batman...@gmail.com>: > Hi Koji, thanks, the names of the temp tables are created with the format > "yyyyMMddHHmmssSSS-", where the first part indicates the time and the second > part is a random number of length 4. > So I think it's not possible to have 2 duplicate table names; the only > possibility I can think of is that the flowfile is passed into the processor twice. > > About the provenance, I had updated to use the > WriteAheadProvenanceRepository implementation, but when I tried to check > the data provenance, it showed me the following exception message: > HTTP ERROR 500 > > Problem accessing /nifi/provenance. Reason: > > Server Error > > Caused by: > > javax.servlet.ServletException: org.eclipse.jetty.servlet.ServletHolder$1: > java.lang.NullPointerException > at > org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:138) > at > org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561) > at > org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132) > at org.eclipse.jetty.server.Server.handle(Server.java:564) > at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320) > at > org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251) > at > org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279) > at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110) > at > org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258) > at > org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147) > at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110) > at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124) > at > org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672) > at > 
org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.eclipse.jetty.servlet.ServletHolder$1: > java.lang.NullPointerException > at > org.eclipse.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:596) > at > org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:655) > at > org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498) > at > org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785) > at > org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770) > at > org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538) > at > org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143) > at > org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548) > at > org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132) > at > org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190) > at > org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593) > at > org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188) > at > org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239) > at > org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168) > at > org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481) > at > org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562) > at > org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166) > at > org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141) > at > org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) > at > org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118) > at > 
org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561) > at > org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132) > at org.eclipse.jetty.server.Server.handle(Server.java:564) > at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320) > at > org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251) > at > org.eclipse.jetty.io.AbstractConnection$Rea
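[Editorial sketch] The temp-table naming scheme described in this thread — a millisecond timestamp plus a 4-digit random number, matching names like ods_extractDataDebug_20171226031801926_9195 in the logs — can be reconstructed roughly as follows (class and method names are hypothetical):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ThreadLocalRandom;

public class TempTableName {

    // Hypothetical reconstruction of the naming scheme described above:
    // base name + "_" + millisecond-precision timestamp + "_" + 4-digit
    // random number, e.g. ods_extractDataDebug_20171226031801926_9195.
    static String next(String base) {
        String ts = LocalDateTime.now()
                .format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"));
        int rand = ThreadLocalRandom.current().nextInt(1000, 10000);
        return base + "_" + ts + "_" + rand;
    }
}
```

A collision would require two executions in the same millisecond drawing the same random number, which supports Ben's reading that a duplicate name points to the FlowFile being processed twice rather than to a name clash.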
Re: proper way in nifi to sync status between custom processors
/html/user-guide.html#viewing-flowfile-lineage > > I didn't mention it earlier because you were having a Provenance > repository performance issue, but I hope you can use it now with the > WriteAheadProvenanceRepository. > > Thanks, > Koji > > On Wed, Dec 27, 2017 at 5:44 PM, 尹文才 <batman...@gmail.com> wrote: > > Thanks Koji, for the ExecuteSqlCommand issue, I was trying to re-execute > > the sql query if the connection is lost (the connection could be unstable); my > > idea is to only transfer the FlowFile to the success relationship > > after successfully executing the sql query. You could see the do-while > loop > > in the code: the transaction will be rolled back if the execution failed, and > if > > the connection is lost, it will retry executing the sql. > > Will this logic cause my sql to be executed twice? > > > > For the WaitBatch processor, I will take your approach and test it > individually > > to see if the WaitBatch processor could cause the FlowFile repository > > checkpointing failure. > > > > Regards, > > Ben > > > > 2017-12-27 16:10 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>: > > > >> Hi Ben, > >> > >> Excuse me, I'm trying, but probably I don't fully understand what you > >> want to achieve with the flow. > >> > >> It looks weird that WaitBatch is failing with such a FlowFile repository > >> error, while other processors such as ReplaceText succeed. > >> I recommend testing WaitBatch alone first, without combining the > >> database related processors, by feeding it a test FlowFile having the > >> expected FlowFile attributes. > >> Such input FlowFiles can be created by a GenerateFlowFile processor. > >> If the same error happens with only the WaitBatch processor, then it > >> should be easier to debug. 
> >> > >> Thanks, > >> Koji > >> > >> On Wed, Dec 27, 2017 at 4:49 PM, Koji Kawamura <ijokaruma...@gmail.com> > >> wrote: > >> > Hi Ben, > >> > > >> > The one thing that looks strange in the screenshot is the > >> > ExecuteSqlCommand having FlowFiles queued in its incoming connection. > >> > Those should be transferred to 'failure' relationship. > >> > > >> > Following executeSql() method, shouldn't it re-throw the caught > >> exception? > >> > > >> > > >> > try (Connection con = dbcpService.getConnection()) { > >> > logger.debug("setting autoCommit to false"); > >> > con.setAutoCommit(false); > >> > > >> > try (Statement stmt = con.createStatement()) { > >> > logger.info("executing sql statement: {}", new Object[]{sql}); > >> > stmt.execute(sql); > >> > > >> > // all sql statements run within a single transaction > >> > logger.debug("committing transaction"); > >> > con.commit(); > >> > } catch (Exception ex) { > >> > logger.error("failed to execute sql statement: {}", new Object[]{sql, > ex}); > >> > con.rollback(); > >> > // rethrow the exception to the outer layer > >> > throw ex; > >> > } finally { > >> > logger.debug("resetting autoCommit to true"); > >> > con.setAutoCommit(true); > >> > } > >> > } catch (Exception ex) { > >> > // HERE, the exception is swallowed, that's why the FlowFiles stay in > >> > the incoming connection. > >> > logger.error("will retry executing sql statement: {}", new Object[]{sql, ex}); > >> > retryOnFail = true; > >> > } > >> > > >> > Thanks, > >> > Koji > >> > > >> > On Wed, Dec 27, 2017 at 2:38 PM, 尹文才 <batman...@gmail.com> wrote: > >> >> Hi Koji, no problem. You could check the code of processor WaitBatch > at > >> the > >> >> link: > >> >> https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ > >> >> > >> >> I also uploaded a snapshot of part of NiFi flow which includes the > >> >> ExecuteSqlCommand and WaitBatch, you could check the picture at the > >> link: > >> >> https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnyd
get access token inside custom processor
Hi guys, I'm trying to invoke some nifi rest apis inside my custom processor. The nifi I'm using is nifi 1.4.0 and it's a 3-node secured cluster; the username and password are kept inside an LDAP server. I know that in a secured nifi cluster, in order to make any request I need the access token. My question is: how could I get the access token in my custom processor? Thanks. (I think the token should be available somewhere after a successful login, right?) Regards, Ben
Is there a configuration to limit the size of nifi's flowfile repository
Hi guys, I checked NiFi's system administrator guide trying to find a configuration item so that the size of the flowfile repository could be limited, similar to the other repositories (e.g. the content repository), but I didn't find such a configuration item. Is there currently any configuration to limit the flowfile repository's size? Thanks. Regards, Ben
Re: get access token inside custom processor
Thanks Bryan, I got it. I thought the processor would be running as the user that had logged into the NiFi web UI. Regards, Ben 2018-02-27 22:31 GMT+08:00 Bryan Bende <bbe...@gmail.com>: > Hello, > > Your custom processor would be the same as if you were writing an > external client program. > > You would need to provide the processor with a username and password > in the processor properties, and then it would need to make a call to > the token REST end-point. > > Processors don't run as the user from the web UI; they run on behalf > of the NiFi framework and have no idea which user started/stopped > them. > > Thanks, > > Bryan > > On Tue, Feb 27, 2018 at 1:27 AM, 尹文才 <batman...@gmail.com> wrote: > > Hi guys, I'm trying to invoke some nifi rest apis inside my custom > > processor. The nifi I'm using is nifi 1.4.0 and it's a 3-node secured > > cluster; the username and password are kept inside an LDAP server. > > I know that in a secured nifi cluster, in order to make any request I > need > > the access token. My question is how could I get the access token in my > > custom processor? Thanks. (I think the token should be > > available somewhere after a successful login, right?) > > > > Regards, > > Ben >
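[Editorial sketch] Bryan's "external client" approach can be sketched roughly like this: POST the username and password (supplied via processor properties) as a form-encoded body to NiFi's /access/token endpoint, then send the returned JWT on subsequent REST calls. This is an illustrative sketch only — the base URL is a placeholder, and the TLS/trust-store setup a secured cluster requires is omitted:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class NiFiToken {

    // Form-encodes the credentials for POST /access/token. The username and
    // password would come from processor properties, as Bryan describes.
    static String tokenRequestBody(String user, String pass) {
        try {
            return "username=" + URLEncoder.encode(user, "UTF-8")
                 + "&password=" + URLEncoder.encode(pass, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    // Hypothetical sketch: fetch a JWT from a secured NiFi node, e.g.
    // fetchToken("https://nifi-host:9443/nifi-api", user, pass).
    // Real code would also configure an SSLContext that trusts the cluster.
    static String fetchToken(String baseUrl, String user, String pass) throws IOException {
        HttpURLConnection con =
                (HttpURLConnection) new URL(baseUrl + "/access/token").openConnection();
        con.setRequestMethod("POST");
        con.setDoOutput(true);
        con.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
        try (OutputStream os = con.getOutputStream()) {
            os.write(tokenRequestBody(user, pass).getBytes(StandardCharsets.UTF_8));
        }
        try (Scanner s = new Scanner(con.getInputStream(), "UTF-8")) {
            return s.useDelimiter("\\A").next(); // the response body is the bare JWT
        }
    }
}
```

The returned token would then go in an `Authorization: Bearer <token>` header on each subsequent REST call the processor makes.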