Re: Is there a configuration to limit the size of nifi's flowfile repository

2018-04-26 Thread
hi guys, thanks for all your answers. I actually saw that the
flowfile repo on one of our OpenStack CentOS 7 machines grew to about 30
GB, which used up all the disk space allocated to the virtual
machine; the flow inside
NiFi couldn't proceed and many errors started to appear, such as "failed to
checkpoint", etc. We are now using NiFi as an ETL tool to extract data from
SQL Server for data analysis.
I actually have no idea why the flowfile repo would grow like this; my
understanding is that it only stores FlowFile attributes. It would be great
if there were an option to limit the flowfile repo size.

Thanks.
Regards,
Ben

2018-04-26 2:08 GMT+08:00 Brandon DeVries <b...@jhu.edu>:

> All,
>
> This is something I think we shouldn't dismiss so easily.  While the
> FlowFile repo is lighter than the content repo, allowing it to grow too
> large can cause major problems.
>
> Specifically, an "overgrown" FlowFile repo may prevent a NiFi instance from
> coming back up after a restart due to the way in which records are held in
> memory.  If there is more memory available to give to the JVM, this can
> sometimes be worked around... but if there isn't you may just be out of
> luck.  For that matter, allowing the FlowFile repo to grow so large that it
> consumes all the heap isn't going to be good for system health in general
> (OOM is probably never where you want to be...).
>
> To Pierre's point "you don't want to limit that repository in size since it
> would prevent the workflows to create new flow files"... that's exactly why
> I would want to limit the size of the repo.  You do then get into questions
> of how exactly to do this.  For example, you may not want to simply block
> all transactions that create a FlowFile, because it may remove even more
> (e.g. MergeContent).  Additionally, you have to be concerned about
> deadlocks (e.g. a "Wait" that hangs forever because its "Notify" is being
> starved).  Or, perhaps that's all you can do... freeze everything at some
> threshold prior to actual damage being done, and alert operators that
> manual intervention is necessary (e.g. bring up the graph with
> autoResume=false, and bleed off data in a controlled fashion).
>
> In summary, I believe this is a problem.  Even if it doesn't come up often,
> when it does it is significant.  While the solution likely isn't simple,
> it's worth putting some thought towards.
>
> Brandon
>
> On Wed, Apr 25, 2018 at 9:43 AM Sivaprasanna <sivaprasanna...@gmail.com>
> wrote:
>
> > No, he actually had mentioned “like content repository”. The answer is,
> > there aren’t any properties that support this, AFAIK. Pierre’s response
> > pretty much sums up why there aren’t any properties.
> >
> > Thanks,
> > Sivaprasanna
> >
> > On Wed, 25 Apr 2018 at 7:10 PM, Mike Thomsen <mikerthom...@gmail.com>
> > wrote:
> >
> > > I have a feeling that what Ben meant was how to limit the content
> > > repository size.
> > >
> > > On Wed, Apr 25, 2018 at 8:26 AM Pierre Villard <
> > > pierre.villard...@gmail.com>
> > > wrote:
> > >
> > > > Hi Ben,
> > > >
> > > > Since the flow file repository contains the information of the flow
> > files
> > > > currently being processed by NiFi, you don't want to limit that
> > > repository
> > > > in size since it would prevent the workflows to create new flow
> files.
> > > >
> > > > Besides this repository is very lightweight, I'm not sure it'd need
> to
> > be
> > > > limited in size.
> > > > Do you have a specific use case in mind?
> > > >
> > > > Pierre
> > > >
> > > >
> > > > 2018-04-25 9:15 GMT+02:00 尹文才 <batman...@gmail.com>:
> > > >
> > > > > Hi guys, I checked NIFI's system administrator guide trying to
> find a
> > > > > configuration item so that the size of the flowfile repository
> could
> > be
> > > > > limited similar to the other repositories(e.g. content repository),
> > > but I
> > > > > didn't find such configuration items, is there currently any
> > > > configuration
> > > > > to limit the flowfile repository's size? thanks.
> > > > >
> > > > > Regards,
> > > > > Ben
> > > > >
> > > >
> > >
> >
>
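Until such a limit exists, one stopgap for the scenario Ben hit is to watch the repository's on-disk size externally and alert before the disk fills. A minimal sketch in Java (the path and threshold below are assumptions for illustration, not NiFi settings):

```java
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.atomic.AtomicLong;

public class RepoSizeCheck {

    /** Recursively sums the sizes of all regular files under the given directory. */
    static long directorySizeBytes(Path root) throws IOException {
        final AtomicLong total = new AtomicLong();
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                total.addAndGet(attrs.size());
                return FileVisitResult.CONTINUE;
            }
        });
        return total.get();
    }

    public static void main(String[] args) throws IOException {
        // Assumed path and threshold -- point these at your own flowfile_repository.
        Path repo = Paths.get(args.length > 0 ? args[0] : "./flowfile_repository");
        long limitBytes = 10L * 1024 * 1024 * 1024; // e.g. alert at 10 GB
        long size = directorySizeBytes(repo);
        System.out.println("flowfile repo size: " + size + " bytes");
        if (size > limitBytes) {
            System.err.println("WARNING: flowfile repository exceeds threshold");
        }
    }
}
```

Run from cron or a monitoring agent, this at least gives operators warning before the "disk full, fail to checkpoint" state is reached.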


Is there a configuration to limit the size of nifi's flowfile repository

2018-04-25 Thread
Hi guys, I checked NiFi's System Administrator Guide trying to find a
configuration item so that the size of the flowfile repository could be
limited, similar to the other repositories (e.g. the content repository), but
I didn't find such a configuration item. Is there currently any configuration
to limit the flowfile repository's size? Thanks.

Regards,
Ben


Re: get access token inside custom processor

2018-02-27 Thread
Thanks Bryan, I got it. I thought the processor would be running as the user
that had logged into the NiFi web UI.

Regards,
Ben

2018-02-27 22:31 GMT+08:00 Bryan Bende <bbe...@gmail.com>:

> Hello,
>
> Your custom processor would be the same as if you were writing an
> external client program.
>
> You would need to provide the processor with a username and password
> in the processor properties, and then it would need to make a call to
> the token REST end-point.
>
> Processors don't run as the user from the web UI, they run on behalf
> of the NiFi framework and have no idea which user started/stopped
> them.
>
> Thanks,
>
> Bryan
>
> On Tue, Feb 27, 2018 at 1:27 AM, 尹文才 <batman...@gmail.com> wrote:
> > Hi guys, I'm trying to invoke some nifi rest apis inside my custom
> > processor, the nifi I'm using is nifi 1.4.0 and it's a 3 node secured
> > cluster., the username and password are kept inside a ldap server.
> > I know that in a secured nifi cluster, in order to make any request I
> need
> > the access token, my question is how could I get the access token in my
> > custom processor? Thanks. (I think the token should be
> > available somewhere after successful login right?)
> >
> > regards,
> > ben
>
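For completeness, a rough sketch of what Bryan describes: POST the credentials, form-encoded, to the token endpoint and use the returned string as a Bearer token on later requests. The base URL below is a placeholder, and TLS trust setup is omitted; on a real secured cluster the JVM must also trust the NiFi server's certificate.

```java
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class TokenClient {

    /** Form-encodes the credentials as the token endpoint expects. */
    static String formBody(String username, String password)
            throws UnsupportedEncodingException {
        return "username=" + URLEncoder.encode(username, "UTF-8")
             + "&password=" + URLEncoder.encode(password, "UTF-8");
    }

    /** POSTs to /nifi-api/access/token; the response body is the token string. */
    static String fetchToken(String baseUrl, String username, String password)
            throws IOException {
        URL url = new URL(baseUrl + "/access/token"); // e.g. https://host:9443/nifi-api
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
        try (OutputStream os = conn.getOutputStream()) {
            os.write(formBody(username, password).getBytes(StandardCharsets.UTF_8));
        }
        try (BufferedReader r = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            // Subsequent REST calls then send: Authorization: Bearer <token>
            return r.readLine();
        }
    }
}
```

Inside a processor, the username and password would come from processor properties (with the password marked sensitive), as Bryan suggests.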


get access token inside custom processor

2018-02-26 Thread
Hi guys, I'm trying to invoke some NiFi REST APIs inside my custom
processor; the NiFi I'm using is 1.4.0 and it's a 3-node secured
cluster, with the username and password kept inside an LDAP server.
I know that in a secured NiFi cluster, in order to make any request I need
the access token; my question is how I could get the access token in my
custom processor? Thanks. (I think the token should be
available somewhere after a successful login, right?)

regards,
ben


Unable to start NiFi

2018-01-19 Thread
Hi guys, one of my colleagues was working on a remote deploy tool trying to
start NiFi remotely on a Windows system (the NiFi package is on the Windows
system).
I don't know exactly how the deployment tool works, but the basic flow is
to launch Ansible locally, which uses WinRM to remotely
launch NiFi through PowerShell. Unfortunately NiFi couldn't be started. But
when he remotely logged into the Windows system
and manually ran NiFi on it (by double-clicking the
run-nifi.bat file), NiFi started without any problem. I checked the logs
NiFi wrote about the start failure, and it appears to me to be a memory
problem, but I'm not sure if that is the root cause and I don't know
how to solve it. Could anyone help me with this problem? Thanks.
The 2 created logs are as below:

1. nifi-bootstrap.log:
2018-01-19 17:12:48,657 INFO [main] o.a.n.b.NotificationServiceManager
Successfully loaded the following 0 services: []
2018-01-19 17:12:48,673 INFO [main] org.apache.nifi.bootstrap.RunNiFi
Registered no Notification Services for Notification Type NIFI_STARTED
2018-01-19 17:12:48,673 INFO [main] org.apache.nifi.bootstrap.RunNiFi
Registered no Notification Services for Notification Type NIFI_STOPPED
2018-01-19 17:12:48,673 INFO [main] org.apache.nifi.bootstrap.RunNiFi
Registered no Notification Services for Notification Type NIFI_DIED
2018-01-19 17:12:48,688 INFO [main] org.apache.nifi.bootstrap.Command
Starting Apache NiFi...
2018-01-19 17:12:48,688 INFO [main] org.apache.nifi.bootstrap.Command
Working Directory: C:\ETL\Nifi\NIFI-1~1.0
2018-01-19 17:12:48,688 INFO [main] org.apache.nifi.bootstrap.Command
Command: C:\Program Files\Java\jdk1.8.0_152\bin\java.exe -classpath
C:\ETL\Nifi\NIFI-1~1.0\.\conf;C:\ETL\Nifi\NIFI-1~1.0\.\lib\javax.servlet-api-3.1.0.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\jcl-over-slf4j-1.7.25.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\jetty-schemas-3.1.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\jul-to-slf4j-1.7.25.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\log4j-over-slf4j-1.7.25.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\logback-classic-1.2.3.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\logback-core-1.2.3.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\nifi-api-1.4.0.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\nifi-framework-api-1.4.0.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\nifi-nar-utils-1.4.0.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\nifi-properties-1.4.0.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\nifi-runtime-1.4.0.jar;C:\ETL\Nifi\NIFI-1~1.0\.\lib\slf4j-api-1.7.25.jar
-Dorg.apache.jasper.compiler.disablejsr199=true -Xms2048m -Xms2048m
-Djava.security.egd=file:/dev/urandom
-Dsun.net.http.allowRestrictedHeaders=true -Djava.net.preferIPv4Stack=true
-Djava.awt.headless=true -XX:+UseG1GC
-Djava.protocol.handler.pkgs=sun.net.www.protocol
-Dnifi.properties.file.path=C:\ETL\Nifi\NIFI-1~1.0\.\conf\nifi.properties
-Dnifi.bootstrap.listen.port=49572 -Dapp=NiFi
-Dorg.apache.nifi.bootstrap.config.log.dir=C:\ETL\Nifi\NIFI-1~1.0\bin\..\\..\logs
org.apache.nifi.NiFi
2018-01-19 17:12:49,657 INFO [NiFi logging handler] org.apache.nifi.StdOut #
2018-01-19 17:12:49,657 INFO [NiFi logging handler] org.apache.nifi.StdOut
# There is insufficient memory for the Java Runtime Environment to continue.
2018-01-19 17:12:49,657 INFO [NiFi logging handler] org.apache.nifi.StdOut
# Native memory allocation (mmap) failed to map 2147483648 bytes for Failed
to commit area from 0x0006c000 to 0x00074000 of length
2147483648.
2018-01-19 17:12:49,657 INFO [NiFi logging handler] org.apache.nifi.StdOut
# An error report file with more information is saved as:
2018-01-19 17:12:49,657 INFO [NiFi logging handler] org.apache.nifi.StdOut
# C:\ETL\Nifi\NIFI-1~1.0\hs_err_pid3448.log
2018-01-19 17:12:49,657 ERROR [NiFi logging handler] org.apache.nifi.StdErr
Picked up JAVA_TOOL_OPTIONS: "-Dfile.encoding=UTF8"
2018-01-19 17:12:49,657 ERROR [NiFi logging handler] org.apache.nifi.StdErr
Java HotSpot(TM) 64-Bit Server VM warning: INFO:
os::commit_memory(0x0006c000, 2147483648, 0) failed;
error='The paging file is too small for this operation to complete' (DOS error/errno=1455)
2018-01-19 17:12:50,876 WARN [main] org.apache.nifi.bootstrap.Command
Failed to set permissions so that only the owner can read pid file
C:\ETL\Nifi\NIFI-1~1.0\bin\..\run\nifi.pid; this may allows others to have
access to the key needed to communicate with NiFi. Permissions should be
changed so that only the owner can read this file
2018-01-19 17:12:53,816 WARN [main] org.apache.nifi.bootstrap.Command
Failed to set permissions so that only the owner can read status file
C:\ETL\Nifi\NIFI-1~1.0\bin\..\run\nifi.status; this may allows others to
have access to the key needed to communicate with NiFi. Permissions should
be changed so that only the owner can read this file
2018-01-19 17:12:55,798 INFO [main] org.apache.nifi.bootstrap.Command
Launched Apache NiFi with Process ID 3448
2018-01-19 17:12:55,798 INFO [main] org.apache.nifi.bootstrap.RunNiFi NiFi
never started. Will not restart NiFi


2. hs_err_pid3448.log:

#
# There is insufficient memory for the Java Runtime Environment 
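For what it's worth, errno 1455 on Windows is ERROR_COMMIT_LIMIT ("The paging file is too small for this operation to complete"): the JVM asked the OS to commit the 2 GB heap (-Xms2048m) and the page file couldn't back it. Enlarging the Windows page file, or lowering the heap settings in conf/bootstrap.conf, should get past it. A sketch with assumed values (the java.arg.* numbering matches the default bootstrap.conf, but check your own file):

```properties
# conf/bootstrap.conf -- example heap values only; tune to your machine.
java.arg.2=-Xms512m
java.arg.3=-Xmx1024m
```

Note the bootstrap log above also shows -Xms2048m passed twice and no -Xmx at all, which suggests the second entry in bootstrap.conf may have been edited to -Xms by mistake.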

Re: clear all flowfiles in all queues upon NiFi restart

2018-01-12 Thread
Thanks Mark, Andrew and Russell, I think using the volatile repository
implementations mentioned by Mark should be sufficient for me.

Regards,
Ben

2018-01-12 22:59 GMT+08:00 Russell Bateman <r...@windofkeltia.com>:

> Andrew just meant that if you smoke the contents of all the repository
> subdirectories under ${NIFI_ROOT}, it will result in what you seem to be
> asking for.
>
> Hope this helps.
>
>
> On 01/11/2018 09:48 PM, 尹文才 wrote:
>
>> Hi Andrew, sorry I didn't follow your idea, could you please elaborate
>> with
>> more details?
>> What I want to do is to be able to clear all the FlowFiles when NiFi dies
>> unexpectedly and restarts itself.
>>
>> Regards,
>> Ben
>>
>> 2018-01-12 12:44 GMT+08:00 Andrew Grande <apere...@gmail.com>:
>>
>>> Perhaps you could delete the repository directories when you need to
>>> restart with no data?
>>>
>>> On Thu, Jan 11, 2018, 9:16 PM 尹文才 <batman...@gmail.com> wrote:
>>>
>>>> Hi Mark, forgot to ask about VolatileFlowFileRepository you mentioned,
>>>> if I switch to use VolatileFlowFileRepository, will NiFi swap out all
>>>> the other FlowFiles to disk if a queue is already full?
>>>> Is it just simply keeping all FlowFiles in memory?
>>>>
>>>> Regards,
>>>> Ben
>>>>
>>>> 2018-01-12 12:07 GMT+08:00 尹文才 <batman...@gmail.com>:
>>>>
>>>>> Thanks Mark, my case is that I'm using NiFi to do some ETL work and
>>>>> it's possible that NiFi dies unexpectedly due to lack of system
>>>>> resources. After NiFi restarts itself,
>>>>> I will re-extract all the data from database and re-perform all the
>>>>> operations, so I need to clear all possible FlowFiles that might
>>>>> exist in any queue.
>>>>>
>>>>> Regards,
>>>>> Ben
>>>>>
>>>>> 2018-01-12 11:49 GMT+08:00 Mark Payne <marka...@hotmail.com>:
>>>>>
>>>>>> Ben,
>>>>>>
>>>>>> I have to admit - that’s kind of an odd request :) I’m curious what
>>>>>> the use case is, if you can share?
>>>>>>
>>>>>> Regardless, the easiest way would be to update nifi.properties so
>>>>>> that the FlowFile repo that is used is the
>>>>>> VolatileFlowFileRepository. This would avoid writing the FlowFile
>>>>>> state to disk, so ok restart you will lose all FlowFiles. The
>>>>>> content will still be present, but nifi will delete it all on
>>>>>> startup because there is no FlowFile associated with it.
>>>>>>
>>>>>> I’m on my phone right now so can’t easily tell you the exact name
>>>>>> of the property to change but you’ll probably find it pretty
>>>>>> quickly. The Admin Guide may well explain the different
>>>>>> repositories as well.
>>>>>>
>>>>>> Thanks
>>>>>> -Mark
>>>>>>
>>>>>> Sent from my iPhone
>>>>>>
>>>>>>> On Jan 11, 2018, at 10:31 PM, 尹文才 <batman...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi guys, I'm trying to clear all FlowFIles in all queues when NiFi
>>>>>>> is restarted, but I don't know the correct way to do this. I
>>>>>>> checked all NiFi's guide documentation,
>>>>>>> it seems there're 2 possible solutions:
>>>>>>> 1. write a custom notification service: a notification service
>>>>>>> could be notified when NiFi is restarted and then inside the
>>>>>>> service, delete all the files inside content_repository,
>>>>>>> flowfile_repository and provenance_repository.
>>>>>>>    I know there're now 2 existing services: email and http. But
>>>>>>> I'm not quite sure how to correctly write one and deploy it into
>>>>>>> my NiFi environment, is there a tutorial on writing one
>>>>>>> notification service?
>>>>>>>
>>>>>>> 2. I know from the developer guide that by using the annotation
>>>>>>> @Shutdown in a custom processor, the method could be called when
>>>>>>> NiFi is successfully shut down. The problem with this approach is
>>>>>>> the method could
>>>>>>>    not be guaranteed to be called when NiFi dies unexpectedly.
>>>>>>>
>>>>>>> Does anyone know what is the correct way to implement it? Thanks.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Ben
>>>>>>
>>>>>
>
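For reference, the property Mark mentions lives in nifi.properties; a sketch (the class name is as documented for NiFi 1.x, but verify against your version's Admin Guide):

```properties
# nifi.properties -- use the in-memory FlowFile repository so queued
# FlowFiles do not survive a restart.
# (The default is org.apache.nifi.controller.repository.WriteAheadFlowFileRepository.)
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.VolatileFlowFileRepository
```

As Mark notes, on startup the content repository is then cleaned up automatically because no FlowFiles reference it any longer.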


Re: clear all flowfiles in all queues upon NiFi restart

2018-01-11 Thread
Hi Andrew, sorry, I didn't follow your idea; could you please elaborate in
more detail?
What I want to do is to be able to clear all the FlowFiles when NiFi dies
unexpectedly and restarts itself.

Regards,
Ben

2018-01-12 12:44 GMT+08:00 Andrew Grande <apere...@gmail.com>:

> Perhaps you could delete the repository directories when you need to
> restart with no data?
>
> On Thu, Jan 11, 2018, 9:16 PM 尹文才 <batman...@gmail.com> wrote:
>
> > Hi Mark, forgot to ask about VolatileFlowFileRepository you mentioned,
> if I
> > switch to use VolatileFlowFileRepository, will NiFi swap out all the
> other
> > FlowFiles to disk if a queue is already full?
> > Is it just simply keeping all FlowFiles in memory?
> >
> > Regards,
> > Ben
> >
> > 2018-01-12 12:07 GMT+08:00 尹文才 <batman...@gmail.com>:
> >
> > > Thanks Mark, my case is that I'm using NiFi to do some ETL work and
> it's
> > > possible that NiFi dies unexpectedly due to lack of system resources.
> > After
> > > NiFi restarts itself,
> > > I will re-extract all the data from database and re-perform all the
> > > operations, so I need to clear all possible FlowFiles that might exist
> in
> > > any queue.
> > >
> > > Regards,
> > > Ben
> > >
> > > 2018-01-12 11:49 GMT+08:00 Mark Payne <marka...@hotmail.com>:
> > >
> > >> Ben,
> > >>
> > >> I have to admit - that’s kind of an odd request :) I’m curious what
> the
> > >> use case is, if you can share?
> > >>
> > >> Regardless, the easiest way would be to update nifi.properties so that
> > >> the FlowFile repo that is used is the VolatileFlowFileRepository. This
> > >> would avoid writing the FlowFile state to disk, so ok restart you will
> > lose
> > >> all FlowFiles. The content will still be present, but nifi will delete
> > it
> > >> all on startup because there is no FlowFile associated with it.
> > >>
> > >> I’m on my phone right now so can’t easily tell you the exact name of
> the
> > >> property to change but you’ll probably find it pretty quickly. The
> Admin
> > >> Guide may well explain the different repositories as well.
> > >>
> > >> Thanks
> > >> -Mark
> > >>
> > >> Sent from my iPhone
> > >>
> > >> > On Jan 11, 2018, at 10:31 PM, 尹文才 <batman...@gmail.com> wrote:
> > >> >
> > >> > Hi guys, I'm trying to clear all FlowFIles in all queues when NiFi
> is
> > >> > restarted, but I don't know the correct way to do this. I checked
> all
> > >> > NiFi's guide documentation,
> > >> > it seems there're 2 possible solutions:
> > >> > 1. write a custom notification service: a notification service could
> > be
> > >> > notified when NiFi is restarted and then inside the service, delete
> > all
> > >> the
> > >> > files inside content_repository, flowfile_repository and
> > >> > provenance_repository.
> > >> >   I know there're now 2 existing services: email and http. But I'm
> not
> > >> > quite sure how to correctly write one and deploy it into my NiFi
> > >> > environment, is there a tutorial on writing one notification
> service?
> > >> >
> > >> > 2. I know from the developer guide that by using the annotation
> > >> @Shutdown
> > >> > in a custom processor, the method could be called when NiFi is
> > >> successfully
> > >> > shut down. The problem with this approach is the method could
> > >> >   not be guaranteed to be called when NiFi dies unexpectedly.
> > >> >
> > >> > Does anyone know what is the correct way to implement it? Thanks.
> > >> >
> > >> > Regards,
> > >> > Ben
> > >>
> > >
> > >
> >
>


Re: clear all flowfiles in all queues upon NiFi restart

2018-01-11 Thread
Hi Mark, forgot to ask about the VolatileFlowFileRepository you mentioned: if I
switch to the VolatileFlowFileRepository, will NiFi swap out all the other
FlowFiles to disk if a queue is already full?
Or does it simply keep all FlowFiles in memory?

Regards,
Ben

2018-01-12 12:07 GMT+08:00 尹文才 <batman...@gmail.com>:

> Thanks Mark, my case is that I'm using NiFi to do some ETL work and it's
> possible that NiFi dies unexpectedly due to lack of system resources. After
> NiFi restarts itself,
> I will re-extract all the data from database and re-perform all the
> operations, so I need to clear all possible FlowFiles that might exist in
> any queue.
>
> Regards,
> Ben
>
> 2018-01-12 11:49 GMT+08:00 Mark Payne <marka...@hotmail.com>:
>
>> Ben,
>>
>> I have to admit - that’s kind of an odd request :) I’m curious what the
>> use case is, if you can share?
>>
>> Regardless, the easiest way would be to update nifi.properties so that
>> the FlowFile repo that is used is the VolatileFlowFileRepository. This
>> would avoid writing the FlowFile state to disk, so ok restart you will lose
>> all FlowFiles. The content will still be present, but nifi will delete it
>> all on startup because there is no FlowFile associated with it.
>>
>> I’m on my phone right now so can’t easily tell you the exact name of the
>> property to change but you’ll probably find it pretty quickly. The Admin
>> Guide may well explain the different repositories as well.
>>
>> Thanks
>> -Mark
>>
>> Sent from my iPhone
>>
>> > On Jan 11, 2018, at 10:31 PM, 尹文才 <batman...@gmail.com> wrote:
>> >
>> > Hi guys, I'm trying to clear all FlowFIles in all queues when NiFi is
>> > restarted, but I don't know the correct way to do this. I checked all
>> > NiFi's guide documentation,
>> > it seems there're 2 possible solutions:
>> > 1. write a custom notification service: a notification service could be
>> > notified when NiFi is restarted and then inside the service, delete all
>> the
>> > files inside content_repository, flowfile_repository and
>> > provenance_repository.
>> >   I know there're now 2 existing services: email and http. But I'm not
>> > quite sure how to correctly write one and deploy it into my NiFi
>> > environment, is there a tutorial on writing one notification service?
>> >
>> > 2. I know from the developer guide that by using the annotation
>> @Shutdown
>> > in a custom processor, the method could be called when NiFi is
>> successfully
>> > shut down. The problem with this approach is the method could
>> >   not be guaranteed to be called when NiFi dies unexpectedly.
>> >
>> > Does anyone know what is the correct way to implement it? Thanks.
>> >
>> > Regards,
>> > Ben
>>
>
>


Re: clear all flowfiles in all queues upon NiFi restart

2018-01-11 Thread
Thanks Mark, my case is that I'm using NiFi to do some ETL work and it's
possible that NiFi dies unexpectedly due to lack of system resources. After
NiFi restarts itself,
I will re-extract all the data from the database and re-perform all the
operations, so I need to clear all possible FlowFiles that might exist in
any queue.

Regards,
Ben

2018-01-12 11:49 GMT+08:00 Mark Payne <marka...@hotmail.com>:

> Ben,
>
> I have to admit - that’s kind of an odd request :) I’m curious what the
> use case is, if you can share?
>
> Regardless, the easiest way would be to update nifi.properties so that the
> FlowFile repo that is used is the VolatileFlowFileRepository. This would
> avoid writing the FlowFile state to disk, so ok restart you will lose all
> FlowFiles. The content will still be present, but nifi will delete it all
> on startup because there is no FlowFile associated with it.
>
> I’m on my phone right now so can’t easily tell you the exact name of the
> property to change but you’ll probably find it pretty quickly. The Admin
> Guide may well explain the different repositories as well.
>
> Thanks
> -Mark
>
> Sent from my iPhone
>
> > On Jan 11, 2018, at 10:31 PM, 尹文才 <batman...@gmail.com> wrote:
> >
> > Hi guys, I'm trying to clear all FlowFIles in all queues when NiFi is
> > restarted, but I don't know the correct way to do this. I checked all
> > NiFi's guide documentation,
> > it seems there're 2 possible solutions:
> > 1. write a custom notification service: a notification service could be
> > notified when NiFi is restarted and then inside the service, delete all
> the
> > files inside content_repository, flowfile_repository and
> > provenance_repository.
> >   I know there're now 2 existing services: email and http. But I'm not
> > quite sure how to correctly write one and deploy it into my NiFi
> > environment, is there a tutorial on writing one notification service?
> >
> > 2. I know from the developer guide that by using the annotation @Shutdown
> > in a custom processor, the method could be called when NiFi is
> successfully
> > shut down. The problem with this approach is the method could
> >   not be guaranteed to be called when NiFi dies unexpectedly.
> >
> > Does anyone know what is the correct way to implement it? Thanks.
> >
> > Regards,
> > Ben
>


clear all flowfiles in all queues upon NiFi restart

2018-01-11 Thread
Hi guys, I'm trying to clear all FlowFiles in all queues when NiFi is
restarted, but I don't know the correct way to do this. I checked all of
NiFi's guide documentation;
it seems there are 2 possible solutions:
1. Write a custom notification service: a notification service could be
notified when NiFi is restarted, and then inside the service, delete all the
files inside content_repository, flowfile_repository and
provenance_repository.
   I know there are now 2 existing services: email and http. But I'm not
quite sure how to correctly write one and deploy it into my NiFi
environment; is there a tutorial on writing a notification service?

2. I know from the developer guide that by using the annotation @Shutdown
in a custom processor, the method could be called when NiFi is successfully
shut down. The problem with this approach is that the method could
   not be guaranteed to be called when NiFi dies unexpectedly.

Does anyone know what is the correct way to implement this? Thanks.

Does anyone know what is the correct way to implement it? Thanks.

Regards,
Ben


Re: DBCP connection pool problem - connection already closed

2018-01-11 Thread
Thanks Brett, I will switch to jTDS and see how it goes.

Regards,
Ben

2018-01-11 16:22 GMT+08:00 Brett Ryan <brett.r...@gmail.com>:

> Hi Ben. It’s often recommended to use the jTDS driver [1] as the MS
> provided driver is considered buggy.
>
> I don’t make this claim, however; I’ve always used this driver for ms sql
> server and never encountered issues.
>
>   [1]: http://jtds.sourceforge.net/faq.html
>
> > On 11 Jan 2018, at 18:56, 尹文才 <batman...@gmail.com> wrote:
> >
> > Hi guys, I'm using the PutDatabaseRecord processor to write some data
> into
> > sql server and the processor is using the DBCP controller service as its
> > connection pool.
> > Sometimes I could see the following exception inside my log file:
> > org.apache.nifi.processor.exception.ProcessException: Failed to commit
> > database connection due to com.microsoft.sqlserver.jdbc.
> SQLServerException:
> > The connection is closed
> >
> > I know that DBCPConnectionPool is using the validation query property to
> > set test-on-borrow to true and also set the validation query accordingly,
> > so I set the validation query to "select 1",
> > but this doesn't seem to solve the problem. I tried to find the possible
> > reason and solution for this problem, the only thread I could find is
> that
> > someone talked about one possible reason on
> > StackOverFlow:
> >
> > "In case of IOException, the sqlserver jdbc driver marks the connection
> as
> > closed, but this is not detected by the pool. So the connection is
> returned
> > in the pool, while unusable."
> >
> > I'm not sure if the reason he mentioned above is the root cause of my
> > problem. Has anyone came across this kind of problem and how to work
> around
> > this issue? Thanks.
> >
> > Regards,
> > Ben
>
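For anyone following along, switching the DBCPConnectionPool controller service over to jTDS amounts to changing roughly these properties (values here are illustrative; the URL scheme and driver class are the ones documented in the jTDS FAQ linked above):

```properties
# DBCPConnectionPool controller service -- illustrative values
# Database Connection URL    : jdbc:jtds:sqlserver://dbhost:1433/mydb
# Database Driver Class Name : net.sourceforge.jtds.jdbc.Driver
# Database Driver Location(s): /opt/drivers/jtds-1.3.1.jar
# Validation query           : SELECT 1
```

Keeping the validation query set means borrowed connections are still tested before use, which is the usual guard against the "connection is closed" failure described above.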


Re: NiFi data HA in cluster mode

2018-01-08 Thread
Thanks Joe, I will try to avoid setting processors to primary node. By the
way, I've seen someone post a suggestion about data HA on NiFi's
wiki (HDFSContentRepository); is there a plan for that feature to be
implemented and included in NiFi?

Regards,
Ben

2018-01-09 14:25 GMT+08:00 Joe Witt <joe.w...@gmail.com>:

> I'd avoid setting any processor to primary node only unless it is a
> source processor (something that brings data into the system).
>
> But, yes, I believe your description is accurate as of now.
>
> Thanks
>
> On Mon, Jan 8, 2018 at 11:21 PM, 尹文才 <batman...@gmail.com> wrote:
> > Thanks Joe, so you mean for example, if I set one processor to run only
> on
> > primary node in the cluster and there're 100 FlowFiles in the incoming
> > queue of the processor
> > waiting to be processed by this processor, and the processor suddenly
> goes
> > down and then another node is elected as the primary node, those 100
> > FlowFiles will be kept locally
> > in the node that went down and will continue to be processed by the node
> > when it goes back online, these FlowFiles will not be available to the
> new
> > primary node and other nodes,
> > am I correct?
> >
> > Regards,
> > Ben
> >
> >
> > 2018-01-09 14:08 GMT+08:00 Joe Witt <joe.w...@gmail.com>:
> >
> >> Ben,
> >>
> >> Data already mid-flow within a node will be kept on the node and
> >> processed when the node is back on-line.  All other data coming into
> >> the cluster can fail-over to other nodes provided you're sourcing data
> >> with queuing semantics or automated load balancing or fail-over as-is
> >> present in the Apache NiFi Site to Site protocol.
> >>
> >> Thanks
> >> Joe
> >>
> >> On Mon, Jan 8, 2018 at 11:05 PM, 尹文才 <batman...@gmail.com> wrote:
> >> > Hi guys, I have a question about data HA when NiFi is run in clustered
> >> > mode, if one node goes down, will the flowfiles owned by this node
> taken
> >> > over and processed by another node?
> >> > Or will the flowfiles be kept locally to that node and will only be
> >> > processed when that node is back online? Thanks.
> >> >
> >> > Regards,
> >> > Ben
> >>
>


Re: NiFi data HA in cluster mode

2018-01-08 Thread
Thanks Joe, so you mean, for example, if I set one processor to run only on
the primary node in the cluster and there are 100 FlowFiles in the incoming
queue of the processor
waiting to be processed, and that node suddenly goes
down and another node is elected as the primary node, those 100
FlowFiles will be kept locally
on the node that went down and will continue to be processed by that node
when it comes back online; these FlowFiles will not be available to the new
primary node or the other nodes.
Am I correct?

Regards,
Ben


2018-01-09 14:08 GMT+08:00 Joe Witt <joe.w...@gmail.com>:

> Ben,
>
> Data already mid-flow within a node will be kept on the node and
> processed when the node is back on-line.  All other data coming into
> the cluster can fail-over to other nodes provided you're sourcing data
> with queuing semantics or automated load balancing or fail-over as-is
> present in the Apache NiFi Site to Site protocol.
>
> Thanks
> Joe
>
> On Mon, Jan 8, 2018 at 11:05 PM, 尹文才 <batman...@gmail.com> wrote:
> > Hi guys, I have a question about data HA when NiFi is run in clustered
> > mode, if one node goes down, will the flowfiles owned by this node taken
> > over and processed by another node?
> > Or will the flowfiles be kept locally to that node and will only be
> > processed when that node is back online? Thanks.
> >
> > Regards,
> > Ben
>


NiFi data HA in cluster mode

2018-01-08 Thread
Hi guys, I have a question about data HA when NiFi is run in clustered
mode: if one node goes down, will the flowfiles owned by this node be taken
over and processed by another node?
Or will the flowfiles be kept locally on that node and only be
processed when that node is back online? Thanks.

Regards,
Ben


Re: proper way in nifi to sync status between custom processors

2017-12-27 Thread
Hi Koji, I saw it was only showing the latest 1000 events, so I couldn't see
the event from when the FlowFile was created.

Regards,
Ben

2017-12-27 17:21 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> I see, thanks. The easiest way to look at provenance events would be
> by right clicking a processor instance you are interested in, then
> select 'View data provenance' context menu. This way, NiFi displays
> provenance events for the selected processor.
>
> Koji
>
> On Wed, Dec 27, 2017 at 6:17 PM, 尹文才 <batman...@gmail.com> wrote:
> > Hi Koji, sorry about the provenance exception, it was because there's no
> > space left on the machine(filled up with logs)
> >
> > Regards,
> > Ben
> >
> > 2017-12-27 17:11 GMT+08:00 尹文才 <batman...@gmail.com>:
> >
> >> Hi Koji, thanks. The names of the temp tables are created with the
> >> format "MMddHHmmssSSS-": the first part is the timestamp and the
> >> second part is a 4-digit random number.
> >> So I don't think it's possible to get two duplicate table names; the
> >> only possibility I can think of is that the FlowFile was passed into
> >> the processor twice.
> >>
> >> About the provenance, I had updated to use the
> >> WriteAheadProvenanceRepository implementation, but when I tried to check
> >> the data provenance, it showed me the following exception message:
> >> HTTP ERROR 500
> >>
> >> Problem accessing /nifi/provenance. Reason:
> >>
> >> Server Error
> >>
> >> Caused by:
> >>
> >> javax.servlet.ServletException: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
> >>   at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:138)
> >>   at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
> >>   at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
> >>   at org.eclipse.jetty.server.Server.handle(Server.java:564)
> >>   at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
> >>   at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
> >>   at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
> >>   at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
> >>   at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
> >>   at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
> >>   at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
> >>   at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
> >>   at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
> >>   at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
> >>   at java.lang.Thread.run(Thread.java:745)
> >> Caused by: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
> >>   at org.eclipse.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:596)
> >>   at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:655)
> >>   at org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498)
> >>   at org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785)
> >>   at org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770)
> >>   at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538)
> >>   at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
> >>   at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
> >>   at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
> >>   at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
> >>   at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
> >>   at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
> >>   at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
> >>   at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
> >>   at org.eclipse.jetty.servlet.S

Re: proper way in nifi to sync status between custom processors

2017-12-27 Thread
Hi Koji, sorry about the provenance exception; it turned out there was no
space left on the machine (the disk was filled up with logs).

Regards,
Ben

2017-12-27 17:11 GMT+08:00 尹文才 <batman...@gmail.com>:

> Hi Koji, thanks. The names of the temp tables are created with the format
> "MMddHHmmssSSS-": the first part is the timestamp and the second
> part is a 4-digit random number.
> So I don't think it's possible to get two duplicate table names; the only
> possibility I can think of is that the FlowFile was passed into the processor twice.
>
> About the provenance, I had updated to use the
> WriteAheadProvenanceRepository implementation, but when I tried to check
> the data provenance, it showed me the following exception message:
> HTTP ERROR 500
>
> Problem accessing /nifi/provenance. Reason:
>
> Server Error
>
> Caused by:
>
> javax.servlet.ServletException: org.eclipse.jetty.servlet.ServletHolder$1: 
> java.lang.NullPointerException
>   at 
> org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:138)
>   at 
> org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
>   at 
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>   at org.eclipse.jetty.server.Server.handle(Server.java:564)
>   at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
>   at 
> org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
>   at 
> org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
>   at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>   at 
> org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
>   at 
> org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
>   at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>   at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
>   at 
> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
>   at 
> org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.eclipse.jetty.servlet.ServletHolder$1: 
> java.lang.NullPointerException
>   at 
> org.eclipse.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:596)
>   at 
> org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:655)
>   at 
> org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498)
>   at 
> org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785)
>   at 
> org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770)
>   at 
> org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538)
>   at 
> org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
>   at 
> org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
>   at 
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>   at 
> org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
>   at 
> org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
>   at 
> org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
>   at 
> org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
>   at 
> org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
>   at 
> org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
>   at 
> org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
>   at 
> org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
>   at 
> org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
>   at 
> org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>   at 
> org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
>   at 
> org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
>   at 
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>   at org.eclipse.jetty.server.Server.handle(Server.java:564)
>   at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
>   at 
> org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
>   at 
> org.eclipse.jetty.io.AbstractConnection$Rea
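For reference, the WriteAheadProvenanceRepository mentioned in this thread is selected in nifi.properties; a sketch of the relevant setting (standard property name in NiFi 1.x):

```properties
# nifi.properties: switch to the faster write-ahead provenance repository
nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
```

A restart of NiFi is required for the change to take effect.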

Re: proper way in nifi to sync status between custom processors

2017-12-27 Thread
> /html/user-guide.html#viewing-flowfile-lineage
>
> I didn't mention about it earlier because you were having Provenance
> repository performance issue, but I hope you can use it now with the
> WriteAheadProvenanceRepository.
>
> Thanks,
> Koji
>
> On Wed, Dec 27, 2017 at 5:44 PM, 尹文才 <batman...@gmail.com> wrote:
> > Thanks Koji. For the ExecuteSqlCommand issue, I was trying to re-execute
> > the SQL query if the connection is lost (the connection can be unstable);
> > my idea is to transfer the FlowFile to the success relationship only
> > after the SQL query executes successfully. As you can see from the
> > do-while loop in the code, the transaction is rolled back if the
> > execution fails; if the connection is lost, it retries the SQL.
> > Will this logic cause my SQL to be executed twice?
> >
> > For the WaitBatch processor, I will take your approach to test
> individually
> > to see if the WaitBatch processor could cause the FlowFile repository
> > checkpointing failure.
> >
> > Regards,
> > Ben
> >
> > 2017-12-27 16:10 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >
> >> Hi Ben,
> >>
> >> Excuse me, I'm trying, but probably I don't fully understand what you
> >> want to achieve with the flow.
> >>
> >> It looks weird that WaitBatch is failing with such FlowFile repository
> >> error, while other processor such as ReplaceText succeeds.
> >> I recommend to test WaitBatch alone first without combining the
> >> database related processors, by feeding a test FlowFile having
> >> expected FlowFile attributes.
> >> Such input FlowFiles can be created by GenerateFlowFile processor.
> >> If the same error happens with only WaitBatch processor, then it
> >> should be easier to debug.
> >>
> >> Thanks,
> >> Koji
> >>
> >> On Wed, Dec 27, 2017 at 4:49 PM, Koji Kawamura <ijokaruma...@gmail.com>
> >> wrote:
> >> > Hi Ben,
> >> >
> >> > The one thing that looks strange in the screenshot is the
> >> > ExecuteSqlCommand having FlowFiles queued in its incoming connection.
> >> > Those should be transferred to 'failure' relationship.
> >> >
> >> > Following executeSql() method, shouldn't it re-throw the caught
> >> exception?
> >> >
> >> >
> >> > try (Connection con = dbcpService.getConnection()) {
> >> >     logger.debug("Setting autoCommit to false");
> >> >     con.setAutoCommit(false);
> >> >
> >> >     try (Statement stmt = con.createStatement()) {
> >> >         logger.info("Executing SQL statement: {}", new Object[]{sql});
> >> >         stmt.execute(sql);
> >> >
> >> >         // all SQL statements execute within one transaction
> >> >         logger.debug("Committing transaction");
> >> >         con.commit();
> >> >     } catch (Exception ex) {
> >> >         logger.error("Failed to execute SQL statement: {}", new Object[]{sql, ex});
> >> >         con.rollback();
> >> >         // re-throw the exception to the outer handler
> >> >         throw ex;
> >> >     } finally {
> >> >         logger.debug("Resetting autoCommit to true");
> >> >         con.setAutoCommit(true);
> >> >     }
> >> > } catch (Exception ex) {
> >> >     // HERE, the exception is swallowed, that's why the FlowFiles stay in
> >> >     // the incoming connection.
> >> >     logger.error("Retrying SQL statement: {}", new Object[]{sql, ex});
> >> >     retryOnFail = true;
> >> > }
> >> >
> >> > Thanks,
> >> > Koji
> >> >
> >> > On Wed, Dec 27, 2017 at 2:38 PM, 尹文才 <batman...@gmail.com> wrote:
> >> >> Hi Koji, no problem. You could check the code of processor WaitBatch
> at
> >> the
> >> >> link:
> >> >> https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ
> >> >>
> >> >> I also uploaded a snapshot of part of NiFi flow which includes the
> >> >> ExecuteSqlCommand and WaitBatch, you could check the picture at the
> >> link:
> >> >> https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnyd

Re: proper way in nifi to sync status between custom processors

2017-12-27 Thread
Thanks Koji. For the ExecuteSqlCommand issue, I was trying to re-execute
the SQL query if the connection is lost (the connection can be unstable);
my idea is to transfer the FlowFile to the success relationship only
after the SQL query executes successfully. As you can see from the do-while
loop in the code, the transaction is rolled back if the execution fails; if
the connection is lost, it retries the SQL.
Will this logic cause my SQL to be executed twice?
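As Koji points out elsewhere in this thread, the outer catch in the posted executeSql() swallows the exception, so a permanently failing FlowFile never reaches the 'failure' relationship. Below is a minimal, self-contained sketch of a bounded-retry structure that re-throws once retries are exhausted; the SqlAction interface and runWithRetry helper are illustrative stand-ins, not part of the actual ExecuteSqlCommand processor or the NiFi API.

```java
// Sketch: retry a flaky operation a bounded number of times, then
// propagate the last exception so the caller can route to 'failure'
// instead of leaving the FlowFile queued forever.
class RetrySketch {
    interface SqlAction {                      // stand-in for the real JDBC call
        void execute() throws Exception;
    }

    static int runWithRetry(SqlAction action, int maxAttempts) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.execute();
                return attempt;                // success: report which attempt worked
            } catch (Exception ex) {
                last = ex;                     // remember the failure and retry
            }
        }
        throw last;                            // retries exhausted: re-throw
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice, then succeeds on the third attempt.
        SqlAction flaky = () -> { if (++calls[0] < 3) throw new Exception("connection lost"); };
        System.out.println(runWithRetry(flaky, 5)); // prints 3
    }
}
```

With this shape, a transient connection loss is retried, but a deterministic error (such as "object already exists") surfaces after maxAttempts instead of looping silently.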

For the WaitBatch processor, I will take your approach to test individually
to see if the WaitBatch processor could cause the FlowFile repository
checkpointing failure.

Regards,
Ben

2017-12-27 16:10 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> Hi Ben,
>
> Excuse me, I'm trying, but probably I don't fully understand what you
> want to achieve with the flow.
>
> It looks weird that WaitBatch is failing with such FlowFile repository
> error, while other processor such as ReplaceText succeeds.
> I recommend to test WaitBatch alone first without combining the
> database related processors, by feeding a test FlowFile having
> expected FlowFile attributes.
> Such input FlowFiles can be created by GenerateFlowFile processor.
> If the same error happens with only WaitBatch processor, then it
> should be easier to debug.
>
> Thanks,
> Koji
>
> On Wed, Dec 27, 2017 at 4:49 PM, Koji Kawamura <ijokaruma...@gmail.com>
> wrote:
> > Hi Ben,
> >
> > The one thing that looks strange in the screenshot is the
> > ExecuteSqlCommand having FlowFiles queued in its incoming connection.
> > Those should be transferred to 'failure' relationship.
> >
> > Following executeSql() method, shouldn't it re-throw the caught
> exception?
> >
> >
> > try (Connection con = dbcpService.getConnection()) {
> >     logger.debug("Setting autoCommit to false");
> >     con.setAutoCommit(false);
> >
> >     try (Statement stmt = con.createStatement()) {
> >         logger.info("Executing SQL statement: {}", new Object[]{sql});
> >         stmt.execute(sql);
> >
> >         // all SQL statements execute within one transaction
> >         logger.debug("Committing transaction");
> >         con.commit();
> >     } catch (Exception ex) {
> >         logger.error("Failed to execute SQL statement: {}", new Object[]{sql, ex});
> >         con.rollback();
> >         // re-throw the exception to the outer handler
> >         throw ex;
> >     } finally {
> >         logger.debug("Resetting autoCommit to true");
> >         con.setAutoCommit(true);
> >     }
> > } catch (Exception ex) {
> >     // HERE, the exception is swallowed, that's why the FlowFiles stay in
> >     // the incoming connection.
> >     logger.error("Retrying SQL statement: {}", new Object[]{sql, ex});
> >     retryOnFail = true;
> > }
> >
> > Thanks,
> > Koji
> >
> > On Wed, Dec 27, 2017 at 2:38 PM, 尹文才 <batman...@gmail.com> wrote:
> >> Hi Koji, no problem. You could check the code of processor WaitBatch at
> the
> >> link:
> >> https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ
> >>
> >> I also uploaded a snapshot of part of NiFi flow which includes the
> >> ExecuteSqlCommand and WaitBatch, you could check the picture at the
> link:
> >> https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view
> >>
> >> You mentioned that when the FlowFile repository fails to checkpoint,
> >> other processors may process the same FlowFiles again. But as you can
> >> see from my snapshot image, ExecuteSqlCommand is the second processor
> >> and comes before the WaitBatch processor; even if the FlowFile
> >> repository checkpointing failure is caused by WaitBatch, could it lead
> >> to the processors before it processing a FlowFile multiple times? Thanks.
> >>
> >> Regards,
> >> Ben
> >>
> >> 2017-12-27 12:36 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >>
> >>> Hi Ben,
> >>>
> >>> I was referring these two log messages in your previous email.
> >>> These two messages are both written by ExecuteSqlCommand, it does not
> >>> mean 'it was executed again'.
> >>>
> >>> ```
> >>> 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
> >>> c.z.nifi.processors.ExecuteSqlCommand
> >>>

Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread
Hi Koji, no problem. You could check the code of processor WaitBatch at the
link:
https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ

I also uploaded a snapshot of part of NiFi flow which includes the
ExecuteSqlCommand and WaitBatch, you could check the picture at the link:
https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view

You mentioned that when the FlowFile repository fails to checkpoint, other
processors may process the same FlowFiles again. But as you can see from my
snapshot image, ExecuteSqlCommand is the second processor and comes before
the WaitBatch processor; even if the FlowFile repository checkpointing
failure is caused by WaitBatch, could it lead to the processors before it
processing a FlowFile multiple times? Thanks.

Regards,
Ben

2017-12-27 12:36 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> Hi Ben,
>
> I was referring these two log messages in your previous email.
> These two messages are both written by ExecuteSqlCommand, it does not
> mean 'it was executed again'.
>
> ```
> 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
> c.z.nifi.processors.ExecuteSqlCommand
> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT
> TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> dbo.ods_extractDataDebug;
> alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column
> _id;
>
> and it was executed again later:
>
> 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
> c.z.nifi.processors.ExecuteSqlCommand
> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
> Failed to execute SQL statement: SELECT
> ```
>
> As you wrote, a case where the FlowFile repository fails checkpointing
> can cause processors to process the same FlowFiles again. However,
> there won't be a simple solution that lets every processor roll back its
> work, as different processors do different things. Creating the temp table
> only if it does not exist seems like the right approach to me.
>
> At the same time, the root cause of the FlowFile repository failure
> should be investigated. Is it possible to share the WaitBatch code?
> The reason I ask is that every 'FlowFile Repository failed to update'
> error is related to the WaitBatch processor in the log that you shared earlier.
>
> Thanks,
> Koji
>
> On Wed, Dec 27, 2017 at 1:19 PM, 尹文才 <batman...@gmail.com> wrote:
> > Hi Koji, I do print the SQL before actually executing it, but I checked
> > the error log line you mentioned in your reply; that error was thrown by
> > NiFi from within another processor called WaitBatch.
> > I didn't find similar errors from the ExecuteSqlCommand
> > processor; I think that's because only the ExecuteSqlCommand is used to
> > create temp database tables.
> > You can check my ExecuteSqlCommand code via the link:
> > https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P
> >
> > If the error is really caused by a FlowFile repository checkpoint failure
> > and the FlowFile was processed twice, I may have to create the temp table
> > only if it doesn't exist. The reason I didn't fix the bug this way right
> > away is that I was afraid the fix could mask some other problems.
> >
> > Thanks.
> >
> > Regards,
> > Ben
> >
> > 2017-12-27 11:38 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >
> >> Hi Ben,
> >>
> >> The following two log messages are very close in terms of written
> >> timestamp, but have different log level.
> >> 2017-12-26 07:00:01,312 INFO
> >> 2017-12-26 07:00:01,315 ERROR
> >>
> >> I guess those are logged within a single onTrigger of your
> >> ExecuteSqlCommand custom processor, one is before executing, the other
> >> is when it caught an exception. Just guessing as I don't have access
> >> to the code.
> >>
> >> Does the same issue happen with other processors bundled with Apache
> >> NiFi without your custom processor running?
> >>
> >> If NiFi fails to update/checkpoint FlowFile repository, then the same
> >> FlowFile can be processed again after restarting NiFi.
> >>
> >> Thanks,
> >> Koji
> >>
> >>
> >>
> >> On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman...@gmail.com> wrote:
> >> > Thanks Koji, I will look into this article about the record model.
> >> >
> >> > By the way, that error I previously mentioned to you occurred again, I
> >> > could see the sql query was executed twice in the log, this time I had
> >> > turned on the verbose NiFi logging, the sql query is as below:
> >> >
> &

Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread
Hi Koji, I do print the SQL before actually executing it, but I checked
the error log line you mentioned in your reply; that error was thrown by
NiFi from within another processor called WaitBatch.
I didn't find similar errors from the ExecuteSqlCommand
processor; I think that's because only the ExecuteSqlCommand is used to
create temp database tables.
You can check my ExecuteSqlCommand code via the link:
https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P

If the error is really caused by a FlowFile repository checkpoint failure and
the FlowFile was processed twice, I may have to create the temp table only
if it doesn't exist. The reason I didn't fix the bug this way right away is
that I was afraid the fix could mask some other problems.
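Making the table creation idempotent, as discussed above, can be done on SQL Server with an OBJECT_ID guard. A hedged sketch of building such a statement (the helper and the table/column names are illustrative, patterned on the statements in the log excerpts; this is not the actual ExecuteSqlCommand code):

```java
// Sketch: wrap the temp-table creation in an existence check so a
// replayed FlowFile does not fail with "object already exists".
class IdempotentCreateSketch {
    static String guardedCreate(String table, String sourceTable) {
        return "IF OBJECT_ID('" + table + "') IS NULL\n"
             + "BEGIN\n"
             + "    SELECT TOP 0 * INTO " + table + " FROM " + sourceTable + ";\n"
             + "    ALTER TABLE " + table + " DROP COLUMN _id;\n"   // mirrors the logged DDL
             + "END;";
    }

    public static void main(String[] args) {
        System.out.println(guardedCreate(
            "tmp.ods_extractDataDebug_20171226031801926_9195",
            "dbo.ods_extractDataDebug"));
    }
}
```

Note Ben's caveat still applies: this masks the duplicate execution rather than fixing its root cause, so the checkpointing failure should be investigated regardless.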

Thanks.

Regards,
Ben

2017-12-27 11:38 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> Hi Ben,
>
> The following two log messages are very close in terms of written
> timestamp, but have different log level.
> 2017-12-26 07:00:01,312 INFO
> 2017-12-26 07:00:01,315 ERROR
>
> I guess those are logged within a single onTrigger of your
> ExecuteSqlCommand custom processor, one is before executing, the other
> is when it caught an exception. Just guessing as I don't have access
> to the code.
>
> Does the same issue happen with other processors bundled with Apache
> NiFi without your custom processor running?
>
> If NiFi fails to update/checkpoint FlowFile repository, then the same
> FlowFile can be processed again after restarting NiFi.
>
> Thanks,
> Koji
>
>
>
> On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman...@gmail.com> wrote:
> > Thanks Koji, I will look into this article about the record model.
> >
> > By the way, that error I previously mentioned to you occurred again, I
> > could see the sql query was executed twice in the log, this time I had
> > turned on the verbose NiFi logging, the sql query is as below:
> >
> > 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
> > c.z.nifi.processors.ExecuteSqlCommand
> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement:
> SELECT
> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> > dbo.ods_extractDataDebug;
> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column
> _id;
> >
> > and it was executed again later:
> >
> > 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
> > c.z.nifi.processors.ExecuteSqlCommand
> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
> Failed to execute SQL statement: SELECT
> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> > dbo.ods_extractDataDebug;
> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column
> > _id;: com.microsoft.sqlserver.jdbc.SQLServerException: There is already
> > an object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
> > com.microsoft.sqlserver.jdbc.SQLServerException: There is already an
> > object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
> > at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
> > at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
> > at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
> > at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
> > at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> > at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
> > at org.apache.nifi.controller.

Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread
Thanks Koji, I will look into this article about the record model.

By the way, the error I previously mentioned to you occurred again, and I
could see in the log that the SQL query was executed twice; this time I had
turned on verbose NiFi logging. The SQL query is as below:

2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT
TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
dbo.ods_extractDataDebug;
alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;

and it was executed again later:

2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute SQL statement: SELECT
TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
dbo.ods_extractDataDebug;
alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column
_id;: com.microsoft.sqlserver.jdbc.SQLServerException: There is already an
object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object
named 'ods_extractDataDebug_20171226031801926_9195' in the database.
at
com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
at
com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
at
com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
at
com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
at
com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
at
com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
at
com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
at
com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
at
org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
at
org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
at
com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
at
com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
at
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
at
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I also saw a lot of NiFi exceptions like "ProcessException: FlowFile
Repository failed to update"; I'm not sure if this is the reason the
FlowFile got processed twice. Could you help take a look at my log file?
Thanks. You can get the log file via the link:
https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view

Best Regards,
Ben

2017-12-27 10:00 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> Hi Ben,
>
> This blog post written by Mark, would be a good starting point to get
> familiar with NiFi Record model.
> https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
>
> HA for DistributedMapCacheClientService and DistributedMapCacheServer
> pair is not supported at the moment. If you need HighAvailability,
> RedisDistributedMapCacheClientService with Redis replication will
> provide that, I haven't tried that myself though.
> https://redis.io/topics/replication
>
> Thanks,
> Koji
>
> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <batman...@gmail.com> wrote:
> > Thanks for your quick response, Koji. I hadn't seen anything about
> > the NiFi record data model when I was reading the NiFi
> > documentation; could you tell me where this model is documented? Thanks.
> >
> > By the way, to my knowledge, when you need to use the
> DistributedMapCacheServer
> > from DistributedMapCacheClientService, you need to specify the hos

Re: proper way in nifi to sync status between custom processors

2017-12-26 Thread
Thanks for your quick response, Koji. I hadn't seen anything about the NiFi
record data model when I was reading the NiFi documentation; could you tell
me where this model is documented? Thanks.

By the way, to my knowledge, when you use a DistributedMapCacheServer
through the DistributedMapCacheClientService, you need to specify the
server's host URL. This means that, inside a NiFi cluster, if the node
hosting the cache server suddenly goes down, I can't use the cache until
that node comes back up, right? Is there currently a cache server in NiFi
that supports HA? Thanks.

Regards,
Ben

2017-12-26 18:34 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> Hi Ben,
>
> As you found from existing code, DistributedMapCache is used to share
> state among different processors, and it can be used by your custom
> processors, too.
> However, I'd recommend to avoid such tight dependencies between
> FlowFiles if possible, or minimize the part in flow that requires that
> constraint at least for better performance and simplicity.
> For example, since a FlowFile can hold a fairly large amount of data,
> you could merge all the FlowFiles into a single FlowFile, instead of using
> batches of FlowFiles. If you need logical boundaries, you can use the NiFi
> Record data model to embed multiple records within a FlowFile; Records
> should perform better.
>
> Hope this helps.
>
> Thanks,
> Koji
>
>
> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman...@gmail.com> wrote:
> > Hi guys, I'm currently trying to find a proper way in nifi which could
> sync
> > status between my custom processors.
> > our requirement is like this, we're doing some ETL work using nifi and
> I'm
> > extracting the data from DB into batches of FlowFiles(each batch of
> > FlowFile has a flag FlowFile indicating the end of the batch).
> > There're some groups of custom processors downstream that need to process
> > these FlowFiles to do some business logic work. And we expect these
> > processors to process one batch of FlowFiles at a time.
> > Therefore we need to implement a custom Wait processor(let's just call it
> > WaitBatch here) to hold all the other batches of FlowFiles while the
> > business processors were handling the batch of FlowFiles whose creation
> > time is earlier.
> >
> > In order to implement this, all the WaitBatch processors placed in the
> flow
> > need to read/update records in a shared map so that each set of
> > business-logic processors process one batch at a time.
> > The entries are keyed using the batch number of the FlowFiles and the
> value
> > of each entry is a batch release counter number which counts the number
> of
> > times the batch of FlowFiles has passed through
> > a WaitBatch processor.
> > When a batch is released by WaitBatch, it will try to increment the batch
> > number entry's value by 1 and then the released batch number and counter
> > number will also be saved locally at the WaitBatch with StateManager;
> > when the next batch reaches the WaitBatch, it will check if the counter
> > value of the previous released batch number in the shared map is greater
> > than the one saved locally, if the entry for the batch number doesn't
> > exist(already removed) or the value in the shared map is greater, the
> next
> > batch will be released and the local state and the entry on the shared
> map
> > will be updated similarly.
> > In the end of the flow, a custom processor will get the batch number from
> > each batch and remove the entry from the shared map .
> >
> > So this implementation requires a shared map that could read/update
> > frequently and atomically. I checked the Wait/Notify processors in NIFI
> and
> > saw it is using the DistributedMapCacheClientService and
> > DistributedMapCacheServer to sync status, so I'm wondering if I could use
> > the DistributedMapCacheClientService to implement my logic. I also saw
> > another implementation called RedisDistributedMapCacheClientService
> > which seems to require Redis(I haven't used Redis).  Thanks in advance
> for
> > any suggestions.
> >
> > Regards,
> > Ben
>


proper way in nifi to sync status between custom processors

2017-12-26 Thread
Hi guys, I'm currently trying to find a proper way in NiFi to sync status
between my custom processors.
Our requirement is as follows: we're doing some ETL work using NiFi, and
I'm extracting data from a database into batches of FlowFiles (each batch
has a flag FlowFile indicating the end of the batch).
Some groups of custom processors downstream need to process these
FlowFiles to perform business logic, and we expect these processors to
handle one batch of FlowFiles at a time.
Therefore we need to implement a custom Wait processor (let's call it
WaitBatch here) that holds all the other batches of FlowFiles while the
business processors are handling the batch whose creation time is
earliest.

To implement this, all the WaitBatch processors placed in the flow need
to read and update records in a shared map so that each set of
business-logic processors handles one batch at a time.
The entries are keyed by the batch number of the FlowFiles, and the value
of each entry is a release counter that records how many times that batch
of FlowFiles has passed through a WaitBatch processor.
When a batch is released by a WaitBatch, it increments the batch number
entry's value by 1, and the released batch number and counter value are
also saved locally at that WaitBatch with the StateManager.
When the next batch reaches the WaitBatch, it checks whether the counter
value of the previously released batch number in the shared map is
greater than the one saved locally; if the entry for that batch number
doesn't exist (already removed) or the value in the shared map is
greater, the next batch is released, and the local state and the entry in
the shared map are updated in the same way.
At the end of the flow, a custom processor gets the batch number from
each batch and removes the entry from the shared map.
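In pseudo-form, the release check I have in mind looks like this (plain Java, with a HashMap standing in for the shared distributed cache; the class and method names are my own placeholders, not NiFi APIs):

```java
import java.util.Map;

// Sketch of the WaitBatch release check described above.
// An in-memory Map stands in for the shared distributed cache; in a real
// processor this would be a distributed map cache client, and the two
// local fields would be persisted through the processor's StateManager.
class WaitBatchSketch {
    private final Map<String, Long> sharedMap; // batch number -> release counter
    private String lastReleasedBatch;          // local state: last batch we released
    private long lastReleasedCounter;          // local state: its counter at release time

    WaitBatchSketch(Map<String, Long> sharedMap) {
        this.sharedMap = sharedMap;
    }

    /** Returns true if the given batch may pass through this WaitBatch. */
    boolean tryRelease(String batchNumber) {
        if (lastReleasedBatch != null) {
            Long counter = sharedMap.get(lastReleasedBatch);
            // Hold the batch unless the previous batch's entry was removed
            // (batch finished the flow) or another WaitBatch downstream has
            // already advanced its counter past what we saw.
            if (counter != null && counter <= lastReleasedCounter) {
                return false;
            }
        }
        // Release: bump this batch's shared counter and remember it locally.
        long newCounter = sharedMap.merge(batchNumber, 1L, Long::sum);
        lastReleasedBatch = batchNumber;
        lastReleasedCounter = newCounter;
        return true;
    }
}
```

The real version would of course need the cache operations to be atomic across the cluster, which is exactly why I'm asking about the map cache services.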

So this implementation requires a shared map that can be read and updated
frequently and atomically. I checked the Wait/Notify processors in NiFi
and saw that they use the DistributedMapCacheClientService and
DistributedMapCacheServer to sync status, so I'm wondering if I could use
the DistributedMapCacheClientService to implement my logic. I also saw
another implementation called RedisDistributedMapCacheClientService,
which seems to require Redis (I haven't used Redis). Thanks in advance
for any suggestions.

Regards,
Ben


Re: The rate of the dataflow is exceeding the provenance recording rate. Slowing down flow to accommodate

2017-12-25 Thread
Thanks Koji, I have already updated the logback configuration to produce
more verbose logs.
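For reference, the entries I added look roughly like this (the logger names are my own guesses based on NiFi's package layout; the exact entries from your mail were stripped by the archive):

```xml
<!-- conf/logback.xml: make the repository implementations log at DEBUG.
     Logger names are assumptions based on the package names seen in the logs. -->
<logger name="org.apache.nifi.controller.repository" level="DEBUG"/>
<logger name="org.apache.nifi.provenance" level="DEBUG"/>
```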
I was going to reply to you with the verbose NiFi logs, but since I
switched to the WriteAheadProvenanceRepository implementation I haven't
seen the error again.
I will continue to watch for the error and post the logs here if needed.
Once again, thanks very much for your help.

Regards,
Ben

2017-12-25 15:37 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> Hi Ben,
>
> You can make NiFi log more verbose by editing:
> NIFI_HOME/conf/logback.xml
>
> For example, adding following entry will reveal how NiFi repositories run:
>
> 
> 
> 
> 
>
> Thanks,
> Koji
>
> On Mon, Dec 25, 2017 at 4:30 PM, 尹文才 <batman...@gmail.com> wrote:
> > Hi Koji, I also didn't find anything related to the unexpected shutdown
> in
> > my logs, is there anything I could do  to make NIFI log more verbose
> > information to the logs?
> >
> > Regards,
> > Ben
> >
> > 2017-12-25 14:56 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >
> >> Hi Ben,
> >>
> >> I looked at the log and I expected to see some indication for the
> >> cause of shutdown, but couldn't find any.
> >> The PersistentProvenanceRepository rate warning is just a warning, and
> >> it shouldn't be the trigger of an unexpected shutdown. I suspect other
> >> reasons such as OOM killer, but I can't do any further investigation
> >> with only these logs.
> >>
> >> Thanks,
> >> Koji
> >>
> >> On Mon, Dec 25, 2017 at 3:46 PM, 尹文才 <batman...@gmail.com> wrote:
> >> > Hi Koji, one more thing, do you have any idea why my first issue
> leads to
> >> > the unexpected shutdown of NIFI? according to the words, it will just
> >> slow
> >> > down the flow. thanks.
> >> >
> >> > Regards,
> >> > Ben
> >> >
> >> > 2017-12-25 14:31 GMT+08:00 尹文才 <batman...@gmail.com>:
> >> >
> >> >> Hi Koji, thanks for your help, for the first issue, I will switch to
> use
> >> the WriteAheadProvenanceRepository implementation.
> >> >>
> >> >> For the second issue, I have uploaded the relevant part of my log
> file
> >> >> onto my google drive, the link is:
> >> >> https://drive.google.com/open?id=1oxAkSUyYZFy6IWZSeWqHI8e9Utnw1XAj
> >> >>
> >> >> You mean a custom processor could possibly process a flowfile twice
> only
> >> >> when it's trying to commit the session but it's interrupted so the
> >> flowfile
> >> >> still remains inside the original queue(like NIFI went down)?
> >> >>
> >> >> If you need to see the full log file, please let me know, thanks.
> >> >>
> >> >> Regards,
> >> >> Ben
> >> >>
> >> >> 2017-12-25 13:51 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >> >>
> >> >>> Hi Ben,
> >> >>>
> >> >>> For your 2nd issue, NiFi commits a process session in Processor
> >> >>> onTrigger when it's executed by NiFi flow engine by calling
> >> >>> session.commit().
> >> >>> https://github.com/apache/nifi/blob/master/nifi-api/src/main
> >> >>> /java/org/apache/nifi/processor/AbstractProcessor.java#L28
> >> >>> Once a process session is committed, the FlowFile state (including
> >> >>> which queue it is in) is persisted to disk.
> >> >>>
> >> >>> It's possible for a Processor to process the same FlowFile more than
> >> >>> once, if it has done its job, but failed to commit the session.
> >> >>> For example, if your custom processor created a temp table from a
> >> >>> FlowFile. Then before the process session is committed, something
> >> >>> happened and NiFi process session was rollback. In this case, the
> >> >>> target database is already updated (the temp table is created), but
> >> >>> NiFi FlowFile stays in the incoming queue. If the FlowFile is
> >> >>> processed again, the processor will get an error indicating the
> table
> >> >>> already exists.
> >> >>>
> >> >>> I tried to look at the logs you attached, but attachments do not
> seem
> >> to be delivered to this ML. I don't see anything attached.

Re: The rate of the dataflow is exceeding the provenance recording rate. Slowing down flow to accommodate

2017-12-24 Thread
Hi Koji, I also didn't find anything related to the unexpected shutdown
in my logs. Is there anything I could do to make NiFi log more verbose
information?

Regards,
Ben

2017-12-25 14:56 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> Hi Ben,
>
> I looked at the log and I expected to see some indication for the
> cause of shutdown, but couldn't find any.
> The PersistentProvenanceRepository rate warning is just a warning, and
> it shouldn't be the trigger of an unexpected shutdown. I suspect other
> reasons such as OOM killer, but I can't do any further investigation
> with only these logs.
>
> Thanks,
> Koji
>
> On Mon, Dec 25, 2017 at 3:46 PM, 尹文才 <batman...@gmail.com> wrote:
> > Hi Koji, one more thing, do you have any idea why my first issue leads to
> > the unexpected shutdown of NIFI? according to the words, it will just
> slow
> > down the flow. thanks.
> >
> > Regards,
> > Ben
> >
> > 2017-12-25 14:31 GMT+08:00 尹文才 <batman...@gmail.com>:
> >
> >> Hi Koji, thanks for your help, for the first issue, I will switch to use
> >> the WriteAheadProvenanceRepository implementation.
> >>
> >> For the second issue, I have uploaded the relevant part of my log file
> >> onto my google drive, the link is:
> >> https://drive.google.com/open?id=1oxAkSUyYZFy6IWZSeWqHI8e9Utnw1XAj
> >>
> >> You mean a custom processor could possibly process a flowfile twice only
> >> when it's trying to commit the session but it's interrupted so the
> flowfile
> >> still remains inside the original queue(like NIFI went down)?
> >>
> >> If you need to see the full log file, please let me know, thanks.
> >>
> >> Regards,
> >> Ben
> >>
> >> 2017-12-25 13:51 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >>
> >>> Hi Ben,
> >>>
> >>> For your 2nd issue, NiFi commits a process session in Processor
> >>> onTrigger when it's executed by NiFi flow engine by calling
> >>> session.commit().
> >>> https://github.com/apache/nifi/blob/master/nifi-api/src/main
> >>> /java/org/apache/nifi/processor/AbstractProcessor.java#L28
> >>> Once a process session is committed, the FlowFile state (including
> >>> which queue it is in) is persisted to disk.
> >>>
> >>> It's possible for a Processor to process the same FlowFile more than
> >>> once, if it has done its job, but failed to commit the session.
> >>> For example, if your custom processor created a temp table from a
> >>> FlowFile. Then before the process session is committed, something
> >>> happened and NiFi process session was rollback. In this case, the
> >>> target database is already updated (the temp table is created), but
> >>> NiFi FlowFile stays in the incoming queue. If the FlowFile is
> >>> processed again, the processor will get an error indicating the table
> >>> already exists.
> >>>
> >>> I tried to look at the logs you attached, but attachments do not seem
> >>> to be delivered to this ML. I don't see anything attached.
> >>>
> >>> Thanks,
> >>> Koji
> >>>
> >>>
> >>> On Mon, Dec 25, 2017 at 1:43 PM, Koji Kawamura <ijokaruma...@gmail.com
> >
> >>> wrote:
> >>> > Hi Ben,
> >>> >
> >>> > Just a quick recommendation for your first issue, 'The rate of the
> >>> > dataflow is exceeding the provenance recording rate' warning message.
> >>> > I'd recommend using WriteAheadProvenanceRepository instead of
> >>> > PersistentProvenanceRepository. WriteAheadProvenanceRepository
> >>> > provides better performance.
> >>> > Please take a look at the documentation here.
> >>> > https://nifi.apache.org/docs/nifi-docs/html/administration-g
> >>> uide.html#provenance-repository
> >>> >
> >>> > Thanks,
> >>> > Koji
> >>> >
> >>> > On Mon, Dec 25, 2017 at 12:56 PM, 尹文才 <batman...@gmail.com> wrote:
> >>> >> Hi guys, I'm using nifi 1.4.0 to do some ETL work in my team and I
> have
> >>> >> encountered 2 problems during my testing.
> >>> >>
> >>> >> The first problem is I found the nifi bulletin board was showing the
> >>> >> following warning to me:
> >>> >>
> >>> >> 2017-12-25 01:31:00,46

Re: The rate of the dataflow is exceeding the provenance recording rate. Slowing down flow to accommodate

2017-12-24 Thread
Hi Koji, one more thing: do you have any idea why my first issue led to
the unexpected shutdown of NiFi? According to the warning message, it
should just slow down the flow. Thanks.

Regards,
Ben

2017-12-25 14:31 GMT+08:00 尹文才 <batman...@gmail.com>:

> Hi Koji, thanks for your help, for the first issue, I will switch to use
> the WriteAheadProvenanceRepository implementation.
>
> For the second issue, I have uploaded the relevant part of my log file
> onto my google drive, the link is:
> https://drive.google.com/open?id=1oxAkSUyYZFy6IWZSeWqHI8e9Utnw1XAj
>
> You mean a custom processor could possibly process a flowfile twice only
> when it's trying to commit the session but it's interrupted so the flowfile
> still remains inside the original queue(like NIFI went down)?
>
> If you need to see the full log file, please let me know, thanks.
>
> Regards,
> Ben
>
> 2017-12-25 13:51 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>
>> Hi Ben,
>>
>> For your 2nd issue, NiFi commits a process session in Processor
>> onTrigger when it's executed by NiFi flow engine by calling
>> session.commit().
>> https://github.com/apache/nifi/blob/master/nifi-api/src/main
>> /java/org/apache/nifi/processor/AbstractProcessor.java#L28
>> Once a process session is committed, the FlowFile state (including
>> which queue it is in) is persisted to disk.
>>
>> It's possible for a Processor to process the same FlowFile more than
>> once, if it has done its job, but failed to commit the session.
>> For example, if your custom processor created a temp table from a
>> FlowFile. Then before the process session is committed, something
>> happened and NiFi process session was rollback. In this case, the
>> target database is already updated (the temp table is created), but
>> NiFi FlowFile stays in the incoming queue. If the FlowFile is
>> processed again, the processor will get an error indicating the table
>> already exists.
>>
>> I tried to look at the logs you attached, but attachments do not seem
>> to be delivered to this ML. I don't see anything attached.
>>
>> Thanks,
>> Koji
>>
>>
>> On Mon, Dec 25, 2017 at 1:43 PM, Koji Kawamura <ijokaruma...@gmail.com>
>> wrote:
>> > Hi Ben,
>> >
>> > Just a quick recommendation for your first issue, 'The rate of the
>> > dataflow is exceeding the provenance recording rate' warning message.
>> > I'd recommend using WriteAheadProvenanceRepository instead of
>> > PersistentProvenanceRepository. WriteAheadProvenanceRepository
>> > provides better performance.
>> > Please take a look at the documentation here.
>> > https://nifi.apache.org/docs/nifi-docs/html/administration-g
>> uide.html#provenance-repository
>> >
>> > Thanks,
>> > Koji
>> >
>> > On Mon, Dec 25, 2017 at 12:56 PM, 尹文才 <batman...@gmail.com> wrote:
>> >> Hi guys, I'm using nifi 1.4.0 to do some ETL work in my team and I have
>> >> encountered 2 problems during my testing.
>> >>
>> >> The first problem is I found the nifi bulletin board was showing the
>> >> following warning to me:
>> >>
>> >> 2017-12-25 01:31:00,460 WARN [Provenance Maintenance Thread-1]
>> >> o.a.n.p.PersistentProvenanceRepository The rate of the dataflow is
>> exceeding
>> >> the provenance recording rate. Slowing down flow to accommodate.
>> Currently,
>> >> there are 96 journal files (158278228 bytes) and threshold for
>> blocking is
>> >> 80 (1181116006 bytes)
>> >>
>> >> I don't quite understand what this means, and I found also inside the
>> >> bootstrap log that nifi restarted itself:
>> >>
>> >> 2017-12-25 01:31:19,249 WARN [main] org.apache.nifi.bootstrap.RunNiFi
>> Apache
>> >> NiFi appears to have died. Restarting...
>> >>
>> >> Is there anything I could do so solve this problem?
>> >>
>> >> The second problem is about the FlowFiles inside my flow, I actually
>> >> implemented a few custom processors to do the ETL work. one is to
>> extract
>> >> multiple tables from sql server and for each flowfile out of it, it
>> contains
>> >> an attribute
>> >> specifying the name of the temp ods table to create, and the second
>> >> processor is to get all flowfiles from the first processor and create
>> all
>> >> the temp ods tables specified in the flowfiles' attribute.
>> >> I found inside the app log that one of the te

Re: The rate of the dataflow is exceeding the provenance recording rate. Slowing down flow to accommodate

2017-12-24 Thread
Hi Koji, thanks for your help. For the first issue, I will switch to the
WriteAheadProvenanceRepository implementation.
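If I'm reading the administration guide correctly, the switch is a one-line change in nifi.properties:

```properties
# nifi.properties: swap the provenance repository implementation
nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
```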

For the second issue, I have uploaded the relevant part of my log file onto
my google drive, the link is:
https://drive.google.com/open?id=1oxAkSUyYZFy6IWZSeWqHI8e9Utnw1XAj

You mean a custom processor could possibly process a FlowFile twice only
when it has tried to commit the session but was interrupted, so the
FlowFile still remains in the original queue (e.g. NiFi went down)?

If you need to see the full log file, please let me know, thanks.

Regards,
Ben

2017-12-25 13:51 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> Hi Ben,
>
> For your 2nd issue, NiFi commits a process session in Processor
> onTrigger when it's executed by NiFi flow engine by calling
> session.commit().
> https://github.com/apache/nifi/blob/master/nifi-api/src/
> main/java/org/apache/nifi/processor/AbstractProcessor.java#L28
> Once a process session is committed, the FlowFile state (including
> which queue it is in) is persisted to disk.
>
> It's possible for a Processor to process the same FlowFile more than
> once, if it has done its job, but failed to commit the session.
> For example, if your custom processor created a temp table from a
> FlowFile. Then before the process session is committed, something
> happened and NiFi process session was rollback. In this case, the
> target database is already updated (the temp table is created), but
> NiFi FlowFile stays in the incoming queue. If the FlowFile is
> processed again, the processor will get an error indicating the table
> already exists.
>
> I tried to look at the logs you attached, but attachments do not seem
> to be delivered to this ML. I don't see anything attached.
>
> Thanks,
> Koji
>
>
> On Mon, Dec 25, 2017 at 1:43 PM, Koji Kawamura <ijokaruma...@gmail.com>
> wrote:
> > Hi Ben,
> >
> > Just a quick recommendation for your first issue, 'The rate of the
> > dataflow is exceeding the provenance recording rate' warning message.
> > I'd recommend using WriteAheadProvenanceRepository instead of
> > PersistentProvenanceRepository. WriteAheadProvenanceRepository
> > provides better performance.
> > Please take a look at the documentation here.
> > https://nifi.apache.org/docs/nifi-docs/html/administration-
> guide.html#provenance-repository
> >
> > Thanks,
> > Koji
> >
> > On Mon, Dec 25, 2017 at 12:56 PM, 尹文才 <batman...@gmail.com> wrote:
> >> Hi guys, I'm using nifi 1.4.0 to do some ETL work in my team and I have
> >> encountered 2 problems during my testing.
> >>
> >> The first problem is I found the nifi bulletin board was showing the
> >> following warning to me:
> >>
> >> 2017-12-25 01:31:00,460 WARN [Provenance Maintenance Thread-1]
> >> o.a.n.p.PersistentProvenanceRepository The rate of the dataflow is
> exceeding
> >> the provenance recording rate. Slowing down flow to accommodate.
> Currently,
> >> there are 96 journal files (158278228 bytes) and threshold for blocking
> is
> >> 80 (1181116006 bytes)
> >>
> >> I don't quite understand what this means, and I found also inside the
> >> bootstrap log that nifi restarted itself:
> >>
> >> 2017-12-25 01:31:19,249 WARN [main] org.apache.nifi.bootstrap.RunNiFi
> Apache
> >> NiFi appears to have died. Restarting...
> >>
> >> Is there anything I could do so solve this problem?
> >>
> >> The second problem is about the FlowFiles inside my flow, I actually
> >> implemented a few custom processors to do the ETL work. one is to
> extract
> >> multiple tables from sql server and for each flowfile out of it, it
> contains
> >> an attribute
> >> specifying the name of the temp ods table to create, and the second
> >> processor is to get all flowfiles from the first processor and create
> all
> >> the temp ods tables specified in the flowfiles' attribute.
> >> I found inside the app log that one of the temp table name already
> existed
> >> when trying to create the temp table, and it caused sql exception.
> >> After taking some time investigating in the log, I found the sql query
> was
> >> executed twice in the second processor, once before nifi restart, the
> second
> >> execution was done right after nifi restart:
> >>
> >> 2017-12-25 01:32:35,639 ERROR [Timer-Driven Process Thread-7]
> >> c.z.nifi.processors.ExecuteSqlCommand
> >> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
> Failed to execute SQL statement: SELECT
> >> TOP 0 * INTO tmp.ods_bd_e_reason_201

The rate of the dataflow is exceeding the provenance recording rate. Slowing down flow to accommodate

2017-12-24 Thread
Hi guys, I'm using NiFi 1.4.0 to do some ETL work for my team, and I have
encountered 2 problems during my testing.

The first problem is that the NiFi bulletin board was showing me the
following warning:

2017-12-25 01:31:00,460 WARN [Provenance Maintenance Thread-1]
o.a.n.p.PersistentProvenanceRepository The rate of the dataflow is
exceeding the provenance recording rate. Slowing down flow to accommodate.
Currently, there are 96 journal files (158278228 bytes) and threshold for
blocking is 80 (1181116006 bytes)

I don't quite understand what this means, and I also found in the
bootstrap log that NiFi restarted itself:

2017-12-25 01:31:19,249 WARN [main] org.apache.nifi.bootstrap.RunNiFi
Apache NiFi appears to have died. Restarting...

Is there anything I could do to solve this problem?

The second problem is about the FlowFiles inside my flow. I implemented a
few custom processors to do the ETL work: one extracts multiple tables
from SQL Server, and each FlowFile it emits carries an attribute
specifying the name of the temp ODS table to create; the second processor
takes all FlowFiles from the first processor and creates all the temp ODS
tables named in those attributes.
I found in the app log that one of the temp table names already existed
when the second processor tried to create it, which caused a SQL
exception.
After spending some time investigating the log, I found the SQL query was
executed twice in the second processor: once before the NiFi restart, and
a second time right after the restart:

2017-12-25 01:32:35,639 ERROR [Timer-Driven Process Thread-7]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute SQL statement: SELECT
TOP 0 * INTO tmp.ods_bd_e_reason_20171225013007005_5567 FROM
dbo.ods_bd_e_reason;


I have read the NiFi documentation in depth, but I'm still not very
familiar with NiFi's internal mechanisms. My suspicion is that NiFi
didn't manage to checkpoint the FlowFile's in-memory state (which queue
it was in) into the FlowFile repository before it died; after restarting,
it recovered the FlowFile's state from the FlowFile repository, the
FlowFile went through the second processor again, and thus the SQL was
executed twice. Is this correct?
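If so, one workaround I'm considering (my own idea, not something NiFi provides) is to make the statement tolerate a replay by guarding it on SQL Server's OBJECT_ID; for example, a helper like this (buildGuardedCreate is a hypothetical name, not a NiFi or JDBC API):

```java
// Sketch: wrap a "SELECT TOP 0 * INTO ..." in an existence guard so that
// replaying the same FlowFile after a restart is harmless (SQL Server syntax).
class IdempotentSql {
    static String buildGuardedCreate(String tempTable, String sourceTable) {
        // OBJECT_ID(..., N'U') is non-null only if the user table already exists,
        // so the INTO runs at most once even if the FlowFile is processed twice.
        return "IF OBJECT_ID(N'" + tempTable + "', N'U') IS NULL\n"
             + "    SELECT TOP 0 * INTO " + tempTable
             + " FROM " + sourceTable + ";";
    }
}
```

The second processor would then execute the guarded statement instead of the bare SELECT ... INTO.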

I've attached the relevant part of app log, thanks.

Regards,
Ben


Re: Unable to checkpoint FlowFile Repository (No space left on device)

2017-12-14 Thread
Thanks Michael, I could try turning off the content repository archive
feature. One thing I'm curious about, though: while my NiFi flow was
still actively running, I watched the content repository folder and it
kept writing new empty files to the archive folder. Some of them were old
FlowFiles' content after I checked (only a very small portion; most are
empty files; I used the default archive options of 12 hours and 50%), and
the rate at which these files are created differs slightly when I tune
the interval of my data-extraction processor. The most important thing is
that my custom processor that extracts data has no more data to extract
at the moment, so I don't know what inside NiFi keeps increasing the
inode usage.
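To see where the inodes are going, I've been counting files per repository with a small shell helper (nothing NiFi-specific; the directory names below are just the defaults):

```shell
# Count files (one inode each) under a directory tree; run this against
# each NiFi repository directory to see which one is consuming inodes.
count_inodes() {
    find "$1" -xdev -type f 2>/dev/null | wc -l
}

# Example against the default repository locations:
for repo in content_repository flowfile_repository provenance_repository; do
    printf '%s: %s files\n' "$repo" "$(count_inodes "$repo")"
done
```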

/Ben

2017-12-15 0:27 GMT+08:00 Michael Moser <moser...@gmail.com>:

> Maximum number of inodes is defined when you build a file system on a
> device.  You would have to backup your data, rebuild your file system and
> tell it to allocate more inodes than the default (an mkfs option, I
> think?), then restore your data.
>
> You can turn off the NiFi content_repository archive, if you don't need
> that feature, by setting nifi.content.repository.archive.enabled=false in
> your nifi.properties.
>
> -- Mike
>
>
> On Thu, Dec 14, 2017 at 2:22 AM, 尹文才 <batman...@gmail.com> wrote:
>
> > One Strange thing one of our testers found was that we're using the
> default
> > 12hours archive and 50% disk space configuration, he noticed that when
> nifi
> > removed the archived files inside the content_repository, he checked the
> > inodes ' count didn't go down, then he tried to remove some of the
> archived
> > files inside using the rm command, the inodes's count did go down.
> >
> > /Ben
> >
> > 2017-12-14 9:49 GMT+08:00 尹文才 <batman...@gmail.com>:
> >
> > > Hi Michael, the no space left on device occurred again and I checked
> the
> > > inodes at the time and found it was indeed full, why would the inodes
> > > become full and are there any solutions to get around this problem?
> > Thanks.
> > >
> > > /Ben
> > >
> > > 2017-12-13 13:36 GMT+08:00 尹文才 <batman...@gmail.com>:
> > >
> > >> Hi Michael, I checked the system available inodes by running df -i
> > >> command and there're quite enough inodes in the system. I then removed
> > all
> > >> the files in all repository folders and restarted
> > >> the system, I couldn't see the error again. I will continue to track
> the
> > >> problem to see what's causing it, but it seems not relevant to the
> inode
> > >> use-up reason you mentioned. Thanks.
> > >>
> > >> /Ben
> > >>
> > >> 2017-12-12 23:45 GMT+08:00 Michael Moser <moser...@gmail.com>:
> > >>
> > >>> Greetings Ben,
> > >>>
> > >>> The "No space left on device" error can also be caused by running out
> > of
> > >>> inodes on your device.  You can check this with "df -i".
> > >>>
> > >>> -- Mike
> > >>>
> > >>>
> > >>> On Tue, Dec 12, 2017 at 1:36 AM, 尹文才 <batman...@gmail.com> wrote:
> > >>>
> > >>> > sorry that I forgot to mention the environment that caused this
> > >>> problem,
> > >>> > I'm using the latest nifi 1.4.0 release and installed it on centos
> 7.
> > >>> >
> > >>> > 2017-12-12 14:35 GMT+08:00 尹文才 <batman...@gmail.com>:
> > >>> >
> > >>> > > Hi guys, I'm running into a very weird problem, I wrote a
> processor
> > >>> > > specifically to extract some data
> > >>> > > and I found starting from yesterday it kept showing errors in the
> > >>> log, as
> > >>> > > below:
> > >>> > >
> > >>> > > 2017-12-12 14:01:04,661 INFO [pool-10-thread-1] o.a.n.c.r.
> > >>> > WriteAheadFlowFileRepository
> > >>> > > Initiating checkpoint of FlowFile Repository
> > >>> > > 2017-12-12 14:01:04,676 ERROR [pool-10-thread-1] o.a.n.c.r.
> > >>> > WriteAheadFlowFileRepository
> > >>> > > Unable to checkpoint FlowFile Repository due to
> > >>> > > java.io.FileNotFoundException: ../flowfile_repository/
> > >>> > partition-5/96.journal
> > >>> > > (No space left on device)
> > >>> > > java.io.FileNotFoundException: ../flowfile_reposito

Re: Unable to checkpoint FlowFile Repository (No space left on device)

2017-12-13 Thread
One strange thing one of our testers found: we're using the default
12-hour archive and 50% disk space configuration, and he noticed that
when NiFi removed the archived files inside the content_repository, the
inode count didn't go down; when he then removed some of the archived
files himself with the rm command, the inode count did go down.

/Ben

2017-12-14 9:49 GMT+08:00 尹文才 <batman...@gmail.com>:

> Hi Michael, the no space left on device occurred again and I checked the
> inodes at the time and found it was indeed full, why would the inodes
> become full and are there any solutions to get around this problem? Thanks.
>
> /Ben
>
> 2017-12-13 13:36 GMT+08:00 尹文才 <batman...@gmail.com>:
>
>> Hi Michael, I checked the system available inodes by running df -i
>> command and there're quite enough inodes in the system. I then removed all
>> the files in all repository folders and restarted
>> the system, I couldn't see the error again. I will continue to track the
>> problem to see what's causing it, but it seems not relevant to the inode
>> use-up reason you mentioned. Thanks.
>>
>> /Ben
>>
>> 2017-12-12 23:45 GMT+08:00 Michael Moser <moser...@gmail.com>:
>>
>>> Greetings Ben,
>>>
>>> The "No space left on device" error can also be caused by running out of
>>> inodes on your device.  You can check this with "df -i".
>>>
>>> -- Mike
>>>
>>>
>>> On Tue, Dec 12, 2017 at 1:36 AM, 尹文才 <batman...@gmail.com> wrote:
>>>
>>> > sorry that I forgot to mention the environment that caused this
>>> problem,
>>> > I'm using the latest nifi 1.4.0 release and installed it on centos 7.
>>> >
>>> > 2017-12-12 14:35 GMT+08:00 尹文才 <batman...@gmail.com>:
>>> >
>>> > > Hi guys, I'm running into a very weird problem, I wrote a processor
>>> > > specifically to extract some data
>>> > > and I found starting from yesterday it kept showing errors in the
>>> log, as
>>> > > below:
>>> > >
>>> > > 2017-12-12 14:01:04,661 INFO [pool-10-thread-1] o.a.n.c.r.
>>> > WriteAheadFlowFileRepository
>>> > > Initiating checkpoint of FlowFile Repository
>>> > > 2017-12-12 14:01:04,676 ERROR [pool-10-thread-1] o.a.n.c.r.
>>> > WriteAheadFlowFileRepository
>>> > > Unable to checkpoint FlowFile Repository due to
>>> > > java.io.FileNotFoundException: ../flowfile_repository/
>>> > partition-5/96.journal
>>> > > (No space left on device)
>>> > > java.io.FileNotFoundException: ../flowfile_repository/
>>> > partition-5/96.journal
>>> > > (No space left on device)
>>> > > at java.io.FileOutputStream.open0(Native Method)
>>> > > at java.io.FileOutputStream.open(FileOutputStream.java:270)
>>> > > at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
>>> > > at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
>>> > > at org.wali.MinimalLockingWriteAheadLog$Partition.rollover(
>>> > > MinimalLockingWriteAheadLog.java:779)
>>> > > at org.wali.MinimalLockingWriteAheadLog.checkpoint(
>>> > > MinimalLockingWriteAheadLog.java:528)
>>> > > at org.apache.nifi.controller.repository.
>>> > > WriteAheadFlowFileRepository.checkpoint(WriteAheadFlowFileRe
>>> pository.
>>> > > java:451)
>>> > > at org.apache.nifi.controller.repository.
>>> > > WriteAheadFlowFileRepository$1.run(WriteAheadFlowFileRepository.
>>> > java:423)
>>> > > at java.util.concurrent.Executors$RunnableAdapter.
>>> > > call(Executors.java:511)
>>> > > at java.util.concurrent.FutureTask.runAndReset(
>>> > > FutureTask.java:308)
>>> > > at java.util.concurrent.ScheduledThreadPoolExecutor$
>>> > > ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>> > > at java.util.concurrent.ScheduledThreadPoolExecutor$
>>> > > ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>> > > at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>> > > ThreadPoolExecutor.java:1142)
>>> > > at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>> > > ThreadPoolExecutor.java:617)
>>> > > at java.lang.Thread.run(Thread.java:745)
>>> > >
>>> > >
>>> > > I noticed the log mentioned no space left on device and I went to
>>> check
>>> > > the available space and found 33G left. Does anyone know what could
>>> > > possibly cause this and how to resolve this problem, thanks
>>> > >
>>> > > /Ben
>>> > >
>>> >
>>>
>>
>>
>


Re: Unable to checkpoint FlowFile Repository (No space left on device)

2017-12-13 Thread
Hi Michael, the "no space left on device" error occurred again, and when I
checked the inodes this time they were indeed exhausted. Why would the
inodes fill up, and is there any way to work around this problem? Thanks.

/Ben
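A quick way to see which limit is actually hit: free bytes are easy to check from Java, but inode exhaustion only shows up with `df -i`, since NIO does not expose inode counts portably. A minimal sketch (the path below is just an example, not taken from this thread):

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;

// "No space left on device" can mean either free bytes or free inodes are
// exhausted. FileStore reports bytes; for inodes, run `df -i <path>` and
// look at the IFree / IUse% columns when bytes look fine but writes fail.
public class DiskCheck {
    public static void main(String[] args) throws IOException {
        Path repo = Path.of(".");   // e.g. the flowfile_repository directory
        FileStore fs = Files.getFileStore(repo);
        System.out.println("usable bytes: " + fs.getUsableSpace()
                + " of " + fs.getTotalSpace());
        // Inode usage is not available here; `df -i` is still needed.
    }
}
```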

2017-12-13 13:36 GMT+08:00 尹文才 <batman...@gmail.com>:

> Hi Michael, I checked the system available inodes by running df -i command
> and there're quite enough inodes in the system. I then removed all the
> files in all repository folders and restarted
> the system, I couldn't see the error again. I will continue to track the
> problem to see what's causing it, but it seems not relevant to the inode
> use-up reason you mentioned. Thanks.
>
> /Ben
>
> 2017-12-12 23:45 GMT+08:00 Michael Moser <moser...@gmail.com>:
>
>> Greetings Ben,
>>
>> The "No space left on device" error can also be caused by running out of
>> inodes on your device.  You can check this with "df -i".
>>
>> -- Mike
>>
>>
>> On Tue, Dec 12, 2017 at 1:36 AM, 尹文才 <batman...@gmail.com> wrote:
>>
>> > sorry that I forgot to mention the environment that caused this problem,
>> > I'm using the latest nifi 1.4.0 release and installed it on centos 7.
>> >
>> > 2017-12-12 14:35 GMT+08:00 尹文才 <batman...@gmail.com>:
>> >
>> > > Hi guys, I'm running into a very weird problem, I wrote a processor
>> > > specifically to extract some data
>> > > and I found starting from yesterday it kept showing errors in the
>> log, as
>> > > below:
>> > >
>> > > 2017-12-12 14:01:04,661 INFO [pool-10-thread-1] o.a.n.c.r.
>> > WriteAheadFlowFileRepository
>> > > Initiating checkpoint of FlowFile Repository
>> > > 2017-12-12 14:01:04,676 ERROR [pool-10-thread-1] o.a.n.c.r.
>> > WriteAheadFlowFileRepository
>> > > Unable to checkpoint FlowFile Repository due to
>> > > java.io.FileNotFoundException: ../flowfile_repository/
>> > partition-5/96.journal
>> > > (No space left on device)
>> > > java.io.FileNotFoundException: ../flowfile_repository/
>> > partition-5/96.journal
>> > > (No space left on device)
>> > > at java.io.FileOutputStream.open0(Native Method)
>> > > at java.io.FileOutputStream.open(FileOutputStream.java:270)
>> > > at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
>> > > at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
>> > > at org.wali.MinimalLockingWriteAheadLog$Partition.rollover(
>> > > MinimalLockingWriteAheadLog.java:779)
>> > > at org.wali.MinimalLockingWriteAheadLog.checkpoint(
>> > > MinimalLockingWriteAheadLog.java:528)
>> > > at org.apache.nifi.controller.repository.
>> > > WriteAheadFlowFileRepository.checkpoint(WriteAheadFlowFileRepository.
>> > > java:451)
>> > > at org.apache.nifi.controller.repository.
>> > > WriteAheadFlowFileRepository$1.run(WriteAheadFlowFileRepository.
>> > java:423)
>> > > at java.util.concurrent.Executors$RunnableAdapter.
>> > > call(Executors.java:511)
>> > > at java.util.concurrent.FutureTask.runAndReset(
>> > > FutureTask.java:308)
>> > > at java.util.concurrent.ScheduledThreadPoolExecutor$
>> > > ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> > > at java.util.concurrent.ScheduledThreadPoolExecutor$
>> > > ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> > > at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> > > ThreadPoolExecutor.java:1142)
>> > > at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> > > ThreadPoolExecutor.java:617)
>> > > at java.lang.Thread.run(Thread.java:745)
>> > >
>> > >
>> > > I noticed the log mentioned no space left on device and I went to
>> check
>> > > the available space and found 33G left. Does anyone know what could
>> > > possibly cause this and how to resolve this problem, thanks
>> > >
>> > > /Ben
>> > >
>> >
>>
>
>


Re: Unable to checkpoint FlowFile Repository (No space left on device)

2017-12-12 Thread
Hi Michael, I checked the available inodes by running the df -i command and
there are plenty left in the system. I then removed all the files in the
repository folders and restarted the system, and I couldn't reproduce the
error. I will keep tracking the problem to see what's causing it, but it
doesn't seem related to the inode exhaustion you mentioned. Thanks.

/Ben

2017-12-12 23:45 GMT+08:00 Michael Moser <moser...@gmail.com>:

> Greetings Ben,
>
> The "No space left on device" error can also be caused by running out of
> inodes on your device.  You can check this with "df -i".
>
> -- Mike
>
>
> On Tue, Dec 12, 2017 at 1:36 AM, 尹文才 <batman...@gmail.com> wrote:
>
> > sorry that I forgot to mention the environment that caused this problem,
> > I'm using the latest nifi 1.4.0 release and installed it on centos 7.
> >
> > 2017-12-12 14:35 GMT+08:00 尹文才 <batman...@gmail.com>:
> >
> > > Hi guys, I'm running into a very weird problem, I wrote a processor
> > > specifically to extract some data
> > > and I found starting from yesterday it kept showing errors in the log,
> as
> > > below:
> > >
> > > 2017-12-12 14:01:04,661 INFO [pool-10-thread-1] o.a.n.c.r.
> > WriteAheadFlowFileRepository
> > > Initiating checkpoint of FlowFile Repository
> > > 2017-12-12 14:01:04,676 ERROR [pool-10-thread-1] o.a.n.c.r.
> > WriteAheadFlowFileRepository
> > > Unable to checkpoint FlowFile Repository due to
> > > java.io.FileNotFoundException: ../flowfile_repository/
> > partition-5/96.journal
> > > (No space left on device)
> > > java.io.FileNotFoundException: ../flowfile_repository/
> > partition-5/96.journal
> > > (No space left on device)
> > > at java.io.FileOutputStream.open0(Native Method)
> > > at java.io.FileOutputStream.open(FileOutputStream.java:270)
> > > at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
> > > at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
> > > at org.wali.MinimalLockingWriteAheadLog$Partition.rollover(
> > > MinimalLockingWriteAheadLog.java:779)
> > > at org.wali.MinimalLockingWriteAheadLog.checkpoint(
> > > MinimalLockingWriteAheadLog.java:528)
> > > at org.apache.nifi.controller.repository.
> > > WriteAheadFlowFileRepository.checkpoint(WriteAheadFlowFileRepository.
> > > java:451)
> > > at org.apache.nifi.controller.repository.
> > > WriteAheadFlowFileRepository$1.run(WriteAheadFlowFileRepository.
> > java:423)
> > > at java.util.concurrent.Executors$RunnableAdapter.
> > > call(Executors.java:511)
> > > at java.util.concurrent.FutureTask.runAndReset(
> > > FutureTask.java:308)
> > > at java.util.concurrent.ScheduledThreadPoolExecutor$
> > > ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> > > at java.util.concurrent.ScheduledThreadPoolExecutor$
> > > ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > > at java.util.concurrent.ThreadPoolExecutor.runWorker(
> > > ThreadPoolExecutor.java:1142)
> > > at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > > ThreadPoolExecutor.java:617)
> > > at java.lang.Thread.run(Thread.java:745)
> > >
> > >
> > > I noticed the log mentioned no space left on device and I went to check
> > > the available space and found 33G left. Does anyone know what could
> > > possibly cause this and how to resolve this problem, thanks
> > >
> > > /Ben
> > >
> >
>


Re: Unable to checkpoint FlowFile Repository (No space left on device)

2017-12-11 Thread
Sorry, I forgot to mention the environment where this problem occurred: I'm
using the latest NiFi 1.4.0 release, installed on CentOS 7.

2017-12-12 14:35 GMT+08:00 尹文才 <batman...@gmail.com>:

> Hi guys, I'm running into a very weird problem, I wrote a processor
> specifically to extract some data
> and I found starting from yesterday it kept showing errors in the log, as
> below:
>
> 2017-12-12 14:01:04,661 INFO [pool-10-thread-1] 
> o.a.n.c.r.WriteAheadFlowFileRepository
> Initiating checkpoint of FlowFile Repository
> 2017-12-12 14:01:04,676 ERROR [pool-10-thread-1] 
> o.a.n.c.r.WriteAheadFlowFileRepository
> Unable to checkpoint FlowFile Repository due to
> java.io.FileNotFoundException: ../flowfile_repository/partition-5/96.journal
> (No space left on device)
> java.io.FileNotFoundException: ../flowfile_repository/partition-5/96.journal
> (No space left on device)
> at java.io.FileOutputStream.open0(Native Method)
> at java.io.FileOutputStream.open(FileOutputStream.java:270)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
> at org.wali.MinimalLockingWriteAheadLog$Partition.rollover(
> MinimalLockingWriteAheadLog.java:779)
> at org.wali.MinimalLockingWriteAheadLog.checkpoint(
> MinimalLockingWriteAheadLog.java:528)
> at org.apache.nifi.controller.repository.
> WriteAheadFlowFileRepository.checkpoint(WriteAheadFlowFileRepository.
> java:451)
> at org.apache.nifi.controller.repository.
> WriteAheadFlowFileRepository$1.run(WriteAheadFlowFileRepository.java:423)
> at java.util.concurrent.Executors$RunnableAdapter.
> call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(
> FutureTask.java:308)
> at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> I noticed the log mentioned no space left on device and I went to check
> the available space and found 33G left. Does anyone know what could
> possibly cause this and how to resolve this problem, thanks
>
> /Ben
>


Unable to checkpoint FlowFile Repository (No space left on device)

2017-12-11 Thread
Hi guys, I'm running into a very weird problem. I wrote a processor to
extract some data, and starting yesterday it has kept showing errors in the
log, as below:

2017-12-12 14:01:04,661 INFO [pool-10-thread-1]
o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile
Repository
2017-12-12 14:01:04,676 ERROR [pool-10-thread-1]
o.a.n.c.r.WriteAheadFlowFileRepository Unable to checkpoint FlowFile
Repository due to java.io.FileNotFoundException:
../flowfile_repository/partition-5/96.journal (No space left on device)
java.io.FileNotFoundException:
../flowfile_repository/partition-5/96.journal (No space left on device)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
at
org.wali.MinimalLockingWriteAheadLog$Partition.rollover(MinimalLockingWriteAheadLog.java:779)
at
org.wali.MinimalLockingWriteAheadLog.checkpoint(MinimalLockingWriteAheadLog.java:528)
at
org.apache.nifi.controller.repository.WriteAheadFlowFileRepository.checkpoint(WriteAheadFlowFileRepository.java:451)
at
org.apache.nifi.controller.repository.WriteAheadFlowFileRepository$1.run(WriteAheadFlowFileRepository.java:423)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


I noticed the log says "no space left on device", but when I checked the
available space I found 33 GB free. Does anyone know what could cause this
and how to resolve it? Thanks.

/Ben


Re: save variables to template when creating a template

2017-10-11 Thread
Thanks Matt. How should I transfer the variable registry? Are these
variables saved in a file somewhere?

/Ben

2017-10-12 0:58 GMT+08:00 Matt Burgess <mattyb...@apache.org>:

> Ben,
>
> The variables are separate from the template by design, this is so you
> can provide a variable registry in dev/test and a different one for
> production, for example. If you want to keep the variable you would
> likely want to transfer the variable registry along with the template
> to the other NiFi instance.
>
> Regards,
> Matt
>
> On Tue, Oct 10, 2017 at 11:45 PM, 尹文才 <batman...@gmail.com> wrote:
> > Hi guys, I've been using the new variable registry feature in 1.4.0 and I
> > like it very much. But when I define a variable in a processor group and
> > try to create a template out of it, I noticed that the variable I
> defined
> > is not included in the template after I import the template to another
> NIFI
> > instance. Does anyone know if it's possible to keep the variable with the
> > template? Thanks.
> >
> > /Ben
>


save variables to template when creating a template

2017-10-10 Thread
Hi guys, I've been using the new variable registry feature in 1.4.0 and I
like it very much. But when I define a variable in a process group and
create a template out of it, I noticed that the variable I defined is not
included in the template after I import it into another NiFi instance. Does
anyone know if it's possible to keep the variable with the template?
Thanks.

/Ben


Re: route flow based on variable

2017-10-09 Thread
Thanks Bryan. I tried it after reading your reply, and I can indeed use the
variable directly in RouteOnAttribute; I had thought only a FlowFile's
attributes could be used there. Thanks.
/Ben

2017-10-09 19:24 GMT+08:00 Bryan Bende <bbe...@gmail.com>:

> Ben,
>
> 1) Yes, the variables are hierarchical, so a variable at the root group
> would be visible to all components, unless there is a variable with the
> same name at a lower level which would override it.
>
> 2) I haven’t tried this, but I would expect that you should still be able
> to use RouteOnAttribute to route on a variable… Lets say that your root
> group has a variable “env” and in one environment you have this set to
> “dev” and in another environment you have it set to “prod”. You might have
> a part of your flow that you only run in “prod” so you put a
> RouteOnAttribute with something like ${env:equals(“prod”)} which would only
> enable this path in prod.
>
> Variables on their own are not associated with flow files, so if you
> wanted to do something more specific per-flow file, then the only way would
> be what you described with using UpdateAttribute.
>
> Thanks,
>
> Bryan
>
> > On Oct 8, 2017, at 10:44 PM, 尹文才 <batman...@gmail.com> wrote:
> >
> > Hi guys, I've played around with the latest NIFI 1.4.0 release for a
> while
> > and I think the new variable registry feature is great, however I have 2
> > questions about this feature:
> >
> > 1. It seems that I could only add variables to a processor group, could I
> > add a global variable in the NIFI root processor group so it could be
> used
> > anywhere inside NIFI?
> >
> > 2. I want to route to different flows based on the variables I added, but
> > currently the only way I know that could make this work is like this:
> > myprocessor->updateAttribute(add the variable into FlowFile attribute)->
> > routeOnAttribute->different flows based on the variable value
> >
> > I didn't find any routeOnVariable processor, is there any easier way
> that I
> > could use to implement conditional flow in NIFI? Thanks
> >
> > /Ben
>
>


route flow based on variable

2017-10-08 Thread
Hi guys, I've played around with the latest NiFi 1.4.0 release for a while,
and I think the new variable registry feature is great. However, I have two
questions about it:

1. It seems that I can only add variables to a process group. Could I add a
global variable to the NiFi root process group so it could be used anywhere
inside NiFi?

2. I want to route to different flows based on the variables I added, but
currently the only way I know that could make this work is like this:
myprocessor->updateAttribute(add the variable into FlowFile attribute)->
routeOnAttribute->different flows based on the variable value

I didn't find any RouteOnVariable processor; is there an easier way to
implement conditional flow in NiFi? Thanks

/Ben


Re: NIFI templates in template folder in sync with templates inside NIFI UI

2017-09-15 Thread
Thanks Matt, I can use the REST API to automate the template sync work.
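The endpoints Matt points to can be scripted. A minimal sketch of building the request URIs (the host, port, and IDs below are made-up examples; per the NiFi 1.x REST API docs, template upload is a multipart POST under Process Groups and removal is a DELETE under Templates):

```java
import java.net.URI;

// Builds the two REST URIs involved in template automation:
//   POST   {base}/process-groups/{pgId}/templates/upload  (multipart "template" file)
//   DELETE {base}/templates/{templateId}
public class TemplateEndpoints {
    static URI uploadUri(String base, String processGroupId) {
        return URI.create(base + "/process-groups/" + processGroupId + "/templates/upload");
    }

    static URI deleteUri(String base, String templateId) {
        return URI.create(base + "/templates/" + templateId);
    }

    public static void main(String[] args) {
        String base = "http://localhost:8080/nifi-api"; // assumed local instance
        System.out.println(uploadUri(base, "root"));
        System.out.println(deleteUri(base, "abc-123"));
    }
}
```

As Matt suggests, the easiest way to find the exact request shapes is to watch the browser's network tab while performing the same actions in the UI.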

2017-09-15 1:00 GMT+08:00 Matt Gilman <matt.c.gil...@gmail.com>:

> Ben,
>
> In the 0.x baseline, the templates were stored in the templates directory
> that you're referring to. Starting in the 1.x baseline, the templates were
> migrated to become part of the flow.xml.gz. In order to support users
> upgrading from 0.x to 1.x, templates in the directory are automatically
> moved to the flow.xml.gz. If you're looking to automate template
> importing/removal, I would recommend using the REST API. If you open up
> your browser you should be able to see these requests in action.
> Additionally, you can find documentation for them here [1]. The
> import/upload endpoints are under Process Groups and the removal endpoints
> are under Templates.
>
> Thanks
>
> Matt
>
> [1] https://nifi.apache.org/docs/nifi-docs/rest-api/index.html
>
> On Thu, Sep 14, 2017 at 2:40 AM, 尹文才 <batman...@gmail.com> wrote:
>
> > Hi guys, I put all my flow template xml files inside the nifi template
> > directory specified inside the nifi.properties config file so that nifi
> > could read in all my templates. But sometimes I need to clear all the
> > templates currently inside nifi and then place in a new template, I would
> > remove all the template xml files inside the template folder and put the
> > new template xml file into it. But when I restart nifi, there're 5
> > templates available, which are 4 old templates and 1 new template.
> >
> > I know I could manually remove all templates inside nifi, but I wish to
> do
> > this all programmatically. I also know nifi keeps the templates inside
> the
> > flow.xml.gz file.
> > So does nifi simply adds the new templates in the template folder into
> the
> > flow.xml.gz file without removing the old ones?
> >
> > I have used beyond compare trying to compare the template xml file
> content
> > with the template section inside the flow.xml extracted from flow.xml.gz,
> > they're slightly differently and most content are the same, does anyone
> > know the relationship between these two?
> >
> > What I want to achieve is to keep the templates in the template folder
> and
> > the templates in nifi in sync without having to manually  do anything in
> > nifi UI, does
> > anyone know if it's possible and if so how? Thanks.
> >
> > /Ben
> >
>


NIFI templates in template folder in sync with templates inside NIFI UI

2017-09-14 Thread
Hi guys, I put all my flow template XML files inside the NiFi template
directory specified in the nifi.properties config file so that NiFi can
read in all my templates. But sometimes I need to clear all the templates
currently inside NiFi and then add a new one, so I remove all the template
XML files from the template folder and put the new template XML file into
it. But when I restart NiFi, there are 5 templates available: the 4 old
templates plus the 1 new one.

I know I could manually remove all templates inside NiFi, but I wish to do
this programmatically. I also know NiFi keeps the templates inside the
flow.xml.gz file. So does NiFi simply add the new templates from the
template folder into the flow.xml.gz file without removing the old ones?

I have used Beyond Compare to compare the template XML file content with
the template section of the flow.xml extracted from flow.xml.gz; they're
slightly different, though most of the content is the same. Does anyone
know the relationship between these two?

What I want to achieve is to keep the templates in the template folder and
the templates in NiFi in sync without having to do anything manually in the
NiFi UI. Does anyone know if this is possible, and if so, how? Thanks.

/Ben


Re: how to execute code when processor is stopping

2017-08-14 Thread
Thanks Koji, this is exactly what I'm looking for.

Regards,
Ben
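Koji's isScheduled() pattern, reduced to a self-contained sketch (an AtomicBoolean stands in for the framework's isScheduled(), so this shows the shape of the loop rather than real NiFi API code):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// onTrigger keeps consuming while the processor is scheduled and exits
// promptly once the user stops it, instead of blocking forever in a retry
// loop that ignores the stop request.
public class ScheduledLoopSketch {
    final AtomicBoolean scheduled = new AtomicBoolean(true);
    final AtomicInteger batches = new AtomicInteger();

    // Stand-in for AbstractSessionFactoryProcessor.isScheduled().
    boolean isScheduled() { return scheduled.get(); }

    // Analogous to onTrigger: loop only while scheduled.
    void onTrigger() {
        while (isScheduled()) {
            batches.incrementAndGet();  // do one unit of work
            if (batches.get() >= 3) {
                scheduled.set(false);   // simulate the user pressing stop
            }
        }
    }

    public static void main(String[] args) {
        ScheduledLoopSketch p = new ScheduledLoopSketch();
        p.onTrigger();
        System.out.println("batches=" + p.batches.get());
    }
}
```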

2017-08-14 12:21 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> Hi Ben,
>
> AbstractSessionFactoryProcessor has a protected isScheduled() method,
> that can be used by a processor implementation class to check whether
> it is still being scheduled (not being stopped).
> For an example, ConsumeKafka_0_10.onTrigger uses it with while loop:
> https://github.com/apache/nifi/blob/master/nifi-nar-
> bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/
> src/main/java/org/apache/nifi/processors/kafka/pubsub/
> ConsumeKafka_0_10.java#L316
>
> Thanks,
> Koji
>
> On Mon, Aug 14, 2017 at 11:12 AM, 尹文才 <batman...@gmail.com> wrote:
> > Hi guys, about my case, I have another question, if I implement the retry
> > logic inside the ontrigger method and I need to retry until the database
> > connection is back online, in case user needs to stop the processor in
> NIFI
> > UI while the database is still offline, according to my understanding
> > ontrigger will keep executing the retry logic and the processor couldn't
> be
> > stopped even if user tries to stop it, is there any way to solve this
> > problem? Thanks.
> >
> > Regards,
> > Ben
> >
> > 2017-08-12 6:30 GMT+08:00 尹文才 <batman...@gmail.com>:
> >
> >> Hi Bryan and Matt, thanks for all your suggestions, I was trying to make
> >> sure that the OnUnscheduled method was not called too frequently when
> the
> >> connection is offline.
> >> You guys were right, these sort of logic should not be placed inside the
> >> scheduling methods, I need to refactor my code to place them into
> onTrigger.
> >>
> >> Regards,
> >> Ben
> >>
> >> 2017-08-12 0:53 GMT+08:00 Matt Burgess <mattyb...@apache.org>:
> >>
> >>> I'm a fan of Bryan's last suggestion. For dynamic/automatic retry
> >>> (such as database connection retries), I recommend putting the
> >>> connection logic in the onTrigger() method. If you can check
> >>> connectivity, then your onTrigger() would know whether it needs to try
> >>> to reconnect before it does any work. If it tries to reconnect and is
> >>> unsuccessful, you can yield the processor if you want, so as not to
> >>> hammer the DB with connection attempts. The CaptureChangeMySQL
> >>> processor does this, it has a retry loop for trying various nodes in a
> >>> MySQL cluster, but once it's connected, it goes on about its work, and
> >>> if a connection fails, it will retry the connection loop before it
> >>> does any more work. It only uses onTrigger and none of the scheduling
> >>> stuff.
> >>>
> >>> Regards,
> >>> Matt
> >>>
> >>> On Fri, Aug 11, 2017 at 11:06 AM, Bryan Bende <bbe...@gmail.com>
> wrote:
> >>> > Ben,
> >>> >
> >>> > I apologize if I am not understanding the situation, but...
> >>> >
> >>> > In the case where your OnScheduled code is in a retry loop, if
> someone
> >>> > stops the processor it will call your OnUnscheduled code which will
> >>> > set the flag to bounce out of the loop. This sounds like what you
> >>> > want, right?
> >>> >
> >>> > In the case where OnScheduled times out, the framework is calling
> >>> > OnUnscheduled which would call your code to set the flag, but
> wouldn't
> >>> > that not matter at this point because you aren't looping anymore
> >>> > anyway?
> >>> >
> >>> > If the framework calls OnScheduled again, your code should set the
> >>> > flag back to whatever it needs to be to start looping again right?
> >>> >
> >>> > An alternative that might avoid some of this would be to lazily
> >>> > initialize the connection in the onTrigger method of the processor.
> >>> >
> >>> > -Bryan
> >>> >
> >>> >
> >>> > On Fri, Aug 11, 2017 at 9:16 AM, 尹文才 <batman...@gmail.com> wrote:
> >>> >> thanks Pierre, my case is that I need to implement a database
> >>> connection
> >>> >> retry logic inside my OnScheduled method, when the database is not
> >>> >> available I will retry until the connection is back online.
> >>> >> The problem is when the database is offline it will throw timed out
> >>> >> execution exce

Re: how to execute code when processor is stopping

2017-08-13 Thread
Hi guys, a follow-up question about my case: if I implement the retry logic
inside the onTrigger method and need to retry until the database connection
is back online, what happens if a user needs to stop the processor in the
NiFi UI while the database is still offline? As I understand it, onTrigger
will keep executing the retry logic, so the processor can't be stopped even
if the user tries. Is there any way to solve this problem? Thanks.

Regards,
Ben

2017-08-12 6:30 GMT+08:00 尹文才 <batman...@gmail.com>:

> Hi Bryan and Matt, thanks for all your suggestions, I was trying to make
> sure that the OnUnscheduled method was not called too frequently when the
> connection is offline.
> You guys were right, these sort of logic should not be placed inside the
> scheduling methods, I need to refactor my code to place them into onTrigger.
>
> Regards,
> Ben
>
> 2017-08-12 0:53 GMT+08:00 Matt Burgess <mattyb...@apache.org>:
>
>> I'm a fan of Bryan's last suggestion. For dynamic/automatic retry
>> (such as database connection retries), I recommend putting the
>> connection logic in the onTrigger() method. If you can check
>> connectivity, then your onTrigger() would know whether it needs to try
>> to reconnect before it does any work. If it tries to reconnect and is
>> unsuccessful, you can yield the processor if you want, so as not to
>> hammer the DB with connection attempts. The CaptureChangeMySQL
>> processor does this, it has a retry loop for trying various nodes in a
>> MySQL cluster, but once it's connected, it goes on about its work, and
>> if a connection fails, it will retry the connection loop before it
>> does any more work. It only uses onTrigger and none of the scheduling
>> stuff.
>>
>> Regards,
>> Matt
>>
>> On Fri, Aug 11, 2017 at 11:06 AM, Bryan Bende <bbe...@gmail.com> wrote:
>> > Ben,
>> >
>> > I apologize if I am not understanding the situation, but...
>> >
>> > In the case where your OnScheduled code is in a retry loop, if someone
>> > stops the processor it will call your OnUnscheduled code which will
>> > set the flag to bounce out of the loop. This sounds like what you
>> > want, right?
>> >
>> > In the case where OnScheduled times out, the framework is calling
>> > OnUnscheduled which would call your code to set the flag, but wouldn't
>> > that not matter at this point because you aren't looping anymore
>> > anyway?
>> >
>> > If the framework calls OnScheduled again, your code should set the
>> > flag back to whatever it needs to be to start looping again right?
>> >
>> > An alternative that might avoid some of this would be to lazily
>> > initialize the connection in the onTrigger method of the processor.
>> >
>> > -Bryan
>> >
>> >
>> > On Fri, Aug 11, 2017 at 9:16 AM, 尹文才 <batman...@gmail.com> wrote:
>> >> thanks Pierre, my case is that I need to implement a database
>> connection
>> >> retry logic inside my OnScheduled method, when the database is not
>> >> available I will retry until the connection is back online.
>> >> The problem is when the database is offline it will throw timed out
>> >> execution exception inside OnScheduled and then call OnUnscheduled. But
>> >> when I manually stop the processor the OnUnscheduled
>> >> will also get called. I know my logic sounds a little weird but I need
>> to
>> >> set some flag in the OnUnscheduled method to stop the retry logic
>> inside
>> >> OnScheduled in order to be able to stop the processor,
>> >> otherwise the processor is not able to be stopped unless I restart the
>> >> whole NIFI.
>> >>
>> >> Regards,
>> >> Ben
>> >>
>> >> 2017-08-11 17:18 GMT+08:00 Pierre Villard <pierre.villard...@gmail.com
>> >:
>> >>
>> >>> Oh OK, get it now!
>> >>>
>> >>> Not sure what's your use case, but I don't think you can do that
>> unless you
>> >>> set some information when the process actually executes onTrigger for
>> the
>> >>> first time and you then check this value in your OnUnscheduled
>> annotated
>> >>> method.
>> >>>
>> >>> Pierre
>> >>>
>> >>> 2017-08-11 10:11 GMT+02:00 尹文才 <batman...@gmail.com>:
>> >>>
>> >>> > Hi Pierre, I've checked the developer guide before I sent the email
>> and
>> >>> > accordin

Re: how to execute code when processor is stopping

2017-08-11 Thread
Hi Bryan and Matt, thanks for all your suggestions. I was trying to make
sure that the OnUnscheduled method was not called too frequently while the
connection is offline. You were both right: this sort of logic should not
be placed inside the scheduling methods; I need to refactor my code to move
it into onTrigger.

Regards,
Ben
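Matt's onTrigger-with-yield approach, as a self-contained sketch (the connection and yield counters are stand-ins for a real database connection and ProcessContext.yield(), so this only shows the control flow, not real NiFi API code):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Check connectivity at the top of onTrigger, attempt one reconnect, and
// yield on failure instead of looping. The framework controls retry pacing,
// so the stop button stays responsive between short invocations.
public class RetryInOnTriggerSketch {
    final AtomicBoolean connected = new AtomicBoolean(false);
    final AtomicInteger yields = new AtomicInteger();
    final AtomicInteger workDone = new AtomicInteger();
    private int attemptsUntilDbIsBack;  // simulated outage length

    RetryInOnTriggerSketch(int attemptsUntilDbIsBack) {
        this.attemptsUntilDbIsBack = attemptsUntilDbIsBack;
    }

    private boolean tryConnect() {
        // Pretend the database comes back after N failed attempts.
        return --attemptsUntilDbIsBack <= 0;
    }

    // Analogous to onTrigger: one short invocation per schedule tick.
    void onTrigger() {
        if (!connected.get()) {
            if (tryConnect()) {
                connected.set(true);
            } else {
                yields.incrementAndGet();  // context.yield() in a real processor
                return;                    // exit quickly; framework reschedules
            }
        }
        workDone.incrementAndGet();        // normal per-trigger work
    }

    public static void main(String[] args) {
        RetryInOnTriggerSketch p = new RetryInOnTriggerSketch(3);
        for (int i = 0; i < 5; i++) p.onTrigger();
        System.out.println("yields=" + p.yields.get() + " work=" + p.workDone.get());
    }
}
```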

2017-08-12 0:53 GMT+08:00 Matt Burgess <mattyb...@apache.org>:

> I'm a fan of Bryan's last suggestion. For dynamic/automatic retry
> (such as database connection retries), I recommend putting the
> connection logic in the onTrigger() method. If you can check
> connectivity, then your onTrigger() would know whether it needs to try
> to reconnect before it does any work. If it tries to reconnect and is
> unsuccessful, you can yield the processor if you want, so as not to
> hammer the DB with connection attempts. The CaptureChangeMySQL
> processor does this, it has a retry loop for trying various nodes in a
> MySQL cluster, but once it's connected, it goes on about its work, and
> if a connection fails, it will retry the connection loop before it
> does any more work. It only uses onTrigger and none of the scheduling
> stuff.
>
> Regards,
> Matt
>
> On Fri, Aug 11, 2017 at 11:06 AM, Bryan Bende <bbe...@gmail.com> wrote:
> > Ben,
> >
> > I apologize if I am not understanding the situation, but...
> >
> > In the case where your OnScheduled code is in a retry loop, if someone
> > stops the processor it will call your OnUnscheduled code which will
> > set the flag to bounce out of the loop. This sounds like what you
> > want, right?
> >
> > In the case where OnScheduled times out, the framework is calling
> > OnUnscheduled which would call your code to set the flag, but wouldn't
> > that not matter at this point because you aren't looping anymore
> > anyway?
> >
> > If the framework calls OnScheduled again, your code should set the
> > flag back to whatever it needs to be to start looping again right?
> >
> > An alternative that might avoid some of this would be to lazily
> > initialize the connection in the onTrigger method of the processor.
> >
> > -Bryan
> >
> >
> > On Fri, Aug 11, 2017 at 9:16 AM, 尹文才 <batman...@gmail.com> wrote:
> >> thanks Pierre, my case is that I need to implement a database connection
> >> retry logic inside my OnScheduled method, when the database is not
> >> available I will retry until the connection is back online.
> >> The problem is when the database is offline it will throw timed out
> >> execution exception inside OnScheduled and then call OnUnscheduled. But
> >> when I manually stop the processor the OnUnscheduled
> >> will also get called. I know my logic sounds a little weird but I need
> to
> >> set some flag in the OnUnscheduled method to stop the retry logic inside
> >> OnScheduled in order to be able to stop the processor,
> >> otherwise the processor is not able to be stopped unless I restart the
> >> whole NIFI.
> >>
> >> Regards,
> >> Ben
> >>
> >> 2017-08-11 17:18 GMT+08:00 Pierre Villard <pierre.villard...@gmail.com
> >:
> >>
> >>> Oh OK, get it now!
> >>>
> >>> Not sure what's your use case, but I don't think you can do that
> unless you
> >>> set some information when the process actually executes onTrigger for
> the
> >>> first time and you then check this value in your OnUnscheduled
> annotated
> >>> method.
> >>>
> >>> Pierre
> >>>
> >>> 2017-08-11 10:11 GMT+02:00 尹文才 <batman...@gmail.com>:
> >>>
> >>> > Hi Pierre, I've checked the developer guide before I sent the email
> and
> >>> > according to the developer guide, the method annotated with
> OnUnScheduled
> >>> > will be called in 2 cases according to my understanding, please
> correct
> >>> me
> >>> > if I'm wrong:
> >>> > 1. when user tries to stop the processor in the NIFI UI, thus the
> >>> processor
> >>> > is no longer scheduled to run in this case, and the method will be
> >>> called.
> >>> > 2. when method annotated with OnScheduled throws exceptions, for
> example
> >>> > time out execution exception, the OnUnScheduled method will also be
> >>> called.
> >>> >
> >>> > My question is how to tell the first scenario from the second one?
> >>> Thanks.
> >>> >
> >>> > Regards,
> >>> > Ben
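The flag pattern discussed in this thread can be sketched in plain Java. The NiFi @OnScheduled/@OnUnscheduled annotations are omitted so the snippet stands on its own, and the class and method names are illustrative, not NiFi API:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class RetryLoop {
    // Set from the @OnUnscheduled-style callback when the processor stops
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    // Stands in for an @OnScheduled method with a connection-retry loop.
    // Returns true once "connected", false if stopped or out of attempts.
    public boolean connectWithRetry(int maxAttempts) {
        stopped.set(false); // reset: the framework is scheduling us again
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (stopped.get()) {
                return false; // user stopped the processor; bounce out
            }
            if (tryConnect()) {
                return true;
            }
            // a real implementation would sleep / back off here
        }
        return false;
    }

    // Stands in for an @OnUnscheduled method: just flip the flag
    public void onUnscheduled() {
        stopped.set(true);
    }

    // Stand-in for the real connection attempt; always fails here
    protected boolean tryConnect() {
        return false;
    }
}
```

Whether OnUnscheduled was triggered by a user stop or by an OnScheduled timeout, flipping the flag is harmless: in the timeout case the loop has already exited, which is why Bryan notes the flag "wouldn't matter at this point".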

Re: Assigning JIRAs to myself

2017-08-11 Thread
Hi Aldrin, I would also like to be added to the list of NiFi contributors
so I can start working on some easy JIRAs. Thanks.

Regards,
Ben

2017-08-11 20:03 GMT+08:00 Aldrin Piri :

> Hi Arun,
>
> I've added you to the list of contributors and you should be able to self
> assign issues.  Please let us know if you are having any issues.
>
> Thanks and look forward to the contributions!
>
> On Fri, Aug 11, 2017 at 7:59 AM, Arun Manivannan  wrote:
>
> > Hi,
> >
> > Good morning.
> >
> > I would like to try to solve some beginner level issues.
> >
> > Have read the developer's guide and the contributor guide.
> >
> > How can I assign an issue to myself on the JIRA?  Great work on the
> > "beginner" tag.  Thanks !
> >
> >
> > Cheers,
> > Arun
> >
>


Re: how to execute code when processor is stopping

2017-08-11 Thread
Hi Pierre, I've checked the developer guide before I sent the email, and
according to my understanding the method annotated with OnUnscheduled
will be called in two cases; please correct me if I'm wrong:
1. when the user stops the processor in the NiFi UI, so the processor is
no longer scheduled to run, the method will be called.
2. when the method annotated with OnScheduled throws an exception, for
example a timed-out execution exception, the OnUnscheduled method will
also be called.

My question is: how can I tell the first scenario from the second? Thanks.

Regards,
Ben

2017-08-11 15:51 GMT+08:00 Pierre Villard <pierre.villard...@gmail.com>:

> Hi Ben,
>
> You might want to have a look here:
> https://nifi.apache.org/docs/nifi-docs/html/developer-
> guide.html#component-lifecycle
>
> Pierre
>
> 2017-08-11 9:06 GMT+02:00 尹文才 <batman...@gmail.com>:
>
> > Hi guys, I'm trying to execute some code in my processor when the
> > processor is asked to stop in the NiFi UI by the user. I checked the
> > developer guide and only found that OnUnscheduled is called when the
> > processor is no longer scheduled to run. I've tested OnUnscheduled, and
> > it is also called after a timed-out OnScheduled task. So is there a way
> > to execute some code only when the processor is stopping?
> >
> > Regards,
> > Ben
> >
>


how to execute code when processor is stopping

2017-08-11 Thread
Hi guys, I'm trying to execute some code in my processor when the
processor is asked to stop in the NiFi UI by the user. I checked the
developer guide and only found that OnUnscheduled is called when the
processor is no longer scheduled to run. I've tested OnUnscheduled, and
it is also called after a timed-out OnScheduled task. So is there a way
to execute some code only when the processor is stopping?

Regards,
Ben


Re: get controller service's configuration

2017-08-11 Thread
Thanks for your explanation Bryan; it seems that I can only get the DBCP
controller service's driver name after something like getDriverName() is
added to the current DBCPService interface.
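A minimal sketch of that change, with simplified stand-ins for the NiFi types (these are not the real NiFi classes): the shared service interface would have to declare getDriverName() before any processor compiled against the interface could use it.

```java
// Stand-in for the empty marker interface NiFi uses
interface ControllerService {}

// The shared contract a processor can depend on; declaring
// getDriverName() here is what would make the driver visible
interface DBCPService extends ControllerService {
    // Connection getConnection();  // elided: needs java.sql
    String getDriverName();
}

// One concrete implementation; another NAR could provide a different one
class DbcpConnectionPool implements DBCPService {
    private final String driverName;

    DbcpConnectionPool(String driverName) {
        this.driverName = driverName;
    }

    @Override
    public String getDriverName() {
        return driverName; // each implementation answers for itself
    }
}
```

The processor only ever sees the DBCPService interface, so an implementation-specific property descriptor like DB_DRIVERNAME stays invisible unless the interface exposes a getter for it.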

Regards,
Ben

2017-08-10 22:01 GMT+08:00 Bryan Bende <bbe...@gmail.com>:

> The way controller services are setup you have the following...
>
> - DBCPService interface (provides getConnection()) extends
> ControllerService interface (empty interface to indicate it is a CS)
> - DBCPConnectionPool extends AbstractControllerService implements
> DBCPService
> - Processor XYZ depends on DBCPService interface
>
> The DBCPService interface is the common point between the processor
> and the implementations. The processor XYZ classpath only knows about
> the DBCPService interface, it doesn't know anything about the classes
> that implement it... there could actually be several implementations
> in different NARs, but it is up to the framework to provide access to
> these.
>
> Since the processor only depends on the interface, which in this case
> only exposes getConnection(), you can't really assume the service has
> certain properties because DBCPConnectionPool.DB_DRIVERNAME is
> specific to the DBCPConnectionPool implementation... another
> implementation may not have that property, or may call it something
> different. The interface would have to provide getDriverName() so that
> each implementation could provide that.
>
> -Bryan
>
>
> On Thu, Aug 10, 2017 at 4:33 AM, 尹文才 <batman...@gmail.com> wrote:
> > Thanks Andy, I've tried your approach. In my case the controller
> > service is a DBCPConnectionPool, and when I tried to get the driver
> > class name property through
> > context.getProperty(DBCPConnectionPool.DB_DRIVERNAME).getValue(), the
> > value was null. The AbstractControllerService class does have a
> > getConfigurationContext() method, but the method is protected. So I
> > still haven't found a feasible way to get the controller service's
> > properties.
> >
> > Regards,
> > Ben
> >
> > 2017-08-10 12:18 GMT+08:00 Andy LoPresto <alopre...@apache.org>:
> >
> >> You can get the current property values of a controller service from the
> >> processor by using the ProcessContext object. For example, in GetHTTP
> [1],
> >> in the @OnScheduled method, you could do:
> >>
> >> context.getControllerServiceLookup().getControllerService("my-
> >> controller-service-id”);
> >>
> >> context.getProperty("controller-service-property-name");
> >> context.getProperty(SomeControllerService.
> CONSTANT_PROPERTY_DESCRIPTOR);
> >>
> >> I forget if context.getProperty() will give the controller service
> >> properties as well as the processor properties. If it doesn’t, you can
> cast
> >> the retrieved ControllerService into AbstractControllerService or the
> >> concrete class and access available properties directly from the
> >> encapsulated ConfigurationContext.
> >>
> >> [1] https://github.com/apache/nifi/blob/master/nifi-nar-
> >> bundles/nifi-standard-bundle/nifi-standard-processors/src/
> >> main/java/org/apache/nifi/processors/standard/GetHTTP.java#L295
> >>
> >> Andy LoPresto
> >> alopre...@apache.org
> >> *alopresto.apa...@gmail.com <alopresto.apa...@gmail.com>*
> >> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
> >>
> >> On Aug 9, 2017, at 6:57 PM, 尹文才 <batman...@gmail.com> wrote:
> >>
> >> Thanks Koji, I checked the link you provided and I think getting a
> >> DataSource is no different than getting the DBCP service(they could just
> >> get the connection). Actually I was trying to get the configured driver
> >> class to check the database type.
> >>
> >> Regards,
> >> Ben
> >>
> >> 2017-08-10 9:29 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >>
> >> Hi Ben,
> >>
> >> I'm not aware of ways to obtain configurations of a controller from a
> >> processor. Those should be encapsulated inside a controller service.
> >> If you'd like to create DataSource instance instead of just obtaining
> >> a connection, this discussion might be helpful:
> >> https://github.com/apache/nifi/pull/1417
> >>
> >> Although I would not recommend, if you really need to obtain all
> >> configurations, you can do so by calling NiFi REST API from your
> >> processor.
> >>
> >> Thanks,
> >> Koji
> >>
> >> On Thu, Aug 10, 2017 at 10:09 AM, 尹文才 <batman...@gmail.com> wrote:
> >>
> >> Hi guys, I have a customized processor with a DBCP controller service as
> >>
> >> a
> >>
> >> property. I could get the DBCP controller service in my code, but does
> >> anyone know how to obtain all the configurations of the DBCP controller
> >> service in java code(e.g. Database Connection URL, Database Driver
> >> Location, etc) Thanks.
> >>
> >> Regards,
> >> Ben
> >>
> >>
> >>
> >>
>


Re: get controller service's configuration

2017-08-10 Thread
Thanks Andy, I've tried your approach. In my case the controller service
is a DBCPConnectionPool, and when I tried to get the driver class name
property through
context.getProperty(DBCPConnectionPool.DB_DRIVERNAME).getValue(), the
value was null. The AbstractControllerService class does have a
getConfigurationContext() method for getting the configuration context,
but the method is protected. So I still haven't found a feasible way to
get the controller service's properties.

Regards,
Ben

2017-08-10 12:18 GMT+08:00 Andy LoPresto <alopre...@apache.org>:

> You can get the current property values of a controller service from the
> processor by using the ProcessContext object. For example, in GetHTTP [1],
> in the @OnScheduled method, you could do:
>
> context.getControllerServiceLookup().getControllerService("my-
> controller-service-id”);
>
> context.getProperty("controller-service-property-name");
> context.getProperty(SomeControllerService.CONSTANT_PROPERTY_DESCRIPTOR);
>
> I forget if context.getProperty() will give the controller service
> properties as well as the processor properties. If it doesn’t, you can cast
> the retrieved ControllerService into AbstractControllerService or the
> concrete class and access available properties directly from the
> encapsulated ConfigurationContext.
>
> [1] https://github.com/apache/nifi/blob/master/nifi-nar-
> bundles/nifi-standard-bundle/nifi-standard-processors/src/
> main/java/org/apache/nifi/processors/standard/GetHTTP.java#L295
>
> Andy LoPresto
> alopre...@apache.org
> *alopresto.apa...@gmail.com <alopresto.apa...@gmail.com>*
> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>
> On Aug 9, 2017, at 6:57 PM, 尹文才 <batman...@gmail.com> wrote:
>
> Thanks Koji, I checked the link you provided and I think getting a
> DataSource is no different than getting the DBCP service(they could just
> get the connection). Actually I was trying to get the configured driver
> class to check the database type.
>
> Regards,
> Ben
>
> 2017-08-10 9:29 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>
> Hi Ben,
>
> I'm not aware of ways to obtain configurations of a controller from a
> processor. Those should be encapsulated inside a controller service.
> If you'd like to create DataSource instance instead of just obtaining
> a connection, this discussion might be helpful:
> https://github.com/apache/nifi/pull/1417
>
> Although I would not recommend, if you really need to obtain all
> configurations, you can do so by calling NiFi REST API from your
> processor.
>
> Thanks,
> Koji
>
> On Thu, Aug 10, 2017 at 10:09 AM, 尹文才 <batman...@gmail.com> wrote:
>
> Hi guys, I have a customized processor with a DBCP controller service as
>
> a
>
> property. I could get the DBCP controller service in my code, but does
> anyone know how to obtain all the configurations of the DBCP controller
> service in java code(e.g. Database Connection URL, Database Driver
> Location, etc) Thanks.
>
> Regards,
> Ben
>
>
>
>


Re: get controller service's configuration

2017-08-09 Thread
Thanks Koji, I checked the link you provided, and I think getting a
DataSource is no different from getting the DBCP service (they could just
get the connection). Actually I was trying to get the configured driver
class to check the database type.

Regards,
Ben

2017-08-10 9:29 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> Hi Ben,
>
> I'm not aware of ways to obtain configurations of a controller from a
> processor. Those should be encapsulated inside a controller service.
> If you'd like to create DataSource instance instead of just obtaining
> a connection, this discussion might be helpful:
> https://github.com/apache/nifi/pull/1417
>
> Although I would not recommend, if you really need to obtain all
> configurations, you can do so by calling NiFi REST API from your
> processor.
>
> Thanks,
> Koji
>
> On Thu, Aug 10, 2017 at 10:09 AM, 尹文才 <batman...@gmail.com> wrote:
> > Hi guys, I have a customized processor with a DBCP controller service as
> a
> > property. I could get the DBCP controller service in my code, but does
> > anyone know how to obtain all the configurations of the DBCP controller
> > service in java code(e.g. Database Connection URL, Database Driver
> > Location, etc) Thanks.
> >
> > Regards,
> > Ben
>


get controller service's configuration

2017-08-09 Thread
Hi guys, I have a customized processor with a DBCP controller service as
a property. I can get the DBCP controller service in my code, but does
anyone know how to obtain all the configurations of the DBCP controller
service in Java code (e.g. Database Connection URL, Database Driver
Location, etc.)? Thanks.

Regards,
Ben


Re: how to submit bug found in NIFI code

2017-07-25 Thread
Thanks Janosch, I managed to submit a NiFi processor bug after creating a
JIRA account.

The bug pertains to a new NiFi processor called PutDatabaseRecord, and
the link to the issue is below; please correct me if anything is wrong,
since this is the first time I have submitted a bug for NiFi:

https://issues.apache.org/jira/browse/NIFI-4228

Regards,
Ben

2017-07-25 19:29 GMT+08:00 Woschitz, Janosch <
janosch.wosch...@thinkbiganalytics.com>:

> Hi Ben,
>
> A submission via JIRA would be the best way to submit a bug. All Apache
> projects using the same JIRA installation but you need to sign up in order
> to use them.
>
> You can create an account via https://issues.apache.org/
> jira/secure/Signup!default.jspa
>
> If this should not be possible you can still share you bug report on the
> dev list but I would highly recommend to file it via JIRA.
>
> Regards,
> Janosch
>
>
>
> On 25.07.17, 11:30, "尹文才" <batman...@gmail.com> wrote:
>
> >Hi guys, I wonder if any of you knows the correct way to submit bugs found
> >in current NIFI's code, I checked the JIRA page of NIFI and it seems that
> I
> >need to login to be able to submit an issue. Thanks.
> >
> >Regards,
> >Ben
>


how to submit bug found in NIFI code

2017-07-25 Thread
Hi guys, does any of you know the correct way to submit bugs found in the
current NiFi code? I checked the JIRA page of NiFi and it seems that I
need to log in to be able to submit an issue. Thanks.

Regards,
Ben


Re: Trigger timer-driven processor to run from outside NIFI

2017-07-14 Thread
Thanks very much Koji for your quick response and your example, I will look
into your example. 

Regards,
Ben

2017-07-14 17:27 GMT+08:00 Koji Kawamura <ijokaruma...@apache.org>:

> Hi Ben,
>
> If the processor is running, stop will wait for the thread to complete.
> Please see stop() method here.
> https://github.com/apache/nifi/blob/master/nifi-nar-
> bundles/nifi-framework-bundle/nifi-framework/nifi-framework-
> core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
>
> To stop or start a processor, you just need to update its state. I have an
> example here. Please refer updateProcessorState function.
> https://github.com/ijokarumawak/nifi-api-client-js/blob/master/nifi-api-
> client.js
>
> Trying to reply from mobile at best effort. Excuse me for typos if any..
>
> Thanks,
> Koji
>
>
> On Jul 14, 2017 5:29 PM, "尹文才" <batman...@gmail.com> wrote:
>
> Hi Koji, I tried to play with the NIFI REST API with chrome postman app,
> when you mentioned about restart the processor via REST API, did you mean
> first sending stop command and then send start command via the API "PUT
> /processors/{id}"? By the way, when I send the stop command to the
> processor, will it finish its work
> and then stop the processor or will it simply kill the task directly?
> Thanks.
>
> Regards,
> Ben
>
> 2017-07-14 15:13 GMT+08:00 尹文才 <batman...@gmail.com>:
>
> > Thanks Koji, I checked the NIFI REST API and it seems that I need to use
> > Groovy to do it(I don't understand Groovy), is there any Java related
> > examples which interact with NIFI via REST API? Thanks.
> >
> > Regards,
> > Ben
> >
> > 2017-07-14 13:49 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
> >
> >> Hi Ben,
> >>
> >> Just an idea, using ListenHTTP or HandleHTTPRequest (or whatever
> >> listener type processor you can use) in front of your processor might
> >> be helpful. You also need to change your processor to support incoming
> >> FlowFile as well if it doesn't currently. This way, the outside
> >> application can send a simple HTTP request to do your processor its
> >> job.
> >>
> >> Another possible way would be using NiFi REST API, stop the processor
> >> and then restart it. When the processor is restarted, its onTrigger
> >> will be called, and it will wait for next time to be scheduled (next
> >> 5min in your case).
> >>
> >> Thanks,
> >> Koji
> >>
> >> On Wed, Jul 12, 2017 at 5:04 PM, 尹文才 <batman...@gmail.com> wrote:
> >> > Hi guys, is it possible for a Java application outside the NIFI
> >> environment
> >> > to trigger a timer-driven processor to do its work(I mean its
> ontrigger
> >> > method will be called) when the processor is not yet due to be
> >> triggered?
> >> > The reason why I'm asking about this is because I have a Java
> >> applicatiion
> >> > with UI outside NIFI and there're some configuration data that could
> be
> >> > updated into a database, and my processor in NIFI need to get the
> >> updated
> >> > configuration
> >> > data from that database as soon as possible, but my processor is
> >> configured
> >> > to be timer driven of 5 mins. I hope the processor could be triggered
> to
> >> > run after the configuration is updated by the Java application when
> it's
> >> > not yet reached
> >> > the time for it to be triggered. Thanks.
> >> >
> >> > Regards,
> >> > Ben
> >>
> >
> >
>


Re: Trigger timer-driven processor to run from outside NIFI

2017-07-14 Thread
Thanks Koji, I checked the NiFi REST API, and it seems that I would need
to use Groovy to do it (I don't know Groovy). Are there any Java examples
which interact with NiFi via the REST API? Thanks.
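For reference, a sketch in plain Java (java.net.http) of what the stop call could look like. The request is only built here, not sent; the clientId and version values are placeholders that would come from a prior GET of the processor's current revision, and the body shape follows the PUT /processors/{id} pattern discussed in this thread:

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class NiFiStopRequest {
    // Build (but do not send) a PUT that sets a processor's state.
    // desiredState is "STOPPED" or "RUNNING"; clientId/version come
    // from a prior GET of the processor's current revision.
    public static HttpRequest build(String baseUrl, String processorId,
                                    String clientId, long version,
                                    String desiredState) {
        String body = String.format(
            "{\"revision\":{\"clientId\":\"%s\",\"version\":%d},"
            + "\"component\":{\"id\":\"%s\",\"state\":\"%s\"}}",
            clientId, version, processorId, desiredState);
        return HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + "/nifi-api/processors/" + processorId))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }
}
```

Sending it with HttpClient.newHttpClient().send(...), then repeating with "RUNNING" and the updated revision version from the response, gives the stop-then-start sequence Koji suggests.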

Regards,
Ben

2017-07-14 13:49 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> Hi Ben,
>
> Just an idea, using ListenHTTP or HandleHTTPRequest (or whatever
> listener type processor you can use) in front of your processor might
> be helpful. You also need to change your processor to support incoming
> FlowFile as well if it doesn't currently. This way, the outside
> application can send a simple HTTP request to do your processor its
> job.
>
> Another possible way would be using NiFi REST API, stop the processor
> and then restart it. When the processor is restarted, its onTrigger
> will be called, and it will wait for next time to be scheduled (next
> 5min in your case).
>
> Thanks,
> Koji
>
> On Wed, Jul 12, 2017 at 5:04 PM, 尹文才 <batman...@gmail.com> wrote:
> > Hi guys, is it possible for a Java application outside the NiFi
> > environment to trigger a timer-driven processor to do its work (I mean
> > its onTrigger method will be called) when the processor is not yet due
> > to be triggered? The reason I'm asking is that I have a Java
> > application with a UI outside NiFi, and there is some configuration
> > data that can be updated in a database; my processor in NiFi needs to
> > get the updated configuration data from that database as soon as
> > possible, but the processor is configured to be timer driven every 5
> > minutes. I hope the processor can be triggered to run after the
> > configuration is updated by the Java application, even when its next
> > scheduled run time has not yet been reached. Thanks.
> >
> > Regards,
> > Ben
>


Trigger timer-driven processor to run from outside NIFI

2017-07-12 Thread
Hi guys, is it possible for a Java application outside the NiFi
environment to trigger a timer-driven processor to do its work (I mean
its onTrigger method will be called) when the processor is not yet due to
be triggered?
The reason I'm asking is that I have a Java application with a UI outside
NiFi, and there is some configuration data that can be updated in a
database; my processor in NiFi needs to get the updated configuration
data from that database as soon as possible, but the processor is
configured to be timer driven every 5 minutes. I hope the processor can
be triggered to run after the configuration is updated by the Java
application, even when its next scheduled run time has not yet been
reached. Thanks.

Regards,
Ben


Re: FlowFile position when transferred to Relationship.SELF

2017-07-10 Thread
Hi Koji, thanks for the explanation. I checked the NiFi documentation you
provided; do you mean I should use the FIFO prioritizer in my case?
Because, as you mentioned, the FlowFiles are put back into their original
positions, so as I understand it, using FIFO should keep the FlowFiles in
a consistent order.
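What a first-in-first-out prioritizer boils down to can be illustrated in plain Java: pull order is determined by comparing last-queued timestamps. The record type here is illustrative, not NiFi's FlowFile class:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class FifoExample {
    // Minimal stand-in for a queued FlowFile
    public record Queued(String id, long lastQueuedMillis) {}

    // FIFO: the record queued earliest is pulled first
    static final Comparator<Queued> FIFO =
        Comparator.comparingLong(Queued::lastQueuedMillis);

    // Return the order in which a FIFO-prioritized queue would hand
    // records out, without mutating the caller's list
    public static List<Queued> inPullOrder(List<Queued> queue) {
        List<Queued> copy = new ArrayList<>(queue);
        copy.sort(FIFO);
        return copy;
    }
}
```

The catch discussed in this thread is that transferring to Relationship.SELF preserves the original position but updates the queued date, so without an explicit prioritizer the pull order of the remaining files is not guaranteed.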

Regards,
Ben

2017-07-10 17:06 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:

> Hi,
>
> I think it puts back a FlowFile to its original position but update
> queued date as implemented here:
> https://github.com/apache/nifi/blob/master/nifi-nar-
> bundles/nifi-framework-bundle/nifi-framework/nifi-framework-
> core/src/main/java/org/apache/nifi/controller/repository/
> StandardProcessSession.java#L1851
>
> In order to pull FlowFiles from a queue in consistent order, you need
> to specify a prioritizer.
> https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#prioritization
>
> I'm just curious about the functionality you added. Wait processor has
> 'Releasable FlowFile Count' and it could be used to make a batch of
> FlowFiles wait and go. Or Notify's 'Signal Counter Delta' could be
> useful, too.
>
> Regards,
> Koji
>
> On Mon, Jul 10, 2017 at 4:43 PM, 尹文才 <batman...@gmail.com> wrote:
> > Hi guys, I have written a customized processor whose functionality is
> > similar to the NIFI's Wait processor, the difference is my processor
> needs
> > to wait a batch of data and when the batch end flag is found, it will
> > transfer the batch of data to destinations.
> >
> > I checked the source code of Wait processor and also transferred the
> > flowfiles to Relationship.SELF which is the incoming queue when the batch
> > of data is not yet complete. The problem I found was sometimes I could
> see
> > the sequence of the FlowFiles transferred from my processor to
> destinations
> > were not in order.
> > I then added sequence attribute(number starting from 1) to all FlowFiles
> > coming into my processor and I could verify that this problem happen from
> > time to time, but I couldn't find the stable way to reproduce it.
> >
> > My question is how does NIFI handle the FlowFile when it's being
> > transferred to Relationship.SELF, does it put back to its original
> position
> > in the incoming queue? Thanks.
> >
> > Regards,
> > Ben
>


FlowFile position when transferred to Relationship.SELF

2017-07-10 Thread
Hi guys, I have written a customized processor whose functionality is
similar to NiFi's Wait processor; the difference is that my processor
needs to wait for a batch of data, and when the batch-end flag is found,
it transfers the batch of data to its destinations.

I checked the source code of the Wait processor and likewise transferred
the FlowFiles to Relationship.SELF, i.e. back to the incoming queue, when
the batch of data was not yet complete. The problem I found was that
sometimes the FlowFiles transferred from my processor to the destinations
were not in order.
I then added a sequence attribute (numbered starting from 1) to all
FlowFiles coming into my processor and could verify that this problem
happens from time to time, but I couldn't find a reliable way to
reproduce it.

My question is: how does NiFi handle a FlowFile when it's transferred to
Relationship.SELF? Does it put it back in its original position in the
incoming queue? Thanks.
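A small plain-Java sketch of the sequence-attribute workaround mentioned above: stamp each record with an increasing number on entry so out-of-order delivery can be detected or repaired downstream. The maps stand in for FlowFile attributes; nothing here is NiFi API:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class SequenceTagger {
    private final AtomicLong counter = new AtomicLong(0);

    // Stamp an incoming "flowfile" (attribute map) with the next
    // sequence number; returns a copy, leaving the input untouched
    public Map<String, String> tag(Map<String, String> attributes) {
        Map<String, String> copy = new HashMap<>(attributes);
        copy.put("sequence", Long.toString(counter.incrementAndGet()));
        return copy;
    }

    // Downstream: restore the original order regardless of queue shuffling
    public static List<Map<String, String>> bySequence(
            List<Map<String, String>> files) {
        List<Map<String, String>> sorted = new ArrayList<>(files);
        sorted.sort(Comparator.comparingLong(
            (Map<String, String> f) -> Long.parseLong(f.get("sequence"))));
        return sorted;
    }
}
```

Sorting by the attribute downstream sidesteps the queue-ordering question entirely, at the cost of buffering the batch before emitting it.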

Regards,
Ben


Re: get ControllerService inside Advanced custom UI

2017-07-02 Thread
Thanks Matt, I was having difficulty getting the serviceIdentifier from
the controller service property defined in my processor; now I've got it.

Regards,
Ben

2017-07-02 21:40 GMT+08:00 Matt Gilman <matt.c.gil...@gmail.com>:

> Ben,
>
> Yes, configuring the Processor with a Controller Service is what I meant.
> If you define a Property that identifies a Controller Service (for instance
> here [1]) the value of that property when accessed through
> getComponentDetails will be the serviceIdentifier.
>
> Matt
>
> [1]
> https://github.com/apache/nifi/blob/master/nifi-nar-
> bundles/nifi-standard-bundle/nifi-standard-processors/src/
> main/java/org/apache/nifi/processors/standard/InvokeHTTP.java#L205
>
> On Sun, Jul 2, 2017 at 12:15 AM, 尹文才 <batman...@gmail.com> wrote:
>
> > Thanks Matt, how should I get the service identifier with the method
> > getComponentDetails, do you mean I should add a service identifier
> property
> > in my processor to let user to specify it for the controller service? In
> > that case how could the service identifier configured associate with the
> > configured controller service?
> >
> > Regards,
> > Ben
> >
> > 2017-07-01 21:35 GMT+08:00 Matt Gilman <matt.c.gil...@gmail.com>:
> >
> > > Ben,
> > >
> > > That is the correct method to invoke. The serviceIdentifier will be the
> > > value of the Property that identifies the service in question. The
> > > componentId will be the identifier of Processor. You'll need to invoke
> > >
> > > ComponentDetails getComponentDetails(NiFiWebRequestContext
> > requestContext)
> > >
> > > in order to get the service identifier from the configured properties.
> > >
> > > Thanks
> > >
> > > Matt
> > >
> > > On Sat, Jul 1, 2017 at 4:17 AM, 尹文才 <batman...@gmail.com> wrote:
> > >
> > > > Hi guys, I' m currently creating a custom UI for my processor and I
> > need
> > > to
> > > > get the DBCP Connection ControllerService in the backend code for the
> > UI,
> > > > the ControllerService is a property defined in the processor
> > properties.
> > > I
> > > > saw there's one method in class NiFiWebConfigurationContext as below:
> > > >
> > > > ControllerService getControllerService(String serviceIdentifier,
> > > > String componentId);
> > > >
> > > >
> > > > However this method requires the serviceIdentifier which I don't know
> > > >
> > > > how to obtain. How exactly should I get the ControllerService?
> > > >
> > > > Thanks.
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Ben
> > > >
> > >
> >
>


Re: get ControllerService inside Advanced custom UI

2017-07-01 Thread
Thanks Matt, how should I get the service identifier with the
getComponentDetails method? Do you mean I should add a service-identifier
property in my processor to let the user specify it for the controller
service? In that case, how would the configured service identifier be
associated with the configured controller service?

Regards,
Ben

2017-07-01 21:35 GMT+08:00 Matt Gilman <matt.c.gil...@gmail.com>:

> Ben,
>
> That is the correct method to invoke. The serviceIdentifier will be the
> value of the Property that identifies the service in question. The
> componentId will be the identifier of Processor. You'll need to invoke
>
> ComponentDetails getComponentDetails(NiFiWebRequestContext requestContext)
>
> in order to get the service identifier from the configured properties.
>
> Thanks
>
> Matt
>
> On Sat, Jul 1, 2017 at 4:17 AM, 尹文才 <batman...@gmail.com> wrote:
>
> > Hi guys, I' m currently creating a custom UI for my processor and I need
> to
> > get the DBCP Connection ControllerService in the backend code for the UI,
> > the ControllerService is a property defined in the processor properties.
> I
> > saw there's one method in class NiFiWebConfigurationContext as below:
> >
> > ControllerService getControllerService(String serviceIdentifier, String
> > componentId);
> >
> >
> > However this method requires the serviceIdentifier which I don't know
> >
> > how to obtain. How exactly should I get the ControllerService?
> >
> > Thanks.
> >
> >
> > Regards,
> >
> > Ben
> >
>


get ControllerService inside Advanced custom UI

2017-07-01 Thread
Hi guys, I'm currently creating a custom UI for my processor, and I need
to get the DBCP Connection ControllerService in the backend code for the
UI; the ControllerService is a property defined in the processor's
properties. I saw there's one method in the class
NiFiWebConfigurationContext, as below:

ControllerService getControllerService(String serviceIdentifier, String
componentId);

However, this method requires the serviceIdentifier, which I don't know
how to obtain. How exactly should I get the ControllerService? Thanks.
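The two-step lookup Matt describes in this thread (read the service id from the processor's configured property value, then resolve the service by that id) can be sketched with plain maps standing in for the NiFi web-context API; all names here are illustrative:

```java
import java.util.Map;

public class ServiceLookup {
    // Stand-in for the configured properties returned by getComponentDetails()
    private final Map<String, String> componentProperties;
    // Stand-in for the registry behind getControllerService(id, componentId)
    private final Map<String, Object> servicesById;

    public ServiceLookup(Map<String, String> componentProperties,
                         Map<String, Object> servicesById) {
        this.componentProperties = componentProperties;
        this.servicesById = servicesById;
    }

    // Step 1: read the service id from the processor's property value.
    // Step 2: resolve the service instance by that id.
    public Object lookup(String propertyName) {
        String serviceIdentifier = componentProperties.get(propertyName);
        return servicesById.get(serviceIdentifier);
    }
}
```

The point is that the property's *value* is itself the serviceIdentifier, so no extra user-facing identifier property is needed beyond the controller-service property the processor already defines.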

Regards,
Ben