Andy,

The ScanAttribute processor allows you to match one or more attributes against a dictionary.
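For reference, ScanAttribute's "Dictionary File" property points at a plain-text file of terms, one per line; flowfiles whose scanned attribute value appears in the file are routed to the "matched" relationship. A sketch of such a dictionary for the five 'type' values (the path and the type names here are hypothetical):

```text
# /opt/nifi/conf/expected-types.txt -- one dictionary term per line
orders
customers
products
invoices
payments
```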
Consuming data that is still being written is always a tough problem to tackle. We've seen people take many different approaches to this. One approach is to have the producer of the data use a "dot naming" convention, where they write to a file named .myFile.csv and then rename it to myFile.csv when done. This is often the easiest approach if you control the producers as well. A more S3-centric approach is to configure the S3 bucket so that when data is finished being written to the bucket, S3 sends a notification to SQS. Then you can use GetSQS to get this notification, use EvaluateJsonPath (for instance) to extract the information needed, and then use FetchS3Object.

Thanks
-Mark

On Aug 16, 2017, at 10:13 AM, Andy Loughran <[email protected]> wrote:

Hi Mark,

Yeah, I think that's what I have now. The issue being that I could end up with a duplicate of a file. I guess I could use the DetectDuplicate processor to make sure that I de-dupe the flowfiles before I increment the counter. The issue here is that I want the latest available flowfile to replace an existing one (users could update a file's contents before a batch is complete).

Given there are 5 'types', is there a processor that allows me to match a 'type' attribute against a dictionary?

On Wed, 16 Aug 2017 at 15:07 Mark Payne <[email protected]> wrote:

Hi Andy, and welcome to the community!

I think that what you're doing here seems very reasonable. If you want to wait for 5 'like flowfiles' instead of just 5 flowfiles, you should be able to use the "Signal Counter Name" property of the Wait processor. For example, if your UpdateAttribute processor creates a 'type' and a 'batch' attribute, you can set the Wait processor's Signal Counter Name to "${type}" or to "${type}${batch}", depending on how you want to group them together.
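The "dot naming" convention Mark describes above can be sketched as follows; the function name and paths are illustrative, not part of any NiFi API. A ListFile/ListS3 consumer configured to ignore dot-files only ever sees the finished file:

```python
import os

def publish_atomically(directory, filename, data):
    """Write to a hidden ".name" temp file, then rename once the write completes.

    The rename is atomic on the same filesystem, so a consumer that skips
    dot-files never observes a partially written file.
    """
    tmp_path = os.path.join(directory, "." + filename)
    final_path = os.path.join(directory, filename)
    with open(tmp_path, "w") as f:
        f.write(data)
    os.rename(tmp_path, final_path)  # atomic publish step
    return final_path
```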
This will wait until you reach 5 flowfiles with the same "type" attribute (or the same combination of "type" and "batch" attributes), according to what you set as the Signal Counter Name. Does this make sense?

Thanks
-Mark

> On Aug 16, 2017, at 9:55 AM, Andy Loughran <[email protected]> wrote:
>
> Hey everyone,
>
> This is my first post.
>
> I'm building out a pipeline with NiFi, but am stuck on an architectural
> decision around some fairly basic design. I think I'm stuck because I'm
> operating on the wrong paradigm, but the application receiving my flow is
> the limitation in this context.
>
> I'm using ListS3 to poll for CSV files. There need to be 5 different types
> of file, uploaded with a unique batch identifier, for them to be released.
> I'm using UpdateAttribute to rip the type and batch from the filename, then
> using Wait to hold the batch.
>
> At the moment, though, I'm holding until a batch has 5 files, rather than
> 5 files whose type attributes match the expected types.
>
> Is this the wrong way to be thinking about this problem, or does this sound
> like a good use case for NiFi, just with a better combination of processors?
> If anyone could give me guidance or point me toward an example template for
> batch processing, I'd be grateful.
>
> Look forward to helping out in the community where I can.
>
> Thanks,
>
> Andy
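The release condition Andy describes (hold a batch until all five expected types have arrived, with a re-uploaded file replacing its earlier version) can be sketched outside NiFi like this. The filename pattern, type names, and function are hypothetical stand-ins for what UpdateAttribute and Wait/Notify would do in the flow:

```python
import re

# Hypothetical set of the five expected file types.
EXPECTED_TYPES = {"orders", "customers", "products", "invoices", "payments"}

# Mirrors an UpdateAttribute regex that rips 'type' and 'batch' out of a
# filename such as "orders_batch42.csv".
FILENAME_PATTERN = re.compile(r"^(?P<type>[a-z]+)_(?P<batch>\w+)\.csv$")

batches = {}  # batch id -> {type: filename}; the latest upload of a type wins

def ingest(filename):
    """Record a file; return the completed batch dict once all types are present."""
    m = FILENAME_PATTERN.match(filename)
    if not m:
        return None  # filename does not carry a type/batch
    batch = batches.setdefault(m.group("batch"), {})
    batch[m.group("type")] = filename          # replaces any earlier upload
    if set(batch) == EXPECTED_TYPES:
        return batches.pop(m.group("batch"))   # release the whole batch
    return None
```

Keying the dict on type rather than counting flowfiles is what makes a re-upload replace its predecessor instead of prematurely completing the batch.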
