Brian,

Take a look at these two JIRAs [1] [2], specifically the first, which should be released in 1.2.0. The second is mostly an FYI so that you are aware of some of the other aggregation capabilities in NiFi. It has not yet been merged to master.
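For reference, the pre-1.2.0 workaround described below could be outlined roughly like this. The specific split and Avro-conversion processors, and the exact property values, are assumptions that will vary with your flow:

```text
PutFile            -> stash the original CSV in a temp/staging directory
UpdateAttribute    -> save the original filename in a custom attribute
SplitText          -> split the CSV into per-row flowfiles
ConvertCSVToAvro   -> transform each split to Avro
InvokeHTTP         -> post each Avro flowfile to the web service
MergeContent       -> Merge Strategy: Defragment,
                      Attribute Strategy: Keep Only Common Attributes
FetchFile          -> fetch the stashed file via the saved filename
                      attribute; Completion Strategy: Delete File
```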
There is a way to implement a flow that would handle this scenario before 1.2.0 is released. You could stash the file in a temp/staging directory while creating an attribute with the original filename with UpdateAttribute, perform your splits on the original CSV file, transform each split to Avro, post the Avro flowfiles to your web service, and then route those Avro flowfiles to MergeContent. When that merge completes, you should still be able to access the original filename attribute (since it's a common attribute on all the split flowfiles and will be retained if you set MergeContent to keep common attributes), and you could use FetchFile to retrieve the file from the temp/staging dir and delete it (which can be handled by FetchFile itself) or do further processing on it.

[1] https://issues.apache.org/jira/browse/NIFI-190
[2] https://issues.apache.org/jira/browse/NIFI-2735

On Fri, Dec 23, 2016 at 3:33 AM BD International <[email protected]> wrote:

> Jeff,
>
> Thanks for that, just tried it out and it works perfectly!
>
> On a similar topic, I have a flow which picks up a CSV and turns each row
> into an Avro object and posts that to a web service I've set up. I would
> like to do something similar, where I don't delete the original CSV file
> until I have successfully posted all the Avro objects.
>
> I would prefer to handle this within NiFi but can't seem to work out how
> without writing custom code.
>
> Thanks
>
> Brian
>
> On 22 Dec 2016 19:04, "Jeff" <[email protected]> wrote:
>
> Brian,
>
> You can use MergeContent in Defragment mode. Just be sure to set the
> number of bins used by MergeContent equal to or greater than the number of
> concurrent merges you expect to have going on in your flow, and to route
> successfully processed and failed flowfiles (after they've been gracefully
> handled, however it suits your use case) to the MergeContent processor.
> If a fragment (one of the child flowfiles) is not sent to MergeContent, it
> will never be able to complete the defragmentation, since MergeContent would
> not have received all the fragments.
>
> UnpackContent keeps track of the "batch" of files that are unpacked from
> the original archive by assigning to each child flowfile a set of fragment
> attributes that provide an ID to correlate merging (defragmenting in this
> case), the total number of fragments, and the fragment index.
>
> After the merge is complete, you'll have a recreation of the original zip
> file, and it signifies that all the child flowfiles have completed
> processing.
>
> - Jeff
>
> On Thu, Dec 22, 2016 at 12:29 PM BD International <[email protected]> wrote:
>
> Hello,
>
> I've got a data flow which picks up a zip file and uses UnpackContent to
> extract the contents. The extracted files are then converted to JSON and
> stored in a database.
>
> I would like to store the original zip file and only delete the file once
> all the extracted files have been stored correctly. Has anyone else come
> across a way to do this?
>
> Thanks in advance,
>
> Brian
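As an aside, the correlation Jeff describes can be illustrated with a small sketch. This is not NiFi's actual API; the dicts below just stand in for flowfiles carrying the fragment attributes that UnpackContent writes (fragment.identifier, fragment.index, fragment.count), and the function mimics what Defragment mode does conceptually: bin by identifier and merge only once every fragment has arrived.

```python
# Illustrative sketch of MergeContent's Defragment mode (not NiFi code).
from collections import defaultdict

def defragment(flowfiles):
    """Bin flowfiles by fragment.identifier; emit a merged payload only
    once all fragment.count pieces for that identifier have arrived."""
    bins = defaultdict(dict)   # fragment.identifier -> {index: flowfile}
    merged = []
    for ff in flowfiles:
        ident = ff["fragment.identifier"]
        bins[ident][int(ff["fragment.index"])] = ff
        if len(bins[ident]) == int(ff["fragment.count"]):
            # All fragments present: reassemble in fragment.index order.
            ordered = sorted(bins.pop(ident).items())
            merged.append(b"".join(f["content"] for _, f in ordered))
    return merged

# Two fragments of one "archive", arriving out of order:
fragments = [
    {"fragment.identifier": "zip-1", "fragment.index": "2",
     "fragment.count": "2", "content": b"world"},
    {"fragment.identifier": "zip-1", "fragment.index": "1",
     "fragment.count": "2", "content": b"hello "},
]
print(defragment(fragments))  # [b'hello world']
```

Note that if one fragment is never routed in, its bin simply never completes, which is exactly why every child flowfile (success or handled failure) has to reach MergeContent.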
