Hi Richard, Have you tried JoltTransformJSON or JoltTransformRecord? I believe you should be able to do this.
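For the flattening you describe below, a shift spec along these lines should get most of the way there. This is only a rough, untested sketch based on the structures in your mail, and I have guessed dcterms:modified as the source for metadata_last_modified since it isn't in your sample:

[
  {
    "operation": "shift",
    "spec": {
      "result": {
        "text": "doc_text",
        "timestamp": "processing_timestamp",
        "metadata": {
          "X-OCR-Applied": "metadata_x_ocr_applied",
          "X-TIKA:Parsed-By": { "0": "metadata_x_parsed_by" },
          "Content-Type": "metadata_content_type",
          "Page-Count": "metadata_page_count",
          "dcterms:created": "metadata_creation_date",
          "dcterms:modified": "metadata_last_modified"
        }
      }
    }
  }
]

A few caveats: doc_id isn't in the Tika result, so it would still need to be injected from wherever it comes from today (e.g. a FlowFile attribute via UpdateRecord); the "0" match keeps only the first parser rather than joining the whole list the way your script does; and the optional fields (creation date, last modified) can simply be omitted when absent and picked up as nulls from the defaults in your Avro schema. If you use JoltTransformRecord with your existing reader/writer pair, the per-record handling and output wrapping are taken care of, so the result should land in the shape your downstream processors expect.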
Quick start here: https://community.cloudera.com/t5/Community-Articles/Jolt-quick-reference-for-Nifi-Jolt-Processors/ta-p/244350 > On Dec 20, 2022, at 4:13 AM, Richard Beare <[email protected]> wrote: > > Hi Everyone, > Still struggling to fix this issue and may need to try some different things. > > What is the recommended way of transforming a record structure? At the moment > I have a groovy script doing this but the downstream processing is very slow, > as discussed in the preceding thread. > > The transformation is very simple - starting structure is: > > { > "result" : { > "text" : " document text", > "metadata" : { > "X-TIKA:Parsed-By": [ > "org.apache.tika.parser.pdf.PDFParser" > ], > "X-OCR-Applied" : true, > "dcterms:created": "2018-07-24T15:04:51Z", > "Content-Type" : "application/pdf", > "Page-Count" : 2 > }, > "success" : true, > "timestamp" : "2022-12-20T09:02:27.902Z", > "processingElapsedTime" : 6 > } > } > > > final structure is: > > [ { > "doc_id" : 58, > "doc_text" : " ", > "processing_timestamp" : "2022-12-20T09:02:27.902Z", > "metadata_x_ocr_applied" : true, > "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser", > "metadata_content_type" : "application/pdf", > "metadata_page_count" : 1, > "metadata_creation_date": null, > "metadata_last_modified" : null > }] > > So a kind of flattening of the structure. Is there a processor I should be > using to do this instead of a groovy script? > > Thanks > > On Wed, Dec 14, 2022 at 7:57 AM Richard Beare <[email protected] > <mailto:[email protected]>> wrote: > Any thoughts on this? Are there some extra steps required when creating an > avro file from a user defined schema? > > On Thu, Dec 8, 2022 at 2:56 PM Richard Beare <[email protected] > <mailto:[email protected]>> wrote: > Here's another result that I think suggests there's something wrong with the > avro files created by the groovy script, although I can't see what the > problem might be. > > The test is as follows. Output of the groovy script creating avro files is > passed to convertrecord, configured with an avro reader and json writer. This > is slow. The json output is then converted back to avro with another > convertrecord processor, configured with a jsontreereader and an avro writer > - this is fast, instantly emptying the queue. The result of that is fed into > the previously problematic merge processor which works exactly as expected, > producing flowfiles with 100 records each. > > The difference I can see between the two flow files is the way in which the > schema is specified. Perhaps some extras are required in the groovy file to > set that up? > > The slow one has: > > {"type":"record","name":"document", "fields":[{ > > The fast one > {"type":"record","name":"nifiRecord","namespace":"org.apache.nifi","fields": > > > Initial characters are also slightly different. > Slow one: > > 0000000 O b j 001 002 026 a v r o . s c h e m > 0000020 a 346 \n { " t y p e " : " r e c o > > Fast one > > 0000000 O b j 001 004 026 a v r o . 
s c h e m > 0000020 a 362 \b { " t y p e " : " r e c o > > > The groovy script is > CogStack-NiFi/parse-tika-result-json-to-avro.groovy at master · > CogStack/CogStack-NiFi · GitHub > <https://github.com/CogStack/CogStack-NiFi/blob/master/nifi/user-scripts/parse-tika-result-json-to-avro.groovy> > > The schema is > CogStack-NiFi/document.avsc at master · CogStack/CogStack-NiFi · GitHub > <https://github.com/CogStack/CogStack-NiFi/blob/master/nifi/user-schemas/document.avsc> > > > On Thu, Dec 8, 2022 at 1:59 PM Richard Beare <[email protected] > <mailto:[email protected]>> wrote: > I'm diving into the convertrecord tests a bit deeper on the production server. > > The first test case - 259 documents, total of 1M when in avro format in the > input queue to the convert record processor. These avro files were not > created by the groovy script - they start life as a database query and the > text field is in one of the columns. The convertrecord processor runs very > quickly - click start, press refresh and it is done. The avro ends up like > this: > > [ { > "sampleid" : 1075, > "typeid" : 98, > "dct" : "2020-01-25T21:40:25.515Z", > "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt", > "document" : "Text removed", > "docid" : "9" > } ] > > In the second test, where the text fields are extracted from pdf tika before > avro files are created by the groovy script (from the tika json output), the > total queue size for the 259 documents is larger - 1.77MB, and the > performance is very different - press start, click refresh and only two > flowfiles are processed. > > [ { > "doc_id" : "70", > "doc_text" : "text removed", > "processing_timestamp" : "2022-12-07T23:09:52.354Z", > "metadata_x_ocr_applied" : true, > "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser", > "metadata_content_type" : "application/pdf", > "metadata_page_count" : 1, > "metadata_creation_date" : null, > "metadata_last_modified" : null > } ] > > I've noticed that the second one has a content.type attribute of > 'application/json' which doesn't seem right and doesn't match the fast case. > I'll see what happens if I change that. > > On Thu, Dec 8, 2022 at 9:39 AM Richard Beare <[email protected] > <mailto:[email protected]>> wrote: > Hi All, > Some progress on debugging options. I've found a flow that exhibits the > problem using synthetic data. However the results are host dependent. On my > laptop a "run-once" click of merge record gives me two flowfiles of 100 > records, while the same flow on the production server produces several much > smaller flowfiles. This makes me think that something funny is happening with > my storage setup that I'll need to chase. > > I had tested the convertrecord option (simply avroreader->avrowriter) and it > did seem slow, but I'll investigate this further as it may be related to my > storage issue. > > > > On Thu, Dec 8, 2022 at 1:23 AM Mark Payne <[email protected] > <mailto:[email protected]>> wrote: > > Is there something about this structure that is likely to be causing the > > problem? Could there be other issues with the avro generated by the script? > > I don’t think the structure should matter. And as long as the avro produced > is proper Avro, I don’t think it should matter. Unless perhaps there’s some > issue with the Avro library itself that’s causing it to take a really long > time to parse the Avro or something? 
I’d be curious - if you take the output > of your script and then you run it through a ConvertRecord (Avro Reader -> > Json Writer) is the ConvertRecord fast? Or is it really slow to process it? > >> On Dec 5, 2022, at 5:58 AM, Richard Beare <[email protected] >> <mailto:[email protected]>> wrote: >> >> Further - I performed another test in which I replaced the custom json to >> avro script with a ConvertRecord processor - merge record appears to work as >> expected in that case. >> >> Output of convertrecord looks like this: >> >> [ { >> "text" : " No Alert Found \n\n", >> "metadata" : { >> "X_TIKA_Parsed_By" : null, >> "X_OCR_Applied" : null, >> "Content_Type" : null >> }, >> "success" : true, >> "timestamp" : "2022-12-05T10:49:18.568Z", >> "processingElapsedTime" : 0, >> "doc_id" : "5.60178607E8" >> } ] >> >> while the output of the script looks like: >> >> [ { >> "doc_id" : "5.61996505E8", >> "doc_text" : " No Alert Found \n\n", >> "processing_timestamp" : "2022-11-28T01:16:46.775Z", >> "metadata_x_ocr_applied" : true, >> "metadata_x_parsed_by" : >> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser", >> "metadata_content_type" : "application/rtf", >> "metadata_page_count" : null, >> "metadata_creation_date" : null, >> "metadata_last_modified" : null >> } ] >> >> Is there something about this structure that is likely to be causing the >> problem? Could there be other issues with the avro generated by the script? >> >> On Mon, Dec 5, 2022 at 9:31 PM Richard Beare <[email protected] >> <mailto:[email protected]>> wrote: >> I've reset the backpressure to the default >> >> This remains something of a mystery. The merge with synthetic data happily >> creates flowfiles with 100 records, and the join says "Records merged due >> to: Bin is full" or "Records merged due to: Bin is full enough". No timeouts >> in that case, even with the max bin age at 4.5 seconds. The resulting >> flowfiles were about 300K. >> >> The real data is doing much the same as before, producing flowfiles of about >> 30K, with 7 records or so. If I increase the maximum bin age to 30 seconds >> the size and record count is higher - 12 to 15. Nothing like the behaviour >> with synthetic data, where the 100 record flowfiles are created almost >> instantly. Joins are always due to bin age. >> >> Can the problem relate to the structure of the avro files? Any way to dive >> into that? Everything else about the mergerecord settings appear the same, >> so I can't see an explanation as to why the behaviour is different on the >> same hardware. >> >> >> >> >> >> >> >> >> On Mon, Dec 5, 2022 at 2:09 AM Mark Payne <[email protected] >> <mailto:[email protected]>> wrote: >> Hey Richard, >> >> So a few things that I’ve done/looked at. >> >> I generated some Avro data (random JSON that I downloaded from a Random JSON >> Generator and then converted to Avro). >> >> I then ran this avro data into both the MergeRecord processors. >> >> Firstly, I noticed that both are very slow. Found that was because Run >> Schedule was set to 5 seconds. This should *ALWAYS* be 0 secs for >> MergeRecord. And really for basically all processors except for the first >> one in the flow. >> >> I also notice that you have backpressure set on your connections to 40,000 >> FlowFiles and 4 GB. This can significantly slow things down. If you have >> performance concerns you definitely want backpressure set back to the >> default of 10,000 FlowFiles. 
Otherwise, as the queues fill up they start >> “swapping out” FlowFiles to disk, and this can significantly slow things >> down. >> >> I noticed that MergeRecord is set to 1 concurrent task. Probably worth >> considering increasing that, if performance is a concern. >> >> That said, I am seeing nice, full bins of 100 records merged from each of >> the MergeRecord processors. >> So it is certainly possible that if you’re seeing smaller bins it’s because >> you’re timing out. The 4.5 seconds timeout is quite short. Have you tried >> increasing that to say 30 seconds to see if it gives you larger bins? >> I also recommend that you take a look at the data provenance to see why it’s >> creating the bins. >> >> If unclear how to do that: >> Right-click on the MergeRecord processor >> Go to View data provenance >> Scroll down the list until you see a “JOIN” event type. You can ignore the >> ATTRIBUTES_MODIFIED and DROP events for now. >> Click the ‘i’ icon on the left-hand side. >> This will show you details about the merge. In the Details tab, if you >> scroll down, it will show you a Details field, which tells you why the data >> was merged. It should either say: "Records Merged due to: Bin has reached >> Max Bin Age” or “ Records Merged due to: Bin is full” >> >> If it is due to Max Bin Age reached, then I’d recommend increasing number of >> concurrent tasks, reducing backpressure to no more than 10,000 FlowFiles in >> the queue, and/or increasing the Max Bin Age. >> Also worth asking - what kind of machines is this running on? A 64 core VM >> with 1 TB volume will, of course, run MUCH differently than a 4 core VM with >> a 10 GB volume, especially in the cloud. >> >> If still having trouble, let me know what the provenance tells you about the >> reason for merging the data, and we can go from there. >> >> Thanks! >> -Mark >> >> >>> On Dec 3, 2022, at 4:38 PM, Mark Payne <[email protected] >>> <mailto:[email protected]>> wrote: >>> >>> Richard, >>> >>> I think just the flow structure should be sufficient. >>> >>> Thanks >>> -Mark >>> >>> >>>> On Dec 3, 2022, at 4:32 PM, Richard Beare <[email protected] >>>> <mailto:[email protected]>> wrote: >>>> >>>> Thanks for responding, >>>> I re-tested with max bins = 2, but the behaviour remained the same. I can >>>> easily share a version of the functioning workflow (and data), which is >>>> part of a public project. The problem workflow (which shares many of the >>>> same components) is part of a health research project, so more difficult. >>>> I definitely can't share any data from that one. Do you need to see the >>>> data or is the overall structure sufficient at this point? Happy to >>>> demonstrate via video conference too. >>>> >>>> Thanks >>>> >>>> On Sun, Dec 4, 2022 at 1:37 AM Mark Payne <[email protected] >>>> <mailto:[email protected]>> wrote: >>>> Hi Richard, >>>> >>>> Can you try increasing the Maximum Number of Bins? I think there was an >>>> issue that was recently addressed in which the merge processors had an >>>> issue when Max Number of Bins = 1. >>>> >>>> If you still see the same issue, please provide a copy of the flow that >>>> can be used to replicate the issue. >>>> >>>> Thanks >>>> -Mark >>>> >>>> >>>>> On Dec 3, 2022, at 5:21 AM, Richard Beare <[email protected] >>>>> <mailto:[email protected]>> wrote: >>>>> >>>>> Hi, >>>>> >>>>> Pretty much the same - I seem to end up with flowfiles containing about 7 >>>>> records, presumably always triggered by the timeout. 
>>>>> >>>>> I had thought the timeout needed to be less than the run schedule, but it >>>>> looks like it can be the same. >>>>> >>>>> Here's a debug dump >>>>> >>>>> 10:13:43 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066297 >>>>> to RecordBin[size=4, full=false, isComplete=false, id=4021] >>>>> >>>>> 10:13:43 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>>> id=1066297 to RecordBin[size=5, full=false, isComplete=false, id=4021] >>>>> >>>>> 10:13:44 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID >>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00] >>>>> >>>>> 10:13:44 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066372 >>>>> to RecordBin[size=5, full=false, isComplete=false, id=4021] >>>>> >>>>> 10:13:44 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>>> id=1066372 to RecordBin[size=6, full=false, isComplete=false, id=4021] >>>>> >>>>> 10:13:45 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>>> id=1066575 to RecordBin[size=7, full=false, isComplete=true, id=4021] >>>>> >>>>> 10:13:46 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID >>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00] >>>>> >>>>> 10:13:46 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066612 >>>>> to RecordBin[size=0, full=false, isComplete=false, id=4022] >>>>> >>>>> 10:13:46 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created 
OutputStream >>>>> using session StandardProcessSession[id=83204] for RecordBin[size=0, >>>>> full=false, isComplete=false, id=4022] >>>>> >>>>> 10:13:46 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>>> id=1066612 to RecordBin[size=1, full=false, isComplete=false, id=4022] >>>>> >>>>> 10:13:48 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066896 >>>>> to RecordBin[size=2, full=false, isComplete=false, id=4022] >>>>> >>>>> 10:13:48 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>>> id=1066896 to RecordBin[size=3, full=false, isComplete=false, id=4022] >>>>> >>>>> 10:13:49 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID >>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e] >>>>> >>>>> 10:13:49 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1067051 >>>>> to RecordBin[size=3, full=false, isComplete=false, id=4022] >>>>> >>>>> 10:13:49 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>>> id=1067051 to RecordBin[size=4, full=false, isComplete=false, id=4022] >>>>> >>>>> 10:13:52 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>>> id=1067254 to RecordBin[size=7, full=false, isComplete=true, id=4022] >>>>> >>>>> 10:13:53 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID >>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8] >>>>> >>>>> 10:13:53 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1067395 >>>>> to 
RecordBin[size=0, full=false, isComplete=false, id=4023] >>>>> >>>>> 10:13:53 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created OutputStream >>>>> using session StandardProcessSession[id=83205] for RecordBin[size=0, >>>>> full=false, isComplete=false, id=4023] >>>>> >>>>> 10:13:53 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>>> id=1067395 to RecordBin[size=1, full=false, isComplete=false, id=4023] >>>>> >>>>> 10:13:54 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1068472 >>>>> to RecordBin[size=1, full=false, isComplete=false, id=4023] >>>>> >>>>> 10:13:54 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>>> id=1068472 to RecordBin[size=2, full=false, isComplete=false, id=4023] >>>>> >>>>> 10:13:55 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID >>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e] >>>>> >>>>> 10:13:55 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1068597 >>>>> to RecordBin[size=2, full=false, isComplete=false, id=4023] >>>>> >>>>> 10:13:55 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>>> id=1068597 to RecordBin[size=3, full=false, isComplete=false, id=4023] >>>>> >>>>> 10:13:58 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6, >>>>> full=false, isComplete=false, id=4023] is now expired. Completing bin. 
>>>>> >>>>> 10:13:58 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked >>>>> RecordBin[size=6, full=false, isComplete=true, id=4023] as complete >>>>> because complete() was called >>>>> >>>>> 10:13:58 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record Writer >>>>> using session StandardProcessSession[id=83205] for RecordBin[size=6, >>>>> full=false, isComplete=true, id=4023] >>>>> >>>>> 10:13:58 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin >>>>> RecordBin[size=6, full=false, isComplete=true, id=4023] with 6 records >>>>> with Merged FlowFile >>>>> FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948] using input >>>>> FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663, id=1068800, >>>>> id=1068845] >>>>> >>>>> 10:13:58 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>>> id=1068845 to RecordBin[size=6, full=false, isComplete=true, id=4023] >>>>> >>>>> 10:14:01 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1069272 >>>>> to RecordBin[size=2, full=false, isComplete=false, id=4024] >>>>> >>>>> 10:14:01 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>>> id=1069272 to RecordBin[size=3, full=false, isComplete=false, id=4024] >>>>> >>>>> 10:14:02 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID >>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8] >>>>> >>>>> 10:14:02 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1069316 >>>>> to RecordBin[size=3, full=false, isComplete=false, id=4024] >>>>> >>>>> 10:14:02 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>>> id=1069316 to RecordBin[size=4, full=false, isComplete=false, id=4024] >>>>> >>>>> 10:14:05 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6, >>>>> full=false, isComplete=false, id=4024] is now expired. Completing bin. 
>>>>> >>>>> 10:14:05 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked >>>>> RecordBin[size=6, full=false, isComplete=true, id=4024] as complete >>>>> because complete() was called >>>>> >>>>> 10:14:05 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record Writer >>>>> using session StandardProcessSession[id=83206] for RecordBin[size=6, >>>>> full=false, isComplete=true, id=4024] >>>>> >>>>> 10:14:05 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin >>>>> RecordBin[size=6, full=false, isComplete=true, id=4024] with 6 records >>>>> with Merged FlowFile >>>>> FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d] using input >>>>> FlowFiles [id=1069044, id=1069103, id=1069272, id=1069316, id=1069451, >>>>> id=1069492] >>>>> >>>>> 10:14:05 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>>> id=1069492 to RecordBin[size=6, full=false, isComplete=true, id=4024] >>>>> >>>>> 10:14:07 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1072118 >>>>> to RecordBin[size=2, full=false, isComplete=false, id=4025] >>>>> >>>>> 10:14:07 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>>> id=1072118 to RecordBin[size=3, full=false, isComplete=false, id=4025] >>>>> >>>>> 10:14:08 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID >>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e] >>>>> >>>>> 10:14:08 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1072197 >>>>> to RecordBin[size=3, full=false, isComplete=false, id=4025] >>>>> >>>>> 10:14:08 UTC >>>>> DEBUG >>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>>> id=1072197 to RecordBin[size=4, full=false, isComplete=false, id=4025] >>>>> >>>>> >>>>> On Sat, Dec 3, 2022 at 4:21 PM Joe Witt <[email protected] >>>>> <mailto:[email protected]>> wrote: >>>>> Hello >>>>> >>>>> Run schedule should be 0. >>>>> >>>>> 50 should be the min number of records >>>>> >>>>> 5 seconds is the max bin age it sounds like you want. >>>>> >>>>> Start with these changes and let us know what youre seeing. 
>>>>> >>>>> Thanks >>>>> >>>>> On Fri, Dec 2, 2022 at 10:12 PM Richard Beare <[email protected] >>>>> <mailto:[email protected]>> wrote: >>>>> Hi, >>>>> I'm having a great deal of trouble configuring the mergerecord processor >>>>> to deliver reasonable performance and I'm not sure where to look to >>>>> correct it. One of my upstream processors requires a single record per >>>>> flowfile, but I'd like to create larger flowfiles before passing to the >>>>> next stage. The flowfiles are independent at this stage so there's no >>>>> special processing required of the merging. I'd like to create flowfiles >>>>> of about 50 to 100 records. >>>>> >>>>> I have two tests, both running on the same nifi system. One uses >>>>> synthetic data, the other the production data. The performance of the >>>>> mergerecord processor for the synthetic data is as I'd expect, and I >>>>> can't figure out why the production data is so much slower. Here's the >>>>> configuration: >>>>> >>>>> mergerecord has the following settings. Timer driven, 1 concurrent task, >>>>> 5 second run schedule, bin packing merge strategy, min records = 1, max >>>>> records = 100, max bin age = 4.5 secs, maximum number of bins = 1. >>>>> >>>>> In the case of synthetic data the typical flowfile size is in the range 2 >>>>> to 7KB. >>>>> >>>>> The size of flowfiles for the production case is smaller - typically >>>>> around 1KB. >>>>> >>>>> The structure in the tests is slightly different. Synthetic is (note that >>>>> I've removed the text part): >>>>> >>>>> [ { >>>>> "sampleid" : 1075, >>>>> "typeid" : 98, >>>>> "dct" : "2020-01-25T21:40:25.515Z", >>>>> "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt", >>>>> "document" : "Text removed - typically a few hundred words", >>>>> "docid" : "9" >>>>> } ] >>>>> >>>>> Production is: >>>>> [ { >>>>> "doc_id" : "5.60622895E8", >>>>> "doc_text" : " Text deleted - typically a few hundred words", >>>>> "processing_timestamp" : "2022-11-27T23:56:35.601Z", >>>>> "metadata_x_ocr_applied" : true, >>>>> "metadata_x_parsed_by" : >>>>> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser", >>>>> "metadata_content_type" : "application/rtf", >>>>> "metadata_page_count" : null, >>>>> "metadata_creation_date" : null, >>>>> "metadata_last_modified" : null >>>>> } ] >>>>> >>>>> >>>>> >>>>> I load up the queue feeding the mergerecord processor with several >>>>> hundred individual flowfiles and activate it. >>>>> >>>>> The synthetic data is nicely placed into chunks of 100, with any >>>>> remainder being flushed in a smaller chunk. >>>>> >>>>> The production data is generally bundled into groups of 6 records, >>>>> sometimes less. Certainly it never gets close to 100 records. >>>>> >>>>> Any ideas as to what I should look at to track down the difference? >>>>> >>>>> Thanks >>>> >>> >> >
