Mike,

Just out of curiosity, what would the original data for your example look like that produced that JSON?
Is it a CSV with two lines, like:

ABC, XYZ
DEF, LMN

and then ExecuteScript is turning that into the JSON array?

As far as reading the JSON, I created a simple flow of GenerateFlowFile -> ConvertRecord -> LogAttribute, where ConvertRecord uses the JsonPathReader with $.value:

https://gist.github.com/bbende/3789a6907a9af09aa7c32413040e7e2b

LogAttribute ends up logging:

[ {
  "value" : "XYZ"
}, {
  "value" : "LMN"
} ]

which seems correct, given that it's reading in the JSON with a schema that only has the field "value" in it.

Let me know if that is not what you are looking for.

On Thu, Jun 8, 2017 at 4:13 PM, Mike Thomsen <[email protected]> wrote:
> Bryan,
>
> I have the processor somewhat operational now, but I'm running into a problem with the record readers. What I've done is basically this:
>
> Ex. JSON:
>
> [
>   {
>     "key": "ABC", "value": "XYZ"
>   },
>   {
>     "key": "DEF", "value": "LMN"
>   }
> ]
>
> Avro schema:
>
> {
>   "type": "record",
>   "name": "GenomeRecord",
>   "fields": [{
>     "name": "value",
>     "type": "string"
>   }]
> }
>
> 1. ExecuteScript iterates over a line and builds a JSON array as mentioned above.
> 2. PutHBaseRecord is wired to use a JsonPathReader that uses an AvroSchemaRegistry.
>    - I put a lot of logging in and can verify it is identifying the schema based on the attribute on the flowfile, and looking at the appropriate field while looping over the Record to turn it into a serializable form for a Put.
>    - All I get are nulls.
> 3. My JsonPath has been variously $.value and $[*].value. It just does not seem to want to parse that JSON.
>
> The strategy I was going for is to use the "key" attribute in each JSON object to set the row key for the Put.
>
> Any ideas would be great.
>
> Thanks,
>
> Mike
>
> On Wed, Jun 7, 2017 at 4:40 PM, Bryan Bende <[email protected]> wrote:
>>
>> Mike,
>>
>> Glad to hear that the record API looks promising for what you are trying to do!
>>
>> Here are a couple of thoughts, and please correct me if I am not understanding your flow correctly...
>>
>> We should be able to make a generic PutHBaseRecord processor that uses any record reader to read the incoming flow file and then converts each record directly into a PutFlowFile (more on this in a minute).
>>
>> Once we have PutHBaseRecord, then there may be no need for you to convert your data from CSV to JSON (unless there is another reason I am missing), because you can send your CSV data directly into PutHBaseRecord configured with a CSVRecordReader.
>>
>> If you are doing other processing/enrichment while going from CSV to JSON, then you may be able to achieve some of the same things with processors like UpdateRecord, PartitionRecord, and LookupRecord, essentially keeping the initial CSV intact and treating it like records through the entire flow.
>>
>> Now back to PutHBaseRecord and the question of how to go from a Record to a PutFlowFile...
>>
>> We basically need to know the row id, the column family, and then a list of column-qualifier/value pairs. I haven't fully thought this through yet, but...
>>
>> For the row id, we could have a similar strategy as PutHBaseJson, where the value comes from a "Row Id" property in the processor or from a "Row Id Record Path" property, which would evaluate the record path against the record and use that value for the row id.
>>
>> For column family, we could probably do the same as above, where it could be from a property or a record path.
>>
>> For the list of column-qualifier/value pairs, we can loop over all fields in the record (skipping the row id and family if using record fields) and then convert each one into a PutColumn. The bulk of the work here is going to be taking the value of a field and turning it into an appropriate byte[], so you'll likely want to use the type of the field to cast it to an appropriate Java type and then figure out how to represent that as bytes.
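Putting those three pieces together, a very rough sketch of that conversion, assuming NiFi's record and record-path APIs and the plain HBase client Put (the /key path, the column family, and the helper class itself are only illustrative, not an actual implementation):

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.nifi.record.path.FieldValue;
    import org.apache.nifi.record.path.RecordPath;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.RecordField;

    public class RecordToPutSketch {

        // Turn one Record into an HBase Put, pulling the row id from a record
        // path (e.g. /key) and every remaining field into a column.
        public static Put recordToPut(final Record record, final RecordPath rowIdPath, final byte[] family) {
            final FieldValue rowIdField = rowIdPath.evaluate(record)
                    .getSelectedFields()
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException("Row id path matched no field"));

            final Put put = new Put(Bytes.toBytes(rowIdField.getValue().toString()));

            for (final RecordField field : record.getSchema().getFields()) {
                final String name = field.getFieldName();
                if (name.equals(rowIdField.getField().getFieldName())) {
                    continue; // skip the field that supplied the row id
                }
                final Object value = record.getValue(name);
                if (value == null) {
                    continue;
                }
                // A real implementation would switch on field.getDataType() and use the
                // matching Bytes.toBytes(...) overload; toString keeps the sketch short.
                put.addColumn(family, Bytes.toBytes(name), Bytes.toBytes(value.toString()));
            }
            return put;
        }
    }

Called as recordToPut(record, RecordPath.compile("/key"), Bytes.toBytes("cf")), the first record in the JSON above would become a put with row "ABC" and a "value" column of "XYZ". Note that for a /key path to resolve, the schema would also need a "key" field alongside "value".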
>> I know this was a lot of information, but I hope this helps, and let me know if anything is not making sense.
>>
>> Thanks,
>>
>> Bryan
>>
>> On Wed, Jun 7, 2017 at 3:56 PM, Mike Thomsen <[email protected]> wrote:
>> > Yeah, it's really getting hammered by the small files. I took a look at the new record APIs and they looked really promising. In fact, I'm taking a shot at creating a variant of PutHBaseJSON that uses the record API. It looks fairly straightforward so far. My strategy is roughly like this:
>> >
>> > GetFile -> SplitText -> ExecuteScript -> RouteOnAttribute -> PutHBaseJSONRecord
>> >
>> > ExecuteScript now generates a larger flowfile that contains a structure like this:
>> >
>> > [
>> >   { "key": "XYZ", "value": "ABC" }
>> > ]
>> >
>> > My intention is to have a JsonPathReader take that bigger flowfile, which is a JSON array, and iterate over it as a bunch of records to turn into Puts with the new HBase processor. I'm borrowing some code for wiring in the reader from the QueryRecord processor.
>> >
>> > So my only question now is: what is the best way to serialize the Record objects to JSON? The PutHBaseJson processor already has a Jackson setup internally. Any suggestions on doing this in a way that doesn't tie me at the hip to a particular reader implementation?
>> >
>> > Thanks,
>> >
>> > Mike
>> >
>> > On Wed, Jun 7, 2017 at 6:12 PM, Bryan Bende <[email protected]> wrote:
>> >>
>> >> Mike,
>> >>
>> >> Just following up on this...
>> >>
>> >> I created this JIRA to track the idea of record-based HBase processors:
>> >> https://issues.apache.org/jira/browse/NIFI-4034
>> >>
>> >> Also wanted to mention that with the existing processors, the main way to scale up would be to increase the concurrent tasks on PutHBaseJson and also to increase the Batch Size property, which defaults to 25. The Batch Size controls the maximum number of flow files that a concurrent task will attempt to pull from the queue and send to HBase in one put operation.
>> >>
>> >> Even with those tweaks your flow may still be getting hammered with lots of small flow files, but I thought I would mention it to see if it helps at all.
>> >>
>> >> -Bryan
>> >>
>> >> On Tue, Jun 6, 2017 at 7:40 PM, Bryan Bende <[email protected]> wrote:
>> >> > Mike,
>> >> >
>> >> > With the record-oriented processors that have come out recently, a good solution would be to implement a PutHBaseRecord processor that would have a Record Reader configured. This way the processor could read in a large CSV without having to convert to individual JSON documents.
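For the reader wiring borrowed from QueryRecord, the read loop inside such a processor would look roughly like this. It is only a sketch against the 2017-era record API: RECORD_READER and REL_FAILURE are assumed property/relationship definitions, flowFile comes from the session, and the imports from org.apache.nifi.serialization and org.apache.nifi.serialization.record are omitted.

    // Inside onTrigger(): stream the flow file through whatever reader is configured.
    final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER)
            .asControllerService(RecordReaderFactory.class);

    try (final InputStream in = session.read(flowFile);
         final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {

        Record record;
        while ((record = reader.nextRecord()) != null) {
            // convert the record to a put here (see the sketch above) and add it to a batch
        }
    } catch (final Exception e) {
        getLogger().error("Failed to read records from {}", new Object[] {flowFile}, e);
        session.transfer(flowFile, REL_FAILURE);
        return;
    }

Because the reader is obtained through the RecordReaderFactory controller service, the same loop works for CSV, JSON, or Avro input without tying the processor to a particular reader implementation.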
>> >> > One thing to consider is how many records/puts to send in a single call to HBase. Assuming multi-GB CSV files, you'll want to send portions at a time to avoid having the whole content in memory (some kind of record batch size property, sketched roughly at the end of this thread), but then you also have to deal with what happens when things fail halfway through. If the puts are idempotent, then it may be fine to route the whole flow file to failure and try again, even if some data was already inserted.
>> >> >
>> >> > Feel free to create a JIRA for HBase record processors, or I can do it later.
>> >> >
>> >> > Hope that helps.
>> >> >
>> >> > -Bryan
>> >> >
>> >> > On Tue, Jun 6, 2017 at 7:21 PM Mike Thomsen <[email protected]> wrote:
>> >> >>
>> >> >> We have a very large body of CSV files (well over 1TB) that need to be imported into HBase. For a single 20GB segment, we are looking at having to push easily 100M flowfiles into HBase, and most of the JSON files generated are rather small (like 20-250 bytes).
>> >> >>
>> >> >> It's going very slowly, and I assume that is because we're taxing the disk very heavily with the content and provenance repositories coming into play. So I'm wondering if anyone has a suggestion on a good NiFi-esque way of solving this. Right now, I'm considering two options:
>> >> >>
>> >> >> 1. Looking for a way to inject the HBase controller service into an ExecuteScript processor so I can handle the data in large chunks (splitting the text and generating a List<Put> inside the processor myself and doing one huge Put).
>> >> >>
>> >> >> 2. Creating a library that lets me generate HFiles from within an ExecuteScript processor.
>> >> >>
>> >> >> What I really need is something fast within NiFi that would let me generate huge blocks of updates for HBase and push them out. Any ideas?
>> >> >>
>> >> >> Thanks,
>> >> >>
>> >> >> Mike
>> >> >
>> >> > --
>> >> > Sent from Gmail Mobile
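On the batch-size idea above, a minimal sketch of chunking the puts, assuming the plain HBase client Table API and reusing the recordToPut helper from the earlier sketch (table, batchSize, rowIdPath, and family are placeholders):

    final List<Put> batch = new ArrayList<>();
    Record record;
    while ((record = reader.nextRecord()) != null) {
        batch.add(RecordToPutSketch.recordToPut(record, rowIdPath, family));
        if (batch.size() >= batchSize) {
            table.put(batch);   // one round trip for up to batchSize puts
            batch.clear();
        }
    }
    if (!batch.isEmpty()) {
        table.put(batch);       // flush whatever is left over
    }

As noted above, retrying the whole flow file after a partial failure is only safe if the puts are idempotent, since earlier batches may already have been written.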
