Hello, I'm trying to do something very simple, but I cannot get it to work properly (NiFi 1.1.1).
You can see the flow here: https://goo.gl/photos/9ewQV6ovKDDZe2pw9

My goal is to pull a JSON file out of HDFS, extract some information, and store it back as a CSV file, which should be really trivial. My flow is structured like this:

1) GetHDFS: I check every 2 minutes and only take 1 file.
2) EvaluateJsonPath: The JSON looks like {version: xxxx, attributesX[{.....}], and I need to add the version number to each line of my CSV, so I use this processor to store the version in an attribute.
3) SplitJson: I split the JSON on the attributesX field ($.attributesY), which gives me each line as expected.
4) EvaluateJsonPath: I extract the different pieces of information and store them as attributes.
5) UpdateAttribute: I use this one to remove newline characters from one attribute.
6) ReplaceText: I transform my attributes into CSV format.
7) MergeContent: This is where I have a problem. I now want to regroup all my lines to recreate the original file, but as CSV.

I have tried different strategies, but I'm unable to get a consistent result. I use ${filename} as the correlation attribute, but most of the time the bin is finalized and sent to the next processor with only about 25% of the lines (I'm using the last UpdateAttribute to change ${filename} to a .csv name). I then get the rest of the lines, but with the same filename, which will either fail or overwrite the first file.

Can you tell me what I'm doing wrong in the flow? How can I ensure that all the lines are merged before the merge finishes? Is there something like a minimum bin age that would let me make sure I have waited long enough to get all the lines?

Thank you
AG
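P.S. In case it helps to see the end result I'm after, here is a rough Python sketch of the same transformation done outside NiFi. It is only an illustration, not part of my flow: the record fields `name` and `comment` and the sample values are made up, and I'm assuming the list lives under `attributesX` next to `version`.

```python
import csv
import io
import json


def json_to_csv(doc_text, list_field, columns):
    """Flatten one JSON document into CSV text, one row per list element."""
    doc = json.loads(doc_text)
    version = doc["version"]  # same role as the attribute set in step 2
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    for record in doc[list_field]:  # same role as SplitJson in step 3
        row = [version]
        for col in columns:
            value = str(record.get(col, ""))
            # Same idea as the UpdateAttribute step: drop embedded newlines.
            row.append(value.replace("\r", " ").replace("\n", " "))
        writer.writerow(row)
    # All rows of one input file end up in one CSV, which is what I
    # expect MergeContent to give me at the end of the flow.
    return out.getvalue()


# Hypothetical sample document; the field names are assumptions.
sample = (
    '{"version": "1.0", "attributesX": ['
    '{"name": "a", "comment": "line1\\nline2"}, '
    '{"name": "b", "comment": "ok"}]}'
)
print(json_to_csv(sample, "attributesX", ["name", "comment"]))
```

So for one input JSON file I expect exactly one CSV file, with one row per element of the list and the version repeated in the first column of every row.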
