Lee, Mark, Joe, I have to say I am overwhelmed with the enthusiastic,
quick, and helpful responses from you all. Seems there is clearly very
active interest in the project. You've given me some good ideas to
consider.
So, pondering all that you've suggested, here's where I am now - a
three-stage flow.
1. ExecuteProcess - Runs streamer.sh below which streams out the
contents of a specific file type ${StatType} from within the many
(160) zip files that exist across GW1...GW13 directories that also
have a specific yyyymmdd date in the filename. I also just strip off
the headers using grep here (any line that contains the work "Device"
is a header). This script will give me a long running (5 minutes or
so) of streaming data around 10 GBs total. I set Batch Duration = 5
secs
====== streamer.sh =======
datefilter=$1
stattype=$2
for f in $(ls
/import/nms/prod/stats/Terminal/GW{1..13}/ConsolidatedTermStats_${datefilter}*)
do
unzip -p $f *${stattype}* | grep -v Device
done
=======================
2. MergeContent - Mark suggested this and I like this idea to control
the size of my HDFS files. I use all defaults except Maximum Group
Size = 512 MB
3. PutHDFS - Using the built in compression codec here to shrink the
512 MB files
I like this approach because I don't want to copy 10 GBs to a local
store and then unzip all the files and copy them again to HDFS. I
really like to maintain a streaming architecture as much as possible
and so this is what I think I achieve here. However, I still have some
confusion.
1. I don't fully understand what is happening in the ExecuteProcess.
There, I set Batch Duration = 5 secs. I was thinking this would read
in 5 seconds of stream, then generate a flow file that would be sent
to the next state (MergeContent) where it would be merged and
eventually output from that state as another merged flow file to the
putHDFS. Or is it like this... Say there are 5 batched flowfiles that
get merged and reach the 512 MB threshold. Internally, are references
to those 5 just passed along to the next stage for processing or is a
6th flow file produced that is the concatenation of the 5 and that is
passed to the next stage and the 5 inputs released because we are done
with them?
Mainly, my confusion is with the interplay of this batching and the
Run Schedule and Run Duration.
- My understanding it that, if both Run Schedule and Duration are
zero, and I set this processor up as a cron schedule, then, when it is
time to run, it will launch my streaming script and run until my
script terminates. Is that correct?
- And during that running time, it will generate a new flow file every
5 seconds due to the batching configuration. Correct?
- So, under the hood, what really happens with all the flow files that
are batched and merged once the merged file is output? Does that disk
space held by the batched flow files get released in the content
repository? Because, it seemed like disk usage kept going up in the
content repository while this flow was running. I was anticipating
that the batching would sort of limit the growth on my local disk
since I was streaming my puts to HDFS in batches.
Sorry for so many questions but documention is very sparse on this
project (i guess being so new to OSS) and I really want to understand
it thoroughly. I did clone the project and have been scanning the code
as well.
On Sun, Oct 25, 2015 at 11:45 AM, Joe Witt <[email protected]> wrote:
> Thanks for jumping in Lee!
>
> Mark,
>
> This is a great writeup. We should turn this into a blog w/full
> explanation and template. Great use case and you just gave us a
> perfect user perspective/explanation of how you're thinking of it.
>
> We will make that happen quickly.
>
> https://issues.apache.org/jira/browse/NIFI-1064
>
> Thanks
> Joe
>
> On Sun, Oct 25, 2015 at 9:45 AM, Mark Payne <[email protected]> wrote:
>> Hey Mark,
>>
>> Thanks for sharing your use case with us in pretty good details so that we
>> can understand
>> what you're trying to do here.
>>
>> There are actually a few processors coming in the next release that I think
>> should help here.
>> First, there's the FetchFile processor that you noticed in NIFI-631.
>> Hopefully the ListFile will
>> make its way in there as well because it's much easier that way :) In either
>> case, you can right-click
>> on the Processor and click Configure. If you go to the Scheduling tab, you
>> can change the Scheduling
>> Strategy to CRON-Driven and set the schedule to run whenever you'd like.
>>
>> As-is, the GetFile is expected
>> to remove the file from the current location, as the idea was that NiFi
>> would sort of assume
>> ownership of the file. It turns out that in the Open Source world, that's
>> often not desirable, so
>> we are moving more toward the List/Fetch pattern as described in that ticket.
>>
>> Once you pull the files into NiFI, though, UnpackContent should unzip the
>> files, each into its
>> own FlowFile. You could then use a RouteOnAttribute to pull out just the
>> file that you care about,
>> based on its filename. You can then allow the others to be routed to
>> Unmatched and auto-terminate
>> them from the flow.
>>
>> Stripping off the first line could probably be done using the ReplaceText,
>> but in the next version
>> of NiFi, we will have a RouteText processor that should make working with
>> CSV's far easier. You could,
>> for instance, route any line that begins with # to one relationship and the
>> rest to a second relationship.
>> This effectively allows you to filter out the header line.
>>
>> Finally, you can use PutHDFS and set the Compression Codec to whatever you
>> prefer. GZIP, Snappy, etc.
>> Prior to that, if you need to, you could also add in a MergeContent
>> processor in order to concatenate
>> together these CSV files in order to make them larger.
>>
>> Thanks
>> -Mark
>>
>>
>>> On Oct 25, 2015, at 12:25 AM, Mark Petronic <[email protected]> wrote:
>>>
>>> Reading some other posts, stumbled on this JIRA [1] which seems to
>>> directly relate to my question in this post.
>>>
>>> [1] https://issues.apache.org/jira/browse/NIFI-631
>>>
>>> On Sat, Oct 24, 2015 at 11:44 PM, Mark Petronic <[email protected]>
>>> wrote:
>>>> So, I stumbled onto Nifi at a Laurel, MD Spark meetup and was pretty
>>>> excited about using it. I'm running HDP and need to construct an ETL
>>>> like flow and would like to try to start, as a new user to Nifi, using
>>>> a "best practice" approach. Wondering if some of you more seasoned
>>>> users might provide some thoughts on my problem?
>>>>
>>>> 1. 160 zip files/day show up on an NFS share in various sub
>>>> directories and their filenames contain the yyyymmddHHMMSS of when the
>>>> stats where generated.
>>>> 2. Each zip file contains 4 or more large CSV files
>>>> 3. I need just one of those CSVs from each zip file each day and they
>>>> all add up to about 10GB uncompressed
>>>> 4. I need to extract that one file from each zip, strip off the first
>>>> line (the headers), and store it in HDFS compressed again using gzip
>>>> or snappy
>>>> 5. I cannot delete the NFS file after the copy to HDFS because others
>>>> need access to it for some time
>>>>
>>>> So, where I am having a hard time visualizing doing this in Nifi is
>>>> with the first step. I need to scan the NFS files after 8 AM every day
>>>> (when I know all files for the previous 24 hours will be present),
>>>> find that set of files for that day using the yyymmdd part of file
>>>> names, then perform the extract of the one file I need and process it
>>>> into HDFS.
>>>>
>>>> I could imagine a processor that runs once every 24 hours on a cron
>>>> schedule. I could imaging running an ExecuteProcess processor against
>>>> a bash script to get the list of all the files that match the
>>>> yyyymmdd. Then I get stuck. How to take this list of 160 file paths
>>>> and start the job of processing each one of them in parallel to run
>>>> the ETL flow?
>>>>
>>>> Thanks in advance for any ideas
>>