Andy,

The ScanAttribute processor allows you to match 1 or more attributes against a 
dictionary.

Consuming data that is still being written is always a tough problem to tackle. 
We've seen people
take many different approaches to this. One approach is to have the producer of 
the data use a
"dot naming" convention, where they write to a file named .myFile.csv and then 
rename it to
to myFile.csv when done. This is often the easiest approach if you control the 
producers as well.

A more S3-centric approach is to configure the S3 bucket so that when data is 
finished being
written to the bucket, S3 can send a notification to SQS. Then you can use 
GetSQS to get this
notification and then use EvaluateXPath for instance to extract the information 
needed and then
use FetchS3.

Thanks
-Mark


On Aug 16, 2017, at 10:13 AM, Andy Loughran 
<[email protected]<mailto:[email protected]>> wrote:

Hi Mark,

Yeah, I think that's what I have now.  The issue being that I could end up with 
a duplicate of a file.

I guess I could use the DetectDuplicate processor to make sure that I de-dupe 
the Flowfiles before I increment the counter.  The issue here is that I want 
the latest available FlowFile to replace one if it exists (users could update a 
file's contents before a batch is complete).

Given there are 5 'types', is there a processor that allows me to match a 
'type' attribute against a dictionary?

On Wed, 16 Aug 2017 at 15:07 Mark Payne 
<[email protected]<mailto:[email protected]>> wrote:
Hi Andy and welcome to the community!

I think that what you're doing here seems very reasonable. If you want to wait 
for 5 'like flowfiles' instead of
just 5 flowfiles, you should be able to use the "Signal Counter Name" of the 
Wait processor. For example,
if your UpdateAttribute processor creates a 'type' and a 'batch' attribute, you 
can set the Wait processor's
Signal Counter Name to "${type}" or to "${type}${batch}", depending on how you 
want to group them together.
This will wait until you reach 5 flowfiles with the same "type" attribute (or 
combination of "type" and "batch" attributes),
according to what you set as the Signal Counter Name.

Does this make sense?

Thanks
-Mark

> On Aug 16, 2017, at 9:55 AM, Andy Loughran 
> <[email protected]<mailto:[email protected]>> wrote:
>
> Hey everyone,
>
> This is my first post.
>
> I'm building out a pipeline with Nifi, but am stuck on an architectural 
> decision around some fairly basic design.  I think I'm stuck as I'm operating 
> on the wrong paradigm, but the application receiving my flow is the 
> limitation in this context.
>
> I'm using ListS3 to poll for csv files.  There need to be 5 different types 
> of file uploaded with a unique batch identifier for them to be released.  I'm 
> using UpdateAttribute to rip the type and batch from the filename, then using 
> wait to hold the batch.
>
> At the moment though, I'm holding until a batch has 5 files, rather than 5 
> files with each attribute type matching the expected types.
>
> Is this the wrong way to be thinking about this problem, or does this sound 
> like a good use case for Nifi - but using a better combination of processors. 
>  If anyone could give me guidance or point me toward an example template for 
> batch process I'd be grateful.
>
> Look forward to helping out in the community where I can.
>
> Thanks,
>
> Andy


Reply via email to