Hi Arnaud,

If you are running from master (unreleased 1.2.0-SNAPSHOT), you can use an 
awesome feature Koji added recently called “Wait/Notify” [1]. He explains it 
much better, with graphics and such, but the long and short of it is that the 
Split* processors output the total number of split records from an input, and 
you can then use counters to make sure every split has arrived before you 
reconstitute the whole thing when merging.
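
To make the counting idea concrete, here is a rough sketch in plain Python. It is not NiFi code and not how Wait/Notify is implemented; the class and parameter names (FragmentCollector, source_id, index, total) are made up for illustration. It only shows the principle: hold the split records for an input until the number received matches the total reported by the split step, and merge only then.

```python
from collections import defaultdict


class FragmentCollector:
    """Hypothetical helper: holds split records until all of them have arrived."""

    def __init__(self):
        # One bin per original input, mapping fragment index -> record text.
        self._bins = defaultdict(dict)

    def offer(self, source_id, index, total, line):
        """Add one fragment; return the merged text once `total` fragments are in, else None."""
        fragments = self._bins[source_id]
        fragments[index] = line
        if len(fragments) < total:
            return None  # still waiting for the rest of this input
        del self._bins[source_id]
        # Restore the original order before joining the lines.
        return "\n".join(fragments[i] for i in sorted(fragments))


collector = FragmentCollector()
collector.offer("a.json", 1, 3, "row1")           # None: 1 of 3 received
collector.offer("a.json", 0, 3, "row0")           # None: 2 of 3 received
merged = collector.offer("a.json", 2, 3, "row2")  # all 3 received, merged in order
```

Because the bin is released on the count rather than on a timer or a fixed size, inputs with different numbers of records do not need different settings.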

If you are not on master, I’d say you might have to do a little more work. 
There is a “minimum number of entries” property in MergeContent, but I’m not 
sure off the top of my head how you can get that value from the Split and 
update it dynamically, as it could obviously be different for each input. 
Hopefully Koji or someone else on the list will have a better solution.

[1] https://github.com/apache/nifi/pull/1420

Andy LoPresto
[email protected]
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69

> On Feb 17, 2017, at 1:12 AM, Arnaud G <[email protected]> wrote:
> 
> Hello,
> 
> I'm trying something very simple but I cannot get it to work properly 
> (NiFi 1.1.1).
> 
> You can see the flow here: https://goo.gl/photos/9ewQV6ovKDDZe2pw9
> 
> My goal is to pull a JSON file out of HDFS, extract some information and 
> store it back as a CSV, something really trivial.
> 
> My flow is structured like this:
> 
> 1) GetHDFS: here I check every 2 minutes and only take 1 file.
> 2) EvaluateJsonPath: the JSON looks like {version: xxxx, 
> attributesX[{.....}]}, and I need to add the version # to each line of my 
> CSV, so I use this processor to store the version in an attribute.
> 3) SplitJson: I split the JSON on the attributesX field ($.attributesY); this 
> gives me each line as expected.
> 4) EvaluateJsonPath: I extract the different pieces of information and store 
> them as attributes.
> 5) UpdateAttribute: I use this one to remove newline characters from one 
> attribute.
> 6) ReplaceText: I transform my attributes into CSV format.
> 7) MergeContent: this is where I have a problem. I now want to regroup all my 
> lines to recreate the original file, but as CSV. I have tried different 
> strategies but I'm unable to get a consistent result. I use ${filename} 
> as the correlation attribute, but most of the time only 25% of the lines are 
> there when it finalizes the bin and sends it to the next processor (I'm 
> using the last UpdateAttribute to change the ${filename} to .csv). I then 
> get the rest of the lines, but with the same filename, which will either 
> fail or overwrite the first file.
> 
> Can you tell me what I'm doing wrong in the flow? How can I ensure that all 
> the lines are merged before the merge process finishes? Is there something 
> like a minimum bin age that would let me make sure I have waited long enough 
> to get all the lines?
> 
> Thank you
> 
> 
> AG
> 
