Quick observations for now off the latest data:
- GC looks pretty good, though it is surprising there were any full GCs during that short test
- CPU has low utilization
- disk has low utilization
Can you share your sample input data, processor code, flow as a template? Attaching to a JIRA for example could be a good way. We can use this as a good example of how someone can troubleshoot/optimize. Thanks Joe On Thu, Jan 14, 2016 at 1:00 AM, obaidul karim <obaidc...@gmail.com> wrote: > Joe, > > Last time it was below: > java.arg.2=-Xms512m > java.arg.3=-Xmx512m > > Now I made as below: > java.arg.2=-Xms5120m > java.arg.3=-Xmx10240m > > latest jstat & iostat output are attached. > To me it is still slow, no significant improvements. > > -Obaid > > On Thu, Jan 14, 2016 at 12:41 PM, Joe Witt <joe.w...@gmail.com> wrote: >> >> Obaid, >> >> Great so this is helpful info. Iostat output shows both CPU and disk >> are generally bored and ready for more work. Looking at the gc output >> though suggests trouble. We see there are 32 samples at 1 second >> spread each and in that time more than 6 seconds were spent doing >> garbage collection including 5 full collections. That is usually a >> sign of inefficient heap usage and/or simply an undersized heap. What >> size do you have your heap settings at in the conf/bootstrap.conf >> file? >> >> Thanks >> Joe >> >> On Wed, Jan 13, 2016 at 11:32 PM, obaidul karim <obaidc...@gmail.com> >> wrote: >> > Hi Joe, >> > >> > Please find attached jstat & iostat output. >> > >> > So far it seems to me that it is CPU bound. However, your eyes are >> > better >> > than mine :). >> > >> > -Obaid >> > >> > On Thu, Jan 14, 2016 at 11:51 AM, Joe Witt <joe.w...@gmail.com> wrote: >> >> >> >> Hello >> >> >> >> Let's narrow in on potential issues. So while this process is running >> >> and appears sluggish in nature please run the following on the command >> >> line >> >> >> >> 'jps' >> >> >> >> This command will tell you the process id of NiFi. You'll want the >> >> pid associated with the Java process other than what is called 'jps' >> >> presuming there aren't other things running than NiFi at the time.
>> >> >> >> Let's say the result is a pid of '12345' >> >> >> >> Then run this command >> >> >> >> 'jstat -gcutil 12345 1000' >> >> >> >> This will generate garbage collection information every one second >> >> until you decide to stop it with ctrl-c. So let that run for a while >> >> say 30 seconds or so then hit ctrl-c. Can you please paste that >> >> output in response. That will show us how the general health of GC >> >> is. >> >> >> >> Another really important/powerful set of output can be gleaned by >> >> running 'iostat' which gives you statistics about input/output to >> >> things like the underlying storage system. That is part of the >> >> 'sysstat' package in case you need to install that. But then you can >> >> run >> >> >> >> 'iostat -xmh 1' >> >> >> >> Or something even as simple as 'iostat 1'. Your specific command >> >> string may vary. Please let that run for say 10-20 seconds and paste >> >> those results as well. That will give a sense of io utilization while >> >> the operation is running. >> >> >> >> Between these two outputs (Garbage Collection/IO) we should have a >> >> pretty good idea of where to focus the effort to find why it is slow. >> >> >> >> Thanks >> >> Joe >> >> >> >> >> >> On Wed, Jan 13, 2016 at 9:23 PM, obaidul karim <obaidc...@gmail.com> >> >> wrote: >> >> > Hi Joe & Others, >> >> > >> >> > Thanks for all of your suggestions. >> >> > >> >> > Now I am using below code: >> >> > 1. Buffered reader (I tried to use NLKBufferedReader, but it requires >> >> > too >> >> > many libs & NiFi failed to start. I was lost.) >> >> > 2. Buffered writer >> >> > 3. Appending the line ending instead of concatenating a newline >> >> > >> >> > Still no performance gain. Am I doing something wrong, anything else >> >> > I >> >> > can >> >> > change here.
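The `jstat -gcutil` samples Joe asks for can also be summarized offline. A minimal sketch, assuming the samples were saved as plain text and that the last column is GCT (cumulative total GC seconds), as `-gcutil` prints it; the exact column set varies a little across JDK versions:

```java
import java.util.List;

// Summarize saved `jstat -gcutil <pid> 1000` output: how many seconds
// of the sampled window were spent in garbage collection.
class GcUtilSummary {
    // GCT (the last column) is cumulative, so the delta between the
    // first and last sample is the GC time spent during the window.
    static double gcSecondsSpent(List<String> samples) {
        return lastCol(samples.get(samples.size() - 1)) - lastCol(samples.get(0));
    }

    private static double lastCol(String line) {
        String[] cols = line.trim().split("\\s+");
        return Double.parseDouble(cols[cols.length - 1]);
    }
}
```

With 32 one-second samples, a GCT delta above 6 seconds (as in Obaid's capture) means roughly a fifth of the wall-clock time went to GC, which matches Joe's undersized-heap diagnosis.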
>> >> > >> >> > flowfile = session.write(flowfile, new StreamCallback() { >> >> > @Override >> >> > public void process(InputStream in, OutputStream out) throws >> >> > IOException >> >> > { >> >> > try (BufferedReader reader = new BufferedReader(new >> >> > InputStreamReader(in, charset), maxBufferSize); >> >> > BufferedWriter writer = new BufferedWriter(new >> >> > OutputStreamWriter(out, charset));) { >> >> > >> >> > if(skipHeader == true && headerExists==true) { // to skip header, do >> >> > an >> >> > additional line fetch before going to next step >> >> > if(reader.ready()) reader.readLine(); >> >> > } else if( skipHeader == false && headerExists == true) { // if >> >> > header >> >> > is >> >> > not skipped then no need to mask, just pass through >> >> > if(reader.ready()) { >> >> > writer.write(reader.readLine()); >> >> > writer.write(lineEndingBuilder.toString()); >> >> > } >> >> > } >> >> > // decide about empty line earlier >> >> > String line; >> >> > while ((line = reader.readLine()) != null) { >> >> > writer.write(parseLine(line, seperator, quote, escape, maskColumns)); >> >> > writer.write(lineEndingBuilder.toString()); >> >> > }; >> >> > writer.flush(); >> >> > } >> >> > } >> >> > >> >> > }); >> >> > >> >> > >> >> > -Obaid >> >> > >> >> > On Wed, Jan 13, 2016 at 1:38 PM, Joe Witt <joe.w...@gmail.com> wrote: >> >> >> >> >> >> Hello >> >> >> >> >> >> So the performance went from what sounded pretty good to what sounds >> >> >> pretty problematic. The rate now sounds like it is around 5MB/s >> >> >> which >> >> >> is indeed quite poor. Building on what Bryan said there does appear >> >> >> to be some good opportunities to improve the performance. The link >> >> >> he >> >> >> provided just expanded to cover the full range to look at is here >> >> >> [1]. 
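For reference, the buffered pattern in the code above can be reproduced with plain JDK streams outside NiFi. This is an illustrative sketch only: `maskLine` below is a hypothetical stand-in for Obaid's `parseLine(line, seperator, quote, escape, maskColumns)` masking logic, and the header handling is simplified to a single skip flag:

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;

// Plain-JDK sketch of the StreamCallback body from the thread:
// buffered reader in, buffered writer out, one transform per line,
// writing the line ending separately instead of concatenating it.
class LineMasker {
    static void mask(InputStream in, OutputStream out, Charset charset,
                     boolean skipHeader, String lineEnding) throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset));
             BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, charset))) {
            String line;
            boolean first = true;
            while ((line = reader.readLine()) != null) {
                if (first && skipHeader) { first = false; continue; }
                first = false;
                writer.write(maskLine(line));
                writer.write(lineEnding); // avoids per-line String concat
            }
            writer.flush();
        }
    }

    // Hypothetical mask: blank out the second CSV column.
    static String maskLine(String line) {
        String[] cols = line.split(",", -1);
        if (cols.length > 1) cols[1] = "****";
        return String.join(",", cols);
    }
}
```

Because only one line is held in memory at a time, arbitrarily large files stay easy on the heap, which is exactly the property the thread is chasing.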
>> >> >> >> >> >> Couple key points to note: >> >> >> 1) Use of a buffered line oriented reader that preserves the new >> >> >> lines >> >> >> 2) write to a buffered writer that accepts strings and understands >> >> >> which charset you intend to write out >> >> >> 3) avoid string concat with newline >> >> >> >> >> >> Also keep in mind how large any single line could be because if >> >> >> they can be quite large you may need to consider the GC pressure >> >> >> that >> >> >> can be caused. But let's take a look at how things are after these >> >> >> easier steps first. >> >> >> >> >> >> [1] >> >> >> >> >> >> >> >> >> https://github.com/apache/nifi/blob/ee14d8f9dd0c3f18920d910fcddd6d79b8b9f9cf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java#L334-L361 >> >> >> >> >> >> Thanks >> >> >> Joe >> >> >> >> >> >> On Tue, Jan 12, 2016 at 10:35 PM, Juan Sequeiros >> >> >> <helloj...@gmail.com> >> >> >> wrote: >> >> >> > Obaid, >> >> >> > >> >> >> > Since you mention that you will have dedicated ETL servers and >> >> >> > assume >> >> >> > they >> >> >> > will also have a decent amount of ram on them, then I would not >> >> >> > shy >> >> >> > away >> >> >> > from increasing your threads. >> >> >> > >> >> >> > Also in your staging directory if you do not need to keep >> >> >> > originals, >> >> >> > then >> >> >> > might consider GetFile and on that one use one thread. >> >> >> > >> >> >> > Hi Joe, >> >> >> > >> >> >> > Yes, I took consideration of existing RAID and HW settings. We >> >> >> > have >> >> >> > 10G >> >> >> > NIC >> >> >> > for all hadoop intra-connectivity and the server in question is an >> >> >> > edge >> >> >> > node >> >> >> > of our hadoop cluster. >> >> >> > In production scenario we will use dedicated ETL servers having >> >> >> > high >> >> >> > performance(>500MB/s) local disks.
>> >> >> > >> >> >> > Sharing good news, I have successfully masked & loaded to HDFS 110 >> >> >> > GB >> >> >> > data >> >> >> > using below flow: >> >> >> > >> >> >> > ExecuteProcess(touch and mv to input dir) > ListFile (1 thread) > >> >> >> > FetchFile >> >> >> > (1 thread) > maskColumn(4 threads) > PutHDFS (1 thread). >> >> >> > >> >> >> > * used 4 threads for masking and 1 for the others because I found it is >> >> >> > the >> >> >> > slowest component. >> >> >> > >> >> >> > However, it seems to be too slow. It was processing 2GB files in >> >> >> > 6 >> >> >> > minutes. >> >> >> > It may be because of my masking algorithm (although the masking >> >> >> > algorithm >> >> >> > is >> >> >> > pretty simple FPE with some simple twist). >> >> >> > However I want to be sure that the way I have written the custom >> >> >> > processor >> >> >> > is >> >> >> > the most efficient way. Please see the below code chunk and let me know >> >> >> > whether >> >> >> > it >> >> >> > is the fastest way to process flowfiles (csv source files) which >> >> >> > need >> >> >> > modifications on specific columns: >> >> >> > >> >> >> > * parseLine method contains logic for masking.
>> >> >> > >> >> >> > flowfile = session.write(flowfile, new StreamCallback() { >> >> >> > @Override >> >> >> > public void process(InputStream in, OutputStream out) >> >> >> > throws >> >> >> > IOException { >> >> >> > >> >> >> > BufferedReader reader = new BufferedReader(new >> >> >> > InputStreamReader(in)); >> >> >> > String line; >> >> >> > if(skipHeader == true && headerExists==true) { // to skip >> >> >> > header, do >> >> >> > an additional line fetch before going to next step >> >> >> > if(reader.ready()) reader.readLine(); >> >> >> > } else if( skipHeader == false && headerExists == true) { >> >> >> > // >> >> >> > if >> >> >> > header is not skipped then no need to mask, just pass through >> >> >> > if(reader.ready()) >> >> >> > out.write((reader.readLine()+"\n").getBytes()); >> >> >> > } >> >> >> > >> >> >> > // decide about empty line earlier >> >> >> > while ((line = reader.readLine()) != null) { >> >> >> > if(line.trim().length() > 0 ) { >> >> >> > out.write( parseLine(line, seperator, quote, escape, >> >> >> > maskColumns).getBytes() ); >> >> >> > } >> >> >> > }; >> >> >> > out.flush(); >> >> >> > } >> >> >> > }); >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> > Thanks in advance. >> >> >> > -Obaid >> >> >> > >> >> >> > >> >> >> > On Tue, Jan 5, 2016 at 12:36 PM, Joe Witt <joe.w...@gmail.com> >> >> >> > wrote: >> >> >> >> >> >> >> >> Obaid, >> >> >> >> >> >> >> >> Really happy you're seeing the performance you need. That works >> >> >> >> out >> >> >> >> to about 110MB/s on average over that period. Any chance you >> >> >> >> have a >> >> >> >> 1GB NIC? If you really want to have fun with performance tuning >> >> >> >> you >> >> >> >> can use things like iostat and other commands to observe disk, >> >> >> >> network, cpu. Something else to consider too is the potential >> >> >> >> throughput gains of multiple RAID-1 containers rather than RAID-5 >> >> >> >> since NiFi can use both in parallel. 
Depends on your >> >> >> >> goals/workload >> >> >> >> so just an FYI. >> >> >> >> >> >> >> >> A good reference for how to build a processor which does altering >> >> >> >> of >> >> >> >> the data (transformation) is here [1]. It is a good idea to do a >> >> >> >> quick read through that document. Also, one of the great things >> >> >> >> you >> >> >> >> can do as well is look at existing processors. Some good >> >> >> >> examples >> >> >> >> relevant to transformation are [2], [3], and [4] which are quite >> >> >> >> simple stream transform types. Or take a look at [5] which is a >> >> >> >> more >> >> >> >> complicated example. You might also be excited to know that >> >> >> >> there >> >> >> >> is >> >> >> >> some really cool work done to bring various languages into NiFi >> >> >> >> which >> >> >> >> looks on track to be available in the upcoming 0.5.0 release >> >> >> >> which >> >> >> >> is >> >> >> >> NIFI-210 [6]. That will provide a really great option to quickly >> >> >> >> build transforms using languages like Groovy, JRuby, Javascript, >> >> >> >> Scala, Lua, and Jython.
>> >> >> >> >> >> >> >> [1] >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#enrich-modify-content >> >> >> >> >> >> >> >> [2] >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java >> >> >> >> >> >> >> >> [3] >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java >> >> >> >> >> >> >> >> [4] >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java >> >> >> >> >> >> >> >> [5] >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java >> >> >> >> >> >> >> >> [6] https://issues.apache.org/jira/browse/NIFI-210 >> >> >> >> >> >> >> >> Thanks >> >> >> >> Joe >> >> >> >> >> >> >> >> On Mon, Jan 4, 2016 at 9:32 PM, obaidul karim >> >> >> >> <obaidc...@gmail.com> >> >> >> >> wrote: >> >> >> >> > Hi Joe, >> >> >> >> > >> >> >> >> > Just completed by test with 100GB data (on a local RAID 5 disk >> >> >> >> > on >> >> >> >> > a >> >> >> >> > single >> >> >> >> > server). >> >> >> >> > >> >> >> >> > I was able to load 100GB data within 15 minutes(awesome!!) >> >> >> >> > using >> >> >> >> > below >> >> >> >> > flow. >> >> >> >> > This throughput is enough to load 10TB data in a day with a >> >> >> >> > single >> >> >> >> > and >> >> >> >> > simple machine. >> >> >> >> > During the test, server disk I/O went up to 200MB/s. 
>> >> >> >> > >> >> >> >> > ExecuteProcess(touch and mv to input dir) > ListFile > >> >> >> >> > FetchFile >> >> >> >> > (4 >> >> >> >> > threads) > PutHDFS (4 threads) >> >> >> >> > >> >> >> >> > My next action is to incorporate my java code for column >> >> >> >> > masking >> >> >> >> > with >> >> >> >> > a >> >> >> >> > custom processor. >> >> >> >> > I am now exploring on that. However, if you have any good >> >> >> >> > reference >> >> >> >> > on >> >> >> >> > custom processors (altering actual data) please let me know. >> >> >> >> > >> >> >> >> > Thanks, >> >> >> >> > Obaid >> >> >> >> > >> >> >> >> > >> >> >> >> > >> >> >> >> > On Mon, Jan 4, 2016 at 9:11 AM, obaidul karim >> >> >> >> > <obaidc...@gmail.com> >> >> >> >> > wrote: >> >> >> >> >> >> >> >> >> >> Hi Joe, >> >> >> >> >> >> >> >> >> >> Yes, symlink is another option I was thinking when I was >> >> >> >> >> trying >> >> >> >> >> to >> >> >> >> >> use >> >> >> >> >> getfile. >> >> >> >> >> Thanks for your insights, I will update you on this mail chain >> >> >> >> >> when >> >> >> >> >> my >> >> >> >> >> entire workflow completes. So that this could be a reference >> >> >> >> >> for >> >> >> >> >> others >> >> >> >> >> :). >> >> >> >> >> >> >> >> >> >> -Obaid >> >> >> >> >> >> >> >> >> >> On Monday, January 4, 2016, Joe Witt <joe.w...@gmail.com> >> >> >> >> >> wrote: >> >> >> >> >>> >> >> >> >> >>> Obaid, >> >> >> >> >>> >> >> >> >> >>> You make a great point. >> >> >> >> >>> >> >> >> >> >>> I agree we will ultimately need to do more to make that very >> >> >> >> >>> valid >> >> >> >> >>> approach work easily. The downside is that puts the onus on >> >> >> >> >>> NiFi >> >> >> >> >>> to >> >> >> >> >>> keep track of a variety of potentially quite large state >> >> >> >> >>> about >> >> >> >> >>> the >> >> >> >> >>> directory. One way to avoid that expense is if NiFi can pull >> >> >> >> >>> a >> >> >> >> >>> copy >> >> >> >> >>> of and then delete the source file.
If you'd like to keep a copy >> >> >> >> >>> around I >> >> >> >> >>> wonder if a good approach is to simply create a symlink to >> >> >> >> >>> the >> >> >> >> >>> original file you want NiFi to pull but have the symlink in >> >> >> >> >>> the >> >> >> >> >>> NiFi >> >> >> >> >>> pickup directory. NiFi is then free to read and delete which >> >> >> >> >>> means >> >> >> >> >>> it >> >> >> >> >>> simply pulls whatever shows up in that directory and doesn't >> >> >> >> >>> have >> >> >> >> >>> to >> >> >> >> >>> keep state about filenames and checksums. >> >> >> >> >>> >> >> >> >> >>> I realize we still need to do what you're suggesting as well >> >> >> >> >>> but >> >> >> >> >>> thought I'd run this by you. >> >> >> >> >>> >> >> >> >> >>> Joe >> >> >> >> >>> >> >> >> >> >>> On Sun, Jan 3, 2016 at 6:43 PM, obaidul karim >> >> >> >> >>> <obaidc...@gmail.com> >> >> >> >> >>> wrote: >> >> >> >> >>> > Hi Joe, >> >> >> >> >>> > >> >> >> >> >>> > Consider a scenario where we need to feed some older files >> >> >> >> >>> > and >> >> >> >> >>> > we >> >> >> >> >>> > are >> >> >> >> >>> > using >> >> >> >> >>> > "mv" to feed files to input directory( to reduce IO we may >> >> >> >> >>> > use >> >> >> >> >>> > "mv"). >> >> >> >> >>> > If we >> >> >> >> >>> > use "mv", last modified date will not change. And this is >> >> >> >> >>> > very >> >> >> >> >>> > common >> >> >> >> >>> > on a >> >> >> >> >>> > busy file collection system. >> >> >> >> >>> > >> >> >> >> >>> > However, I think I can still manage it by adding an additional >> >> >> >> >>> > "touch" >> >> >> >> >>> > before >> >> >> >> >>> > moving the file into the target directory. >> >> >> >> >>> > >> >> >> >> >>> > So, my suggestion is to add file selection criteria as a >> >> >> >> >>> > configurable >> >> >> >> >>> > option in the ListFile processor. Options could be >> >> >> >> >>> > last >> >> >> >> >>> > modified >> >> >> >> >>> > date (as the current one), unique file names, checksum, etc.
>> >> >> >> >>> > >> >> >> >> >>> > Thanks again man. >> >> >> >> >>> > -Obaid >> >> >> >> >>> > >> >> >> >> >>> > >> >> >> >> >>> > On Monday, January 4, 2016, Joe Witt <joe.w...@gmail.com> >> >> >> >> >>> > wrote: >> >> >> >> >>> >> >> >> >> >> >>> >> Hello Obaid, >> >> >> >> >>> >> >> >> >> >> >>> >> The default behavior of the ListFile processor is to keep >> >> >> >> >>> >> track >> >> >> >> >>> >> of >> >> >> >> >>> >> the >> >> >> >> >>> >> last modified time of the files it lists. When you >> >> >> >> >>> >> changed >> >> >> >> >>> >> the >> >> >> >> >>> >> name >> >> >> >> >>> >> of the file that doesn't change the last modified time as >> >> >> >> >>> >> tracked >> >> >> >> >>> >> by >> >> >> >> >>> >> the OS but when you altered content it does. Simply >> >> >> >> >>> >> 'touch' >> >> >> >> >>> >> on >> >> >> >> >>> >> the >> >> >> >> >>> >> file would do it too. >> >> >> >> >>> >> >> >> >> >> >>> >> I believe we could observe the last modified time of the >> >> >> >> >>> >> directory >> >> >> >> >>> >> in >> >> >> >> >>> >> which the file lives to detect something like a rename. >> >> >> >> >>> >> However, >> >> >> >> >>> >> we'd >> >> >> >> >>> >> not know which file was renamed just that something was >> >> >> >> >>> >> changed. >> >> >> >> >>> >> So >> >> >> >> >>> >> it would require keeping some potentially problematic state to >> >> >> >> >>> >> deconflict >> >> >> >> >>> >> or >> >> >> >> >>> >> require the user to have a duplicate detection process >> >> >> >> >>> >> afterwards. >> >> >> >> >>> >> >> >> >> >> >>> >> So with that in mind is the current behavior sufficient >> >> >> >> >>> >> for >> >> >> >> >>> >> your >> >> >> >> >>> >> case?
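Joe's point that a plain "mv" preserves the last modified time (so an mtime-based ListFile will not see the file as new) while a "touch" updates it can be demonstrated with plain JDK file APIs. An illustrative sketch, not NiFi code:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;

// Move a file into a pickup directory and then "touch" it, so that an
// mtime-based listing (like ListFile's default behavior) treats it as new.
class TouchBeforeMove {
    static void moveWithTouch(Path src, Path dst) throws IOException {
        // Files.move, like mv, keeps the original last-modified time.
        Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
        // The "touch": bump the mtime so mtime-based pickup sees the file.
        Files.setLastModifiedTime(dst, FileTime.fromMillis(System.currentTimeMillis()));
    }
}
```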
>> >> >> >> >>> >> >> >> >> >> >>> >> Thanks >> >> >> >> >>> >> Joe >> >> >> >> >>> >> >> >> >> >> >>> >> On Sun, Jan 3, 2016 at 6:17 AM, obaidul karim >> >> >> >> >>> >> <obaidc...@gmail.com> >> >> >> >> >>> >> wrote: >> >> >> >> >>> >> > Hi Joe, >> >> >> >> >>> >> > >> >> >> >> >>> >> > I am now exploring your solution. >> >> >> >> >>> >> > Starting with below flow: >> >> >> >> >>> >> > >> >> >> >> >>> >> > ListFile > FetchFile > CompressContent > PutFile. >> >> >> >> >>> >> > >> >> >> >> >>> >> > Seems all fine. Except some confusion with how ListFile >> >> >> >> >>> >> > identifies >> >> >> >> >>> >> > new >> >> >> >> >>> >> > files. >> >> >> >> >>> >> > In order to test, I renamed an already processed file and >> >> >> >> >>> >> > put >> >> >> >> >>> >> > it >> >> >> >> >>> >> > in the >> >> >> >> >>> >> > input >> >> >> >> >>> >> > folder and found that the file was not processed. >> >> >> >> >>> >> > Then I randomly changed the content of the file and it >> >> >> >> >>> >> > was >> >> >> >> >>> >> > immediately >> >> >> >> >>> >> > processed. >> >> >> >> >>> >> > >> >> >> >> >>> >> > My question is what is the new file selection criteria >> >> >> >> >>> >> > for >> >> >> >> >>> >> > "ListFile" ? >> >> >> >> >>> >> > Can >> >> >> >> >>> >> > I change it only to file name ? >> >> >> >> >>> >> > >> >> >> >> >>> >> > Thanks in advance.
>> >> >> >> >>> >> > >> >> >> >> >>> >> > -Obaid >> >> >> >> >>> >> > >> >> >> >> >>> >> > >> >> >> >> >>> >> > >> >> >> >> >>> >> > >> >> >> >> >>> >> > >> >> >> >> >>> >> > >> >> >> >> >>> >> > On Fri, Jan 1, 2016 at 10:43 PM, Joe Witt >> >> >> >> >>> >> > <joe.w...@gmail.com> >> >> >> >> >>> >> > wrote: >> >> >> >> >>> >> >> >> >> >> >> >>> >> >> Hello Obaid, >> >> >> >> >>> >> >> >> >> >> >> >>> >> >> At 6 TB/day and average size of 2-3GB per dataset >> >> >> >> >>> >> >> you're >> >> >> >> >>> >> >> looking >> >> >> >> >>> >> >> at >> >> >> >> >>> >> >> a >> >> >> >> >>> >> >> sustained rate of 70+MB/s and a pretty low transaction >> >> >> >> >>> >> >> rate. >> >> >> >> >>> >> >> So >> >> >> >> >>> >> >> well >> >> >> >> >>> >> >> within a good range to work with on a single system. >> >> >> >> >>> >> >> >> >> >> >> >>> >> >> "Is there any way to by pass writing flow files on >> >> >> >> >>> >> >> disk >> >> >> >> >>> >> >> or >> >> >> >> >>> >> >> directly >> >> >> >> >>> >> >> pass those files to HDFS as it is ?" >> >> >> >> >>> >> >> >> >> >> >> >>> >> >> There is no way to bypass NiFi taking a copy of that >> >> >> >> >>> >> >> data >> >> >> >> >>> >> >> by >> >> >> >> >>> >> >> design. >> >> >> >> >>> >> >> NiFi is helping you formulate a graph of dataflow >> >> >> >> >>> >> >> requirements >> >> >> >> >>> >> >> from >> >> >> >> >>> >> >> a >> >> >> >> >>> >> >> given source(s) through given processing steps and >> >> >> >> >>> >> >> ultimately >> >> >> >> >>> >> >> driving >> >> >> >> >>> >> >> data into given destination systems.
As a result it >> >> >> >> >>> >> >> takes >> >> >> >> >>> >> >> on >> >> >> >> >>> >> >> the >> >> >> >> >>> >> >> challenge of handling transactionality of each >> >> >> >> >>> >> >> interaction >> >> >> >> >>> >> >> and >> >> >> >> >>> >> >> the >> >> >> >> >>> >> >> buffering and backpressure to deal with the realities >> >> >> >> >>> >> >> of >> >> >> >> >>> >> >> different >> >> >> >> >>> >> >> production/consumption patterns. >> >> >> >> >>> >> >> >> >> >> >> >>> >> >> "If the files on the spool directory are >> >> >> >> >>> >> >> compressed(zip/gzip), >> >> >> >> >>> >> >> can >> >> >> >> >>> >> >> we >> >> >> >> >>> >> >> store files on HDFS as uncompressed ?" >> >> >> >> >>> >> >> >> >> >> >> >>> >> >> Certainly. Both of those formats (zip/gzip) are >> >> >> >> >>> >> >> supported >> >> >> >> >>> >> >> in >> >> >> >> >>> >> >> NiFi >> >> >> >> >>> >> >> out of the box. You simply run the data through the >> >> >> >> >>> >> >> proper >> >> >> >> >>> >> >> process >> >> >> >> >>> >> >> prior to the PutHDFS process to unpack (zip) or >> >> >> >> >>> >> >> decompress >> >> >> >> >>> >> >> (gzip) >> >> >> >> >>> >> >> as >> >> >> >> >>> >> >> needed. >> >> >> >> >>> >> >> >> >> >> >> >>> >> >> "2.a Can we use our existing java code for masking ? if >> >> >> >> >>> >> >> yes >> >> >> >> >>> >> >> then >> >> >> >> >>> >> >> how ? >> >> >> >> >>> >> >> 2.b For this Scenario we also want to bypass storing >> >> >> >> >>> >> >> flow >> >> >> >> >>> >> >> files >> >> >> >> >>> >> >> on >> >> >> >> >>> >> >> disk. Can we do it on the fly, masking and storing on >> >> >> >> >>> >> >> HDFS >> >> >> >> >>> >> >> ? >> >> >> >> >>> >> >> 2.c If the source files are compressed (zip/gzip), is >> >> >> >> >>> >> >> there >> >> >> >> >>> >> >> any >> >> >> >> >>> >> >> issue >> >> >> >> >>> >> >> for masking here ?" 
>> >> >> >> >>> >> >> >> >> >> >> >>> >> >> You would build a custom NiFi processor that >> >> >> >> >>> >> >> leverages >> >> >> >> >>> >> >> your >> >> >> >> >>> >> >> existing >> >> >> >> >>> >> >> code. If your code is able to operate on an >> >> >> >> >>> >> >> InputStream >> >> >> >> >>> >> >> and >> >> >> >> >>> >> >> writes >> >> >> >> >>> >> >> to >> >> >> >> >>> >> >> an OutputStream then it is very likely you'll be able >> >> >> >> >>> >> >> to >> >> >> >> >>> >> >> handle >> >> >> >> >>> >> >> arbitrarily large objects with zero negative impact to >> >> >> >> >>> >> >> the >> >> >> >> >>> >> >> JVM >> >> >> >> >>> >> >> Heap >> >> >> >> >>> >> >> as >> >> >> >> >>> >> >> well. This is thanks to the fact that the data is >> >> >> >> >>> >> >> present >> >> >> >> >>> >> >> in >> >> >> >> >>> >> >> NiFi's >> >> >> >> >>> >> >> repository with copy-on-write/pass-by-reference >> >> >> >> >>> >> >> semantics >> >> >> >> >>> >> >> and >> >> >> >> >>> >> >> that >> >> >> >> >>> >> >> the >> >> >> >> >>> >> >> API is exposing those streams to your code in a >> >> >> >> >>> >> >> transactional >> >> >> >> >>> >> >> manner. >> >> >> >> >>> >> >> >> >> >> >> >>> >> >> If you want the process of writing to HDFS to also do >> >> >> >> >>> >> >> decompression >> >> >> >> >>> >> >> and masking in one pass you'll need to extend/alter the >> >> >> >> >>> >> >> PutHDFS >> >> >> >> >>> >> >> process to do that. It is probably best to implement >> >> >> >> >>> >> >> the >> >> >> >> >>> >> >> flow >> >> >> >> >>> >> >> using >> >> >> >> >>> >> >> cohesive processors (grab files, decompress files, mask >> >> >> >> >>> >> >> files, >> >> >> >> >>> >> >> write >> >> >> >> >>> >> >> to hdfs). 
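The "decompress files" step in that cohesive-processor flow boils down to a stream-to-stream copy, the same InputStream/OutputStream shape the NiFi callback exposes. A minimal gzip sketch with plain JDK classes (no NiFi API):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;

// Streaming gzip decompression: nothing is buffered in full, so
// arbitrarily large files stay easy on the JVM heap, matching the
// zero-heap-impact point made in the thread.
class GunzipStream {
    static void gunzip(InputStream in, OutputStream out) throws IOException {
        try (GZIPInputStream gz = new GZIPInputStream(in)) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = gz.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        }
        out.flush();
    }
}
```

The same copy-loop shape works for the mask step or any other per-stream transform, which is why splitting the flow into small processors costs little.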
Given how the repository construct in NiFi >> >> >> >> >>> >> >> works >> >> >> >> >>> >> >> and >> >> >> >> >>> >> >> given >> >> >> >> >>> >> >> how caching in Linux works it is very possible you'll >> >> >> >> >>> >> >> be >> >> >> >> >>> >> >> quite >> >> >> >> >>> >> >> surprised by the throughput you'll see. Even then you >> >> >> >> >>> >> >> can >> >> >> >> >>> >> >> optimize >> >> >> >> >>> >> >> once you're sure you need to. The other thing to keep >> >> >> >> >>> >> >> in >> >> >> >> >>> >> >> mind >> >> >> >> >>> >> >> here >> >> >> >> >>> >> >> is >> >> >> >> >>> >> >> that often a flow that starts out as specific as this >> >> >> >> >>> >> >> turns >> >> >> >> >>> >> >> into >> >> >> >> >>> >> >> a >> >> >> >> >>> >> >> great place to tap the stream of data to feed some new >> >> >> >> >>> >> >> system >> >> >> >> >>> >> >> or >> >> >> >> >>> >> >> new >> >> >> >> >>> >> >> algorithm with a different format or protocol. At that >> >> >> >> >>> >> >> moment >> >> >> >> >>> >> >> the >> >> >> >> >>> >> >> benefits become even more obvious. >> >> >> >> >>> >> >> >> >> >> >> >>> >> >> Regarding the Flume processes in NiFi and their memory >> >> >> >> >>> >> >> usage. >> >> >> >> >>> >> >> NiFi >> >> >> >> >>> >> >> offers a nice hosting mechanism for the Flume processes >> >> >> >> >>> >> >> and >> >> >> >> >>> >> >> brings >> >> >> >> >>> >> >> some of the benefits of NiFi's UI, provenance, >> >> >> >> >>> >> >> repository >> >> >> >> >>> >> >> concept. >> >> >> >> >>> >> >> However, we're still largely limited to the design >> >> >> >> >>> >> >> assumptions >> >> >> >> >>> >> >> one >> >> >> >> >>> >> >> gets when building a Flume process and that can be >> >> >> >> >>> >> >> quite >> >> >> >> >>> >> >> memory >> >> >> >> >>> >> >> limiting. 
We see what we have today as a great way to >> >> >> >> >>> >> >> help >> >> >> >> >>> >> >> people >> >> >> >> >>> >> >> transition their existing Flume flows into NiFi by >> >> >> >> >>> >> >> leveraging >> >> >> >> >>> >> >> their >> >> >> >> >>> >> >> existing code but would recommend working to phase the >> >> >> >> >>> >> >> use >> >> >> >> >>> >> >> of >> >> >> >> >>> >> >> those >> >> >> >> >>> >> >> out in time so that you can take full benefit of what >> >> >> >> >>> >> >> NiFi >> >> >> >> >>> >> >> brings >> >> >> >> >>> >> >> over >> >> >> >> >>> >> >> Flume. >> >> >> >> >>> >> >> >> >> >> >> >>> >> >> Thanks >> >> >> >> >>> >> >> Joe >> >> >> >> >>> >> >> >> >> >> >> >>> >> >> >> >> >> >> >>> >> >> On Fri, Jan 1, 2016 at 4:18 AM, obaidul karim >> >> >> >> >>> >> >> <obaidc...@gmail.com> >> >> >> >> >>> >> >> wrote: >> >> >> >> >>> >> >> > Hi, >> >> >> >> >>> >> >> > >> >> >> >> >>> >> >> > I am new in Nifi and exploring it as open source ETL >> >> >> >> >>> >> >> > tool. >> >> >> >> >>> >> >> > >> >> >> >> >>> >> >> > As per my understanding, flow files are stored on >> >> >> >> >>> >> >> > local >> >> >> >> >>> >> >> > disk >> >> >> >> >>> >> >> > and >> >> >> >> >>> >> >> > it >> >> >> >> >>> >> >> > contains >> >> >> >> >>> >> >> > actual data. 
>> >> >> >> >>> >> >> > If above is true, lets consider a below scenario: >> >> >> >> >>> >> >> > >> >> >> >> >>> >> >> > Scenario 1: >> >> >> >> >>> >> >> > - In a spool directory we have terabytes(5-6TB/day) >> >> >> >> >>> >> >> > of >> >> >> >> >>> >> >> > files >> >> >> >> >>> >> >> > coming >> >> >> >> >>> >> >> > from >> >> >> >> >>> >> >> > external sources >> >> >> >> >>> >> >> > - I want to push those files to HDFS as it is without >> >> >> >> >>> >> >> > any >> >> >> >> >>> >> >> > changes >> >> >> >> >>> >> >> > >> >> >> >> >>> >> >> > Scenario 2: >> >> >> >> >>> >> >> > - In a spool directory we have terabytes(5-6TB/day) >> >> >> >> >>> >> >> > of >> >> >> >> >>> >> >> > files >> >> >> >> >>> >> >> > coming >> >> >> >> >>> >> >> > from >> >> >> >> >>> >> >> > external sources >> >> >> >> >>> >> >> > - I want to mask some of the sensitive columns >> >> >> >> >>> >> >> > - Then send one copy to HDFS and another copy to >> >> >> >> >>> >> >> > Kafka >> >> >> >> >>> >> >> > >> >> >> >> >>> >> >> > Question for Scenario 1: >> >> >> >> >>> >> >> > 1.a In that case those 5-6TB data will be again >> >> >> >> >>> >> >> > written >> >> >> >> >>> >> >> > on >> >> >> >> >>> >> >> > local >> >> >> >> >>> >> >> > disk >> >> >> >> >>> >> >> > as >> >> >> >> >>> >> >> > flow files and will cause double I/O. Which >> >> >> >> >>> >> >> > eventually >> >> >> >> >>> >> >> > may >> >> >> >> >>> >> >> > cause >> >> >> >> >>> >> >> > slower >> >> >> >> >>> >> >> > performance due to I/O bottleneck. >> >> >> >> >>> >> >> > Is there any way to by pass writing flow files on >> >> >> >> >>> >> >> > disk >> >> >> >> >>> >> >> > or >> >> >> >> >>> >> >> > directly >> >> >> >> >>> >> >> > pass >> >> >> >> >>> >> >> > those files to HDFS as it is ? >> >> >> >> >>> >> >> > 1.b If the files on the spool directory are >> >> >> >> >>> >> >> > compressed(zip/gzip), >> >> >> >> >>> >> >> > can >> >> >> >> >>> >> >> > we >> >> >> >> >>> >> >> > store files on HDFS as uncompressed ? 
>> >> >> >> >>> >> >> > >> >> >> >> >>> >> >> > Question for Scenario 2: >> >> >> >> >>> >> >> > 2.a Can we use our existing java code for masking ? >> >> >> >> >>> >> >> > if >> >> >> >> >>> >> >> > yes >> >> >> >> >>> >> >> > then >> >> >> >> >>> >> >> > how ? >> >> >> >> >>> >> >> > 2.b For this Scenario we also want to bypass storing >> >> >> >> >>> >> >> > flow >> >> >> >> >>> >> >> > files >> >> >> >> >>> >> >> > on >> >> >> >> >>> >> >> > disk. >> >> >> >> >>> >> >> > Can >> >> >> >> >>> >> >> > we do it on the fly, masking and storing on HDFS ? >> >> >> >> >>> >> >> > 2.c If the source files are compressed (zip/gzip), is >> >> >> >> >>> >> >> > there >> >> >> >> >>> >> >> > any >> >> >> >> >>> >> >> > issue >> >> >> >> >>> >> >> > for >> >> >> >> >>> >> >> > masking here ? >> >> >> >> >>> >> >> > >> >> >> >> >>> >> >> > >> >> >> >> >>> >> >> > In fact, I tried above using flume+flume >> >> >> >> >>> >> >> > interceptors. >> >> >> >> >>> >> >> > Everything >> >> >> >> >>> >> >> > working >> >> >> >> >>> >> >> > fine with smaller files. But when source files are >> >> >> >> >>> >> >> > greater >> >> >> >> >>> >> >> > than >> >> >> >> >>> >> >> > 50MB >> >> >> >> >>> >> >> > Flume >> >> >> >> >>> >> >> > chokes :(. >> >> >> >> >>> >> >> > So, I am exploring options in NiFi. Hope I will get >> >> >> >> >>> >> >> > some >> >> >> >> >>> >> >> > guideline >> >> >> >> >>> >> >> > from >> >> >> >> >>> >> >> > you >> >> >> >> >>> >> >> > guys. >> >> >> >> >>> >> >> > >> >> >> >> >>> >> >> > >> >> >> >> >>> >> >> > Thanks in advance. >> >> >> >> >>> >> >> > -Obaid >> >> >> >> >>> >> > >> >> >> >> >>> >> > >> >> >> >> > >> >> >> >> > >> >> >> > >> >> >> > >> >> > >> >> > >> > >> > > >