Mark,
Thanks for the comments regarding the bin packing settings.
To confirm, I get data from multiple sources and merge them together.
Basically, I start with a list of key values that I need to enhance at a 1-1
level from a different data set, then go to another data source to get 1-N
records. Then I extract fields from the JSON flow files and make decisions on
that data. I run the dataset through UpdateAttribute to do some math and
formatting, and then AttributesToCSV to get it ready to be merged.
This whole amount of work takes about 20-25 minutes. If I run MergeContent
manually at this point when all flow files are there, it works like a charm and
1 file is created.
After reading your section on bin packing (and rereading the Show Usage portion
of MergeContent) and understanding it better, I tried to make it work with
values so large I'd never hit them, to get the bin age condition to trigger,
but that didn't work right either. I think it has to do with when the bins are
created, based on the 'Show Usage' information. Even though all FlowFiles were
at MergeContent at 20 minutes, and the Max Bin Age was set to 25 minutes (and I
purposefully started MergeContent AFTER the initial processor in the flow
to initiate the bin age), I still got 2 flow files, with the first one being
small and stating TIMEOUT as the merge reason.
I don't think I can use Defragment, since based on the documentation I need to
know the number of files, but since I have to get a 1-N number of records,
there doesn't seem to be a way to give each flowfile a different
fragment.index that matches the 1-N records generated.
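
To make the problem concrete, here is a rough Python sketch (not NiFi code) of
what Defragment appears to need: every fragment carries the same
fragment.identifier, its own fragment.index, and the total in fragment.count.
The function name, the input shape, and the 0-based index are assumptions made
for illustration only. The point is that the total is only known once every
1-N lookup has finished.

```python
import uuid


def tag_fragments(records_per_key, identifier=None):
    """Attach Defragment-style attributes to a flat list of records.

    records_per_key is a hypothetical dict: key value -> list of 1-N records.
    The total count can only be computed after all lookups are complete.
    """
    identifier = identifier or str(uuid.uuid4())
    # Flatten the 1-N expansion so every record gets its own position.
    flat = [(key, rec) for key, recs in records_per_key.items() for rec in recs]
    total = len(flat)
    return [
        {
            "key": key,
            "record": rec,
            "fragment.identifier": identifier,
            "fragment.index": str(i),  # 0-based here; an assumption
            "fragment.count": str(total),
        }
        for i, (key, rec) in enumerate(flat)
    ]


if __name__ == "__main__":
    # Made-up data: key "a" expands to 2 records, key "b" to 1.
    for frag in tag_fragments({"a": ["r1", "r2"], "b": ["r3"]}):
        print(frag)
```

In a streaming flow each record is emitted before the total is known, which is
exactly where I get stuck.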
I looked at MergeRecord, and it appears to have the same type of conditional
values as MergeContent, so I figure that won't work either. Nothing else comes
up for Merge or Join unless I'm missing something.
John
On Tuesday, February 21, 2023, 03:36:32 PM EST, Mark Payne
<[email protected]> wrote:
John,
You should not be using CRON driven for any processors in the middle of a flow.
In fact, we really should probably just disable that altogether.
It’s exceedingly rare that you’d want anything other than Timer-Driven
with a Run Schedule of 0 sec.
MergeContent will not create any merged output on its first iteration after
it’s scheduled to run. It requires at least a second iteration before anything
is transferred. Its algorithm has evolved over time, and it may well have
happened to work previously but it’s really not being configured as intended.
When you say that you’re retrieving data from a few sources and then “merges
that all back into a single file” - does that mean that you started with one
FlowFile, split it up, and then want to re-assemble the data after performing
enrichment? If so you’ll want to use a Merge Strategy of Defragment.
If you’re trying to just bring in some data and merge it together by
correlation attribute, then Bin Packing makes sense. Here, you have a few
properties that you can use to try to get the best bin packing. In short, a bin
will be merged when any of these conditions is met:
- The Minimum Group Size is reached AND the Minimum Number of Entries is met
- The Maximum Group Size OR the Maximum Number of Entries is met
- A bin has been sitting for “Max Bin Age” amount of time
- If a correlation attribute is used, and a FlowFile comes in that can’t go
into any bin, it will evict the oldest bin.
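
The conditions above can be sketched as a small decision function. This is a
minimal Python illustration, not NiFi's actual implementation: the class and
field names are hypothetical, the order in which the checks run is arbitrary,
and apart from TIMEOUT (the reason reported earlier in this thread) the reason
labels are made up for the example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BinSettings:
    min_entries: int
    max_entries: int
    min_size_bytes: int
    max_size_bytes: Optional[int]      # None means no size cap
    max_bin_age_sec: Optional[float]   # None means no age limit


@dataclass
class BinState:
    entries: int
    size_bytes: int
    age_sec: float


def merge_reason(settings, state, evicted=False):
    """Return a label for the condition that would merge the bin, else None."""
    # Maximum Group Size OR Maximum Number of Entries is met.
    size_capped = (settings.max_size_bytes is not None
                   and state.size_bytes >= settings.max_size_bytes)
    if state.entries >= settings.max_entries or size_capped:
        return "MAX_REACHED"  # hypothetical label
    # The bin has been sitting for Max Bin Age.
    if (settings.max_bin_age_sec is not None
            and state.age_sec >= settings.max_bin_age_sec):
        return "TIMEOUT"
    # A FlowFile could not go into any bin, so the oldest bin was evicted.
    if evicted:
        return "EVICTED"  # hypothetical label
    # Minimum Group Size AND Minimum Number of Entries are both met.
    if (state.entries >= settings.min_entries
            and state.size_bytes >= settings.min_size_bytes):
        return "MIN_REACHED"  # hypothetical label
    return None


if __name__ == "__main__":
    # Made-up numbers: minimums set far too high to reach, 25-minute bin age.
    settings = BinSettings(1_000_000, 2_000_000, 0, None, 25 * 60)
    print(merge_reason(settings, BinState(40, 4096, 25 * 60)))
```

With minimums that are never reached, the only condition left to fire in this
sketch is the age check, which matches a merge reason of TIMEOUT.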
If you’re seeing bins smaller than expected, you can look at the Data
Provenance for the merged FlowFile, and it will tell you exactly which of the
conditions above triggered the data to be merged. This may help to adjust these
settings.
Hope this is helpful.
Thanks
-Mark
> On Feb 17, 2023, at 1:39 PM, John McGinn via users <[email protected]>
> wrote:
>
> Hello,
>
> NiFi 1.19.0 - I need some help in trying to make my idea work, or figure out
> the better way to do this.
>
> I've got a flow that retrieves data from a few data sources, enhances
> individual flow files, converts attributes to CSV and then merges that all
> back into a single file. It takes roughly 20 minutes for the process to run
> from start to the MergeContent part, so when I do it manually, I stop the
> MergeContent processor until all flowfiles are in the queue waiting, and then
> I start the MergeContent processor. (Run One Time doesn't work for some
> reason.) That works fine, manually.
>
> When I try to put cron scheduling in, it never kicks off. For instance, the
> initial processor in the flow has a cron schedule of the top of the hour
> (0 0 * * * ?). I then put 25 past the hour for MergeContent (0 25 * * * ?).
> When I start the flow, the flowfiles are generated and queue up in front of
> MergeContent by 25 minutes past the hour, but MergeContent never kicks
> off.
>
> I added a correlation attribute recently and removed the cron entry, but the
> MergeContent just creates small bunches of merged files.
>
> I even attempted to put a cron on the AttributesToCSV with a maximum bin age
> on the MergeContent, since it takes less than a minute for the
> AttributesToCSV to process the flowfiles at that point, but the cron didn't
> kick off there either.
>
> Any ideas on how to get this to work?
>
> Thanks,
> John