I suspect that backpressure resets when nifi is restarted. If true, that
would nix using it to keep a Processor shutdown.


Looking at NiFi’s code, it looks like it has some elements that could be
leveraged to support what I’m trying to achieve without having to resort
to create clones of import/export Processors to get there. I suspect
that the clone's processing model would loose the fight with nifi's
processing model.

From what I can tell, the FlowFile and Provenance Repositories have much
of the data to be able to achieve the behavior I’m looking for. The
challenge is to get nifi to use it vs. tossing it out on a
session.rollback() or when nifi is restarted.

I was thinking something like session.preCommit() that would be called
before a "no going back" action in a Processor.
The command would cause the preservation of the FlowFile at that point
in time. Sort of like .checkpoint(), but don't throw away the FlowFile
because it might be needed.
.commit() would release the preservation; with .rollback() (or NiFi
dying and restarted) making sure the FlowFiles weren't thrown away.
Then, if there's a problem, the FlowFile queue would show the original
FlowFile and any .preCommit FlowFiles (visually as children).

Through the UI, the User would decide on the course of action for the
FlowFiles; leveraging the proposed Interactive Queue Management feature.
The Processor would need a configuration of Blocking or NonBlocking
behavior when an input FlowFile was in this state (and next up to be
processed?). The idea is to be able to stop the Processor from
processing any other FlowFiles until the queue had been cleared of any
FlowFiles with children .preCommit FlowFiles.
Regardless of Blocking/NonBlocking, the FlowFile would need to remain in
the Processor's input queue until it's been manually dealt with.

The .preCommit would take an text argument so a log of what step was
going to happen (ex. 'delete file from remote system') could be saved
off. This text would be shown with the FlowFile so the User could make
an informed decision for it's disposition.
In the case of successive .preCommit calls, each one's text message
would have to be retained in order for the User to make an informed
manual resolution decision.


Using GetFTP for example...

@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
  ...
  ...
  ...
  preMsg = "delete file " + fullpathFilename + " from " + hostname
  session.preCommit(BEFORE, preMsg)
  [FTP Delete]
  if success {
     session.preCommit(AFTER, preMsg)
  } else {
    session.preCommit(FAILURE, preMsg, <failure reason>)
    logger.error("Failed: {}; rolling back session", new
Object[]{preMsg, e.getMessage()}, e);
    session.rollback();
  }
}

Where I picked BEFORE, AFTER and FAILURE constants to make sure the
message to the User in the UI would be consistent.


From our communication, I guess it’s fair to say that what I’m trying to
achieve is outside of NiFi's objectives.


On 01/02/2017 07:55 PM, Joe Witt wrote:
> As far as the scenario you mentioned with FTP let's identify the
> sequence in more detail because i wanted to be more general on the
> phase about changing the remote system to avoid pulling the data
> again.
>
> Phases of NiFi interacting with an SFTP server (lets talk about GetSFTP 
> still):
>
> (P)ULL: Pull data of a file on remote SFTP server into nifi's
> repository (not yet committed)
> (C)OMMIT: Tell nifi to commit its session.
> (S)AVESTATE: Either keep track of state about the remote system in
> nifi OR change the remote file via delete or rename.
>
> The default behavior we've built for is (P)(C)(S) which is at least
> once and is susceptible to duplicates in the presence of ill-timed
> failures.  The duplication could happen if after (C) and before (S)
> happens or before (S) fully completes there is some failure either in
> NiFi or in communicating with the remote system.
>
> We could offer alternatively (P)(S)(C) which is at most once and is
> susceptible to data loss in the presence of ill-timed failures.  The
> loss could happen if after (S) and before or during (C) some failure
> occurs in NiFi.  Failures before will simply cause a rollback and
> retry and a no-harm done so to speak.
>
> Ok, so I think we're on the same page about the failure cases/logic.
> Your recent email had an important clarifying note about being single
> threaded and indeed wanting to effectively stop the pipeline in the
> presence of some failing data.  You might find it better to create
> very specific/precise processors against your desired behavior.
> Though, you may be able to use the out of the box processors and the
> backpressure mechanism of NiFi (set backpressure at 1 for example).
> It sounds like the source process has a very specific way of writing
> data into the SFTP accessible location and with some sort of a naming
> or timestamping scheme to help you ensure ordering and it sounds like
> the receiving system (whatever NiFi is delivering to) has a very
> specific set of preconditions to honor.  If the out of the box
> processors and prescriptive settings dont' get you the results you
> seek you might find that using a scripted processor to behave
> precisely as you need is the best or writing a custom processor.
>
> Thanks
> Joe
>
> On Mon, Jan 2, 2017 at 6:19 PM,  <[email protected]> wrote:
>> Yes, I had read [1]. I'm reasonably satisfied that nifi is keeping track
>> of what's happening to the data when its "within it's walls". That, and
>> it's inherent design to handle files vs. little XML messages, was what
>> piqued my interest.
>> My concerns are related 'coming into nifi' and 'being sent out of nifi'.
>>
>> In the FTP example, if you do step 1 and hold onto the FlowFile (now
>> safely within nifi); then step 2 (let's pick DELETE) -- but the FTP
>> connection fails between the DELETE and the remote FTP side returning
>> SUCCESS, so there's no way for nifi to know if the delete succeeded or
>> failed.  I politely left out the possibility of nifi dieing unexpectedly
>> in that same timeframe.
>>
>> For my needs I don't want the FlowFile to be tossed away; I need nifi to
>> hold onto it (and let me at it) until I can resolve the problem manually
>> -- because I still need the data. And, because of the sequential
>> processing requirements for the data in question I can't allow any more
>> files to be FTP'd in from the source directory until the problem has
>> been resolved.
>> I would not be running multiple threads for these types of transfers.
>>
>> PutSQL has similar issues -- just going the other way. The data has
>> updated the database but nifi didn't get the SUCCESS reply -- so let's
>> send the data again!
>>
>> [1] https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html
>>
>>
>> On 01/02/2017 12:52 PM, Joe Witt wrote:
>>> Hello
>>>
>>> NiFi's data durability model can be well understood by reading here
>>> [1].  This addresses how data is handled once it is under NiFi's
>>> control.
>>>
>>> So then there is the matter of considering the safety of data as it is
>>> coming 'into nifi' and 'being sent out of nifi'.
>>>
>>> When exchanging data between processes you have generally the
>>> following things to consider and these are well discussed on-line.
>>> 1) At least once
>>> 2) At most once
>>> 3) Exactly once.
>>>
>>> So when bring data 'into nifi' and 'out of nifi' we do generally
>>> provide by default for 'at least once'.  It is the case that
>>> duplicates can be received and duplicates can be sent then.  But it is
>>> also the case that generally speaking you will not lose data
>>> (obviously nifi cannot control what external processes do to the
>>> data).
>>>
>>> Achieving exactly once behavior requires the two processes to have
>>> some agreed upon protocol which offers this behavior.  It could be
>>> something formal like 'two phase commit' or it could be something
>>> informal where you as an administrator have fixed all the variables to
>>> the best you can to ensure there will be no issues.  So lets look at
>>> your example.  You want to pull files via FTP.
>>>
>>> There are two critical steps to this:
>>> 1) Copy bytes from remote system to nifi using FTP
>>> 2) Change the state of the remote system so we don't keep pulling the
>>> file (either removal or rename).
>>>
>>> If you do step 1 then step 2 you are basically implementing at least
>>> once behavior.  You wont lose data using such a protocol but you could
>>> have duplicates.
>>>
>>> If you do step 2 then step 1 you are implementing at least once (not
>>> exactly once).  It is at least once because what if there was some
>>> error in nifi when committing the state of things.
>>>
>>> The other thing you mentioned was sequence of processing.  NiFi will
>>> by default process data in the order it received and as it goes
>>> between processes it is placed in a prioritized queue.  Obviously if
>>> you're running multiple threads and such then ordering can vary.
>>>
>>> There is a lot here to discuss and cases to consider so happy to keep
>>> the discussion going.  Hopefully this helps you see the difference
>>> between 'nifi the framework and its durability model' versus 'nifi
>>> processors and patterns of using them together'.
>>>
>>> [1] https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html
>>>
>>> Thanks
>>> Joe
>>>
>>> On Mon, Jan 2, 2017 at 3:19 PM,  <[email protected]> wrote:
>>>> I am reviewing NiFi as a possible replacement for an internally developed
>>>> data transfer tool.
>>>>
>>>> NiFi appears to be of the "the data is guaranteed to be delivered at least
>>>> once" variety. Where my needs are "the data is guaranteed to be delivered
>>>> once"; to the point that I'm willing to manually review and resolve 
>>>> failures
>>>> that occur "beyond the point of no return" to ensure 1x delivery and no 
>>>> data
>>>> loss.
>>>> Some of my data transfers are of the sequential transactional type, where
>>>> they must be transferred and processed, in sequence.
>>>>
>>>> Take for instance GetFTP, I see holes in the commit model (from the
>>>> perspective of what I'm trying to accomplish).
>>>> Looking at GetFTP (via FetchFileTransfer.java), session.commit() occurs
>>>> before deleting or renaming the source file. So, if that step fails the 
>>>> file
>>>> will be retrieved and processed again and again.
>>>> PutSQL appears to have similar issues as it relates to updating a database
>>>> more than once should the transfer die before the db commit is recognized 
>>>> by
>>>> NiFi, so the the FlowFile(s) get rolled back.
>>>>
>>>> Are my needs outside of Nifi's objectives?
>>>>
>>


Reply via email to