Hello,

I'm trying to do something very simple, but I cannot get it to work
properly (NiFi 1.1.1).

You can see the flow here: https://goo.gl/photos/9ewQV6ovKDDZe2pw9

My goal is to pull a JSON file out of HDFS, extract some information, and
store it back as a CSV: something really trivial.

My flow is structured like this:

1) GetHDFS (here I check every 2 minutes and only take 1 file)
2) EvaluateJsonPath: The JSON looks like {version: xxxx,
attributesX: [{.....}]}, and I need to add the version # to each line of my
CSV, so I use this processor to store the version in an attribute
3) SplitJson: I split the JSON on the attributesX field ($.attributesX),
which gives me each line as expected
4) EvaluateJsonPath: I extract the different pieces of information and
store them as attributes
5) UpdateAttribute: I use this one to remove newline characters from one
attribute
6) ReplaceText: I transform my attributes into CSV format
7) MergeContent: This is where I have a problem. I now want to regroup all
my lines to recreate the original file, but as CSV. I have tried different
strategies but I'm unable to get a consistent result. I use ${filename} as
the correlation attribute, but most of the time the bin holds only about
25% of the lines when it is finalized and sent to the next processor (I'm
using the last UpdateAttribute to change the ${filename} extension to
.csv). The rest of the lines then arrive with the same filename, which will
either fail or overwrite the first file.
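
To make the intent concrete, here is a minimal Python sketch (outside NiFi)
of the result I expect for one input file. It is only an illustration of
steps 2 to 7, not how the flow is implemented: the function name json_to_csv
and the field names "name" and "comment" are made up, and "version" and
"attributesX" are the placeholder names used above.

```python
import csv
import io
import json


def json_to_csv(raw_json, fields):
    """Flatten one JSON document into CSV text, one row per attributesX entry.

    `fields` lists the (hypothetical) keys to extract from each entry.
    """
    doc = json.loads(raw_json)
    version = doc["version"]  # step 2: keep the version for every row
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    for entry in doc["attributesX"]:  # step 3: one record per array element
        # steps 4-5: pull the fields and replace embedded line breaks
        values = [
            str(entry.get(f, "")).replace("\r", " ").replace("\n", " ")
            for f in fields
        ]
        writer.writerow([version] + values)  # step 6: format as a CSV line
    return out.getvalue()  # step 7: all lines of one file, kept together


# Made-up sample input; the real files have different fields.
sample = (
    '{"version": "1.0", "attributesX": ['
    '{"name": "a", "comment": "x\\ny"}, {"name": "b", "comment": "z"}]}'
)
print(json_to_csv(sample, ["name", "comment"]))
```

The point is that every row of a given input file ends up in a single
output, which is what I cannot get MergeContent to do reliably.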

Can you tell me what I'm doing wrong in the flow? How can I ensure that all
the lines are merged before the merge finishes? Is there something like a
minimum bin age that would let me make sure I have waited long enough to
get all the lines?

Thank you


AG
