Thanks - I'll have a look at that. It is helpful to get guidance like this when the system is so large.
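
Before I take it to the playground, here's my first rough attempt at a Chain spec for the flattening described below (untested, so the details may well be wrong). doc_id isn't in the Tika output, so I assume it would have to come from a flowfile attribute or a separate step, and I've guessed that metadata_last_modified would map from dcterms:modified:

[
  {
    "operation": "shift",
    "spec": {
      "result": {
        "text": "doc_text",
        "timestamp": "processing_timestamp",
        "metadata": {
          "X-OCR-Applied": "metadata_x_ocr_applied",
          "X-TIKA:Parsed-By": {
            "0": "metadata_x_parsed_by"
          },
          "Content-Type": "metadata_content_type",
          "Page-Count": "metadata_page_count",
          "dcterms:created": "metadata_creation_date",
          "dcterms:modified": "metadata_last_modified"
        }
      }
    }
  }
]

If the nullable fields need to appear even when Tika doesn't emit them, a Default spec chained after the shift should cover that, per Matt's note about Chain specs.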
On Wed, Dec 21, 2022 at 5:30 AM Matt Burgess <[email protected]> wrote:
> Thanks Vijay! I agree those processors should do the trick, but there were things in the transformation between input and desired output whose origin I wasn't sure of. If you are setting constants you can use either a Shift or Default spec; if you are moving fields around you can use a Shift spec; and in general, whether you end up with one spec or multiple, I find it's easiest to use a Chain spec (an array of specs) in the processor configuration. You can play around with the spec(s) at the Jolt playground [1]
>
> An important difference to note between JoltTransformJSON and JoltTransformRecord is that for the former, the spec is applied to the entire input (and it is entirely read into memory), whereas in JoltTransformRecord the spec is applied to each record.
>
> Regards,
> Matt
>
> [1] http://jolt-demo.appspot.com/#inception
>
> On Tue, Dec 20, 2022 at 10:52 AM Vijay Chhipa <[email protected]> wrote:
> >
> > Hi Richard
> > Have you tried JoltTransformJSON or JoltTransformRecord?
> >
> > I believe you should be able to do this.
> >
> > Quick start here: https://community.cloudera.com/t5/Community-Articles/Jolt-quick-reference-for-Nifi-Jolt-Processors/ta-p/244350
> >
> >
> > On Dec 20, 2022, at 4:13 AM, Richard Beare <[email protected]> wrote:
> >
> > Hi Everyone,
> > Still struggling to fix this issue and may need to try some different things.
> >
> > What is the recommended way of transforming a record structure? At the moment I have a groovy script doing this, but the downstream processing is very slow, as discussed in the preceding thread.
> >
> > The transformation is very simple - the starting structure is:
> >
> > {
> >   "result" : {
> >     "text" : " document text",
> >     "metadata" : {
> >       "X-TIKA:Parsed-By" : [
> >         "org.apache.tika.parser.pdf.PDFParser"
> >       ],
> >       "X-OCR-Applied" : true,
> >       "dcterms:created" : "2018-07-24T15:04:51Z",
> >       "Content-Type" : "application/pdf",
> >       "Page-Count" : 2
> >     },
> >     "success" : true,
> >     "timestamp" : "2022-12-20T09:02:27.902Z",
> >     "processingElapsedTime" : 6
> >   }
> > }
> >
> > and the final structure is:
> >
> > [ {
> >   "doc_id" : 58,
> >   "doc_text" : " ",
> >   "processing_timestamp" : "2022-12-20T09:02:27.902Z",
> >   "metadata_x_ocr_applied" : true,
> >   "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser",
> >   "metadata_content_type" : "application/pdf",
> >   "metadata_page_count" : 1,
> >   "metadata_creation_date" : null,
> >   "metadata_last_modified" : null
> > } ]
> >
> > So a kind of flattening of the structure. Is there a processor I should be using to do this instead of a groovy script?
> >
> > Thanks
> >
> > On Wed, Dec 14, 2022 at 7:57 AM Richard Beare <[email protected]> wrote:
> >>
> >> Any thoughts on this? Are there some extra steps required when creating an avro file from a user-defined schema?
> >>
> >> On Thu, Dec 8, 2022 at 2:56 PM Richard Beare <[email protected]> wrote:
> >>>
> >>> Here's another result that I think suggests there's something wrong with the avro files created by the groovy script, although I can't see what the problem might be.
> >>>
> >>> The test is as follows. Output of the groovy script creating avro files is passed to ConvertRecord, configured with an avro reader and a json writer. This is slow. The json output is then converted back to avro with another ConvertRecord processor, configured with a JsonTreeReader and an avro writer - this is fast, instantly emptying the queue.
> >>> The result of that is fed into the previously problematic merge processor, which works exactly as expected, producing flowfiles with 100 records each.
> >>>
> >>> The difference I can see between the two flowfiles is the way in which the schema is specified. Perhaps some extras are required in the groovy file to set that up?
> >>>
> >>> The slow one has:
> >>>
> >>> {"type":"record","name":"document","fields":[{
> >>>
> >>> The fast one:
> >>>
> >>> {"type":"record","name":"nifiRecord","namespace":"org.apache.nifi","fields":
> >>>
> >>> Initial characters are also slightly different.
> >>> Slow one:
> >>>
> >>> 0000000 O b j 001 002 026 a v r o . s c h e m
> >>> 0000020 a 346 \n { " t y p e " : " r e c o
> >>>
> >>> Fast one:
> >>>
> >>> 0000000 O b j 001 004 026 a v r o . s c h e m
> >>> 0000020 a 362 \b { " t y p e " : " r e c o
> >>>
> >>> The groovy script is CogStack-NiFi/parse-tika-result-json-to-avro.groovy (master branch of CogStack/CogStack-NiFi on GitHub).
> >>>
> >>> The schema is CogStack-NiFi/document.avsc (master branch of CogStack/CogStack-NiFi on GitHub).
> >>>
> >>> On Thu, Dec 8, 2022 at 1:59 PM Richard Beare <[email protected]> wrote:
> >>>>
> >>>> I'm diving into the ConvertRecord tests a bit deeper on the production server.
> >>>>
> >>>> The first test case - 259 documents, a total of 1 MB in avro format in the input queue to the ConvertRecord processor. These avro files were not created by the groovy script - they start life as a database query and the text field is in one of the columns. The ConvertRecord processor runs very quickly - click start, press refresh and it is done. The avro ends up like this:
> >>>>
> >>>> [ {
> >>>>   "sampleid" : 1075,
> >>>>   "typeid" : 98,
> >>>>   "dct" : "2020-01-25T21:40:25.515Z",
> >>>>   "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
> >>>>   "document" : "Text removed",
> >>>>   "docid" : "9"
> >>>> } ]
> >>>>
> >>>> In the second test, where the text fields are extracted from pdf by tika before the avro files are created by the groovy script (from the tika json output), the total queue size for the 259 documents is larger - 1.77 MB - and the performance is very different: press start, click refresh and only two flowfiles are processed.
> >>>>
> >>>> [ {
> >>>>   "doc_id" : "70",
> >>>>   "doc_text" : "text removed",
> >>>>   "processing_timestamp" : "2022-12-07T23:09:52.354Z",
> >>>>   "metadata_x_ocr_applied" : true,
> >>>>   "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser",
> >>>>   "metadata_content_type" : "application/pdf",
> >>>>   "metadata_page_count" : 1,
> >>>>   "metadata_creation_date" : null,
> >>>>   "metadata_last_modified" : null
> >>>> } ]
> >>>>
> >>>> I've noticed that the second one has a content.type attribute of 'application/json', which doesn't seem right and doesn't match the fast case. I'll see what happens if I change that.
> >>>>
> >>>> On Thu, Dec 8, 2022 at 9:39 AM Richard Beare <[email protected]> wrote:
> >>>>>
> >>>>> Hi All,
> >>>>> Some progress on debugging options. I've found a flow that exhibits the problem using synthetic data. However, the results are host dependent. On my laptop a "run once" click of MergeRecord gives me two flowfiles of 100 records, while the same flow on the production server produces several much smaller flowfiles. This makes me think that something funny is happening with my storage setup that I'll need to chase.
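> >>>>>
> >>>>> In case it helps anyone spot a problem, the avro writing in the script boils down to roughly the following (a simplified standalone sketch, not the actual CogStack code, and with the record values hardcoded here). The point is that the schema embedded in the file header is exactly whatever document.avsc declares, with no namespace added:
> >>>>>
> >>>>> @Grab('org.apache.avro:avro:1.11.1')
> >>>>> import org.apache.avro.Schema
> >>>>> import org.apache.avro.file.DataFileWriter
> >>>>> import org.apache.avro.generic.GenericData
> >>>>> import org.apache.avro.generic.GenericDatumWriter
> >>>>> import org.apache.avro.generic.GenericRecord
> >>>>>
> >>>>> // the schema's "name" (here "document") is embedded verbatim in the
> >>>>> // header of every Avro file written with it
> >>>>> Schema schema = new Schema.Parser().parse(new File('document.avsc'))
> >>>>>
> >>>>> // in the real script these values come from the Tika JSON output
> >>>>> GenericRecord rec = new GenericData.Record(schema)
> >>>>> rec.put('doc_id', '58')
> >>>>> rec.put('doc_text', 'document text')
> >>>>> rec.put('processing_timestamp', '2022-12-20T09:02:27.902Z')
> >>>>> rec.put('metadata_x_ocr_applied', true)
> >>>>> rec.put('metadata_x_parsed_by', 'org.apache.tika.parser.pdf.PDFParser')
> >>>>> rec.put('metadata_content_type', 'application/pdf')
> >>>>> rec.put('metadata_page_count', 1)
> >>>>> rec.put('metadata_creation_date', null)
> >>>>> rec.put('metadata_last_modified', null)
> >>>>>
> >>>>> // write a standard Avro data file: magic "Obj\u0001", a file metadata
> >>>>> // map containing avro.schema, then the data blocks
> >>>>> new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema)).withCloseable { w ->
> >>>>>     w.create(schema, new File('out.avro'))
> >>>>>     w.append(rec)
> >>>>> }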
> >>>>>
> >>>>> I had tested the ConvertRecord option (simply avroreader -> avrowriter) and it did seem slow, but I'll investigate this further as it may be related to my storage issue.
> >>>>>
> >>>>> On Thu, Dec 8, 2022 at 1:23 AM Mark Payne <[email protected]> wrote:
> >>>>>>
> >>>>>> > Is there something about this structure that is likely to be causing the problem? Could there be other issues with the avro generated by the script?
> >>>>>>
> >>>>>> I don't think the structure should matter, and as long as the avro produced is proper Avro, I don't think that should matter either. Unless perhaps there's some issue with the Avro library itself that's causing it to take a really long time to parse the Avro or something? I'd be curious - if you take the output of your script and then run it through a ConvertRecord (Avro Reader -> Json Writer), is the ConvertRecord fast? Or is it really slow to process it?
> >>>>>>
> >>>>>> On Dec 5, 2022, at 5:58 AM, Richard Beare <[email protected]> wrote:
> >>>>>>
> >>>>>> Further - I performed another test in which I replaced the custom json-to-avro script with a ConvertRecord processor - MergeRecord appears to work as expected in that case.
> >>>>>>
> >>>>>> Output of ConvertRecord looks like this:
> >>>>>>
> >>>>>> [ {
> >>>>>>   "text" : " No Alert Found \n\n",
> >>>>>>   "metadata" : {
> >>>>>>     "X_TIKA_Parsed_By" : null,
> >>>>>>     "X_OCR_Applied" : null,
> >>>>>>     "Content_Type" : null
> >>>>>>   },
> >>>>>>   "success" : true,
> >>>>>>   "timestamp" : "2022-12-05T10:49:18.568Z",
> >>>>>>   "processingElapsedTime" : 0,
> >>>>>>   "doc_id" : "5.60178607E8"
> >>>>>> } ]
> >>>>>>
> >>>>>> while the output of the script looks like:
> >>>>>>
> >>>>>> [ {
> >>>>>>   "doc_id" : "5.61996505E8",
> >>>>>>   "doc_text" : " No Alert Found \n\n",
> >>>>>>   "processing_timestamp" : "2022-11-28T01:16:46.775Z",
> >>>>>>   "metadata_x_ocr_applied" : true,
> >>>>>>   "metadata_x_parsed_by" : "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
> >>>>>>   "metadata_content_type" : "application/rtf",
> >>>>>>   "metadata_page_count" : null,
> >>>>>>   "metadata_creation_date" : null,
> >>>>>>   "metadata_last_modified" : null
> >>>>>> } ]
> >>>>>>
> >>>>>> Is there something about this structure that is likely to be causing the problem? Could there be other issues with the avro generated by the script?
> >>>>>>
> >>>>>> On Mon, Dec 5, 2022 at 9:31 PM Richard Beare <[email protected]> wrote:
> >>>>>>>
> >>>>>>> I've reset the backpressure to the default.
> >>>>>>>
> >>>>>>> This remains something of a mystery. The merge with synthetic data happily creates flowfiles with 100 records, and the join says "Records merged due to: Bin is full" or "Records merged due to: Bin is full enough". No timeouts in that case, even with the max bin age at 4.5 seconds. The resulting flowfiles were about 300 KB.
> >>>>>>>
> >>>>>>> The real data is doing much the same as before, producing flowfiles of about 30 KB, with 7 records or so. If I increase the maximum bin age to 30 seconds the size and record count is higher - 12 to 15. Nothing like the behaviour with synthetic data, where the 100-record flowfiles are created almost instantly. Joins are always due to bin age.
> >>>>>>>
> >>>>>>> Can the problem relate to the structure of the avro files? Any way to dive into that?
> >>>>>>> Everything else about the MergeRecord settings appears the same, so I can't see an explanation as to why the behaviour is different on the same hardware.
> >>>>>>>
> >>>>>>> On Mon, Dec 5, 2022 at 2:09 AM Mark Payne <[email protected]> wrote:
> >>>>>>>>
> >>>>>>>> Hey Richard,
> >>>>>>>>
> >>>>>>>> So a few things that I've done/looked at.
> >>>>>>>>
> >>>>>>>> I generated some Avro data (random JSON that I downloaded from a Random JSON Generator and then converted to Avro).
> >>>>>>>>
> >>>>>>>> I then ran this avro data into both of the MergeRecord processors.
> >>>>>>>>
> >>>>>>>> Firstly, I noticed that both are very slow. I found that was because Run Schedule was set to 5 seconds. This should *ALWAYS* be 0 secs for MergeRecord - and really for basically all processors except the first one in the flow.
> >>>>>>>>
> >>>>>>>> I also notice that you have backpressure set on your connections to 40,000 FlowFiles and 4 GB. This can significantly slow things down. If you have performance concerns you definitely want backpressure set back to the default of 10,000 FlowFiles. Otherwise, as the queues fill up they start "swapping out" FlowFiles to disk, and this can significantly slow things down.
> >>>>>>>>
> >>>>>>>> I noticed that MergeRecord is set to 1 concurrent task. Probably worth considering increasing that, if performance is a concern.
> >>>>>>>>
> >>>>>>>> That said, I am seeing nice, full bins of 100 records merged from each of the MergeRecord processors. So it is certainly possible that if you're seeing smaller bins it's because you're timing out. The 4.5 seconds timeout is quite short. Have you tried increasing that to, say, 30 seconds to see if it gives you larger bins? I also recommend that you take a look at the data provenance to see why it's creating the bins.
> >>>>>>>>
> >>>>>>>> If unclear how to do that:
> >>>>>>>> Right-click on the MergeRecord processor.
> >>>>>>>> Go to View data provenance.
> >>>>>>>> Scroll down the list until you see a "JOIN" event type. You can ignore the ATTRIBUTES_MODIFIED and DROP events for now.
> >>>>>>>> Click the 'i' icon on the left-hand side.
> >>>>>>>> This will show you details about the merge. In the Details tab, if you scroll down, it will show you a Details field, which tells you why the data was merged. It should either say "Records Merged due to: Bin has reached Max Bin Age" or "Records Merged due to: Bin is full".
> >>>>>>>>
> >>>>>>>> If it is due to Max Bin Age being reached, then I'd recommend increasing the number of concurrent tasks, reducing backpressure to no more than 10,000 FlowFiles in the queue, and/or increasing the Max Bin Age.
> >>>>>>>> Also worth asking - what kind of machines is this running on? A 64-core VM with a 1 TB volume will, of course, run MUCH differently than a 4-core VM with a 10 GB volume, especially in the cloud.
> >>>>>>>>
> >>>>>>>> If still having trouble, let me know what the provenance tells you about the reason for merging the data, and we can go from there.
> >>>>>>>>
> >>>>>>>> Thanks!
> >>>>>>>> -Mark
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Dec 3, 2022, at 4:38 PM, Mark Payne <[email protected]> wrote:
> >>>>>>>>
> >>>>>>>> Richard,
> >>>>>>>>
> >>>>>>>> I think just the flow structure should be sufficient.
> >>>>>>>>
> >>>>>>>> Thanks
> >>>>>>>> -Mark
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Dec 3, 2022, at 4:32 PM, Richard Beare <[email protected]> wrote:
> >>>>>>>>
> >>>>>>>> Thanks for responding,
> >>>>>>>> I re-tested with max bins = 2, but the behaviour remained the same. I can easily share a version of the functioning workflow (and data), which is part of a public project. The problem workflow (which shares many of the same components) is part of a health research project, so that's more difficult - I definitely can't share any data from that one. Do you need to see the data, or is the overall structure sufficient at this point? Happy to demonstrate via video conference too.
> >>>>>>>>
> >>>>>>>> Thanks
> >>>>>>>>
> >>>>>>>> On Sun, Dec 4, 2022 at 1:37 AM Mark Payne <[email protected]> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Richard,
> >>>>>>>>>
> >>>>>>>>> Can you try increasing the Maximum Number of Bins? I think there was a recently addressed issue in which the merge processors misbehaved when Max Number of Bins = 1.
> >>>>>>>>>
> >>>>>>>>> If you still see the same issue, please provide a copy of the flow that can be used to replicate the issue.
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>> -Mark
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Dec 3, 2022, at 5:21 AM, Richard Beare <[email protected]> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> Pretty much the same - I seem to end up with flowfiles containing about 7 records, presumably always triggered by the timeout.
> >>>>>>>>>
> >>>>>>>>> I had thought the timeout needed to be less than the run schedule, but it looks like it can be the same.
> >>>>>>>>>
> >>>>>>>>> Here's a debug dump - I've put each event on one line, and the (identical) schema in the Got Group ID events is shown in full only the first time:
> >>>>>>>>>
> >>>>>>>>> 10:13:43 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066297 to RecordBin[size=4, full=false, isComplete=false, id=4021]
> >>>>>>>>> 10:13:43 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066297 to RecordBin[size=5, full=false, isComplete=false, id=4021]
> >>>>>>>>> 10:13:44 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
> >>>>>>>>> 10:13:44 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066372 to RecordBin[size=5, full=false, isComplete=false, id=4021]
> >>>>>>>>> 10:13:44 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066372 to RecordBin[size=6, full=false, isComplete=false, id=4021]
> >>>>>>>>> 10:13:45 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066575 to RecordBin[size=7, full=false, isComplete=true, id=4021]
> >>>>>>>>> 10:13:46 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID {schema elided - identical to 10:13:44 above} for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
> >>>>>>>>> 10:13:46 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066612 to RecordBin[size=0, full=false, isComplete=false, id=4022]
> >>>>>>>>> 10:13:46 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created OutputStream using session StandardProcessSession[id=83204] for RecordBin[size=0, full=false, isComplete=false, id=4022]
> >>>>>>>>> 10:13:46 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066612 to RecordBin[size=1, full=false, isComplete=false, id=4022]
> >>>>>>>>> 10:13:48 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066896 to RecordBin[size=2, full=false, isComplete=false, id=4022]
> >>>>>>>>> 10:13:48 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066896 to RecordBin[size=3, full=false, isComplete=false, id=4022]
> >>>>>>>>> 10:13:49 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID {schema elided - identical to 10:13:44 above} for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
> >>>>>>>>> 10:13:49 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1067051 to RecordBin[size=3, full=false, isComplete=false, id=4022]
> >>>>>>>>> 10:13:49 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1067051 to RecordBin[size=4, full=false, isComplete=false, id=4022]
> >>>>>>>>> 10:13:52 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1067254 to RecordBin[size=7, full=false, isComplete=true, id=4022]
> >>>>>>>>> 10:13:53 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID {schema elided - identical to 10:13:44 above} for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
> >>>>>>>>> 10:13:53 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1067395 to RecordBin[size=0, full=false, isComplete=false, id=4023]
> >>>>>>>>> 10:13:53 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created OutputStream using session StandardProcessSession[id=83205] for RecordBin[size=0, full=false, isComplete=false, id=4023]
> >>>>>>>>> 10:13:53 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1067395 to RecordBin[size=1, full=false, isComplete=false, id=4023]
> >>>>>>>>> 10:13:54 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1068472 to RecordBin[size=1, full=false, isComplete=false, id=4023]
> >>>>>>>>> 10:13:54 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1068472 to RecordBin[size=2, full=false, isComplete=false, id=4023]
> >>>>>>>>> 10:13:55 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID {schema elided - identical to 10:13:44 above} for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
> >>>>>>>>> 10:13:55 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1068597 to RecordBin[size=2, full=false, isComplete=false, id=4023]
> >>>>>>>>> 10:13:55 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1068597 to RecordBin[size=3, full=false, isComplete=false, id=4023]
> >>>>>>>>> 10:13:58 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6, full=false, isComplete=false, id=4023] is now expired. Completing bin.
> >>>>>>>>> 10:13:58 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked RecordBin[size=6, full=false, isComplete=true, id=4023] as complete because complete() was called
> >>>>>>>>> 10:13:58 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record Writer using session StandardProcessSession[id=83205] for RecordBin[size=6, full=false, isComplete=true, id=4023]
> >>>>>>>>> 10:13:58 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin RecordBin[size=6, full=false, isComplete=true, id=4023] with 6 records with Merged FlowFile FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948] using input FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663, id=1068800, id=1068845]
> >>>>>>>>> 10:13:58 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1068845 to RecordBin[size=6, full=false, isComplete=true, id=4023]
> >>>>>>>>> 10:14:01 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1069272 to RecordBin[size=2, full=false, isComplete=false, id=4024]
> >>>>>>>>> 10:14:01 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1069272 to RecordBin[size=3, full=false, isComplete=false, id=4024]
> >>>>>>>>> 10:14:02 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID {schema elided - identical to 10:13:44 above} for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
> >>>>>>>>> 10:14:02 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1069316 to RecordBin[size=3, full=false, isComplete=false, id=4024]
> >>>>>>>>> 10:14:02 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1069316 to RecordBin[size=4, full=false, isComplete=false, id=4024]
> >>>>>>>>> 10:14:05 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6, full=false, isComplete=false, id=4024] is now expired. Completing bin.
> >>>>>>>>> 10:14:05 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked RecordBin[size=6, full=false, isComplete=true, id=4024] as complete because complete() was called
> >>>>>>>>> 10:14:05 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record Writer using session StandardProcessSession[id=83206] for RecordBin[size=6, full=false, isComplete=true, id=4024]
> >>>>>>>>> 10:14:05 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin RecordBin[size=6, full=false, isComplete=true, id=4024] with 6 records with Merged FlowFile FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d] using input FlowFiles [id=1069044, id=1069103, id=1069272, id=1069316, id=1069451, id=1069492]
> >>>>>>>>> 10:14:05 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1069492 to RecordBin[size=6, full=false, isComplete=true, id=4024]
> >>>>>>>>> 10:14:07 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1072118 to RecordBin[size=2, full=false, isComplete=false, id=4025]
> >>>>>>>>> 10:14:07 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1072118 to RecordBin[size=3, full=false, isComplete=false, id=4025]
> >>>>>>>>> 10:14:08 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID {schema elided - identical to 10:13:44 above} for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
> >>>>>>>>> 10:14:08 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1072197 to RecordBin[size=3, full=false, isComplete=false, id=4025]
> >>>>>>>>> 10:14:08 UTC DEBUG MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1072197 to RecordBin[size=4, full=false, isComplete=false, id=4025]
> >>>>>>>>>
> >>>>>>>>> On Sat, Dec 3, 2022 at 4:21 PM Joe Witt <[email protected]> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hello
> >>>>>>>>>>
> >>>>>>>>>> Run schedule should be 0.
> >>>>>>>>>>
> >>>>>>>>>> 50 should be the min number of records.
> >>>>>>>>>>
> >>>>>>>>>> 5 seconds is the max bin age it sounds like you want.
> >>>>>>>>>>
> >>>>>>>>>> Start with these changes and let us know what you're seeing.
> >>>>>>>>>>
> >>>>>>>>>> Thanks
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Dec 2, 2022 at 10:12 PM Richard Beare <[email protected]> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>> I'm having a great deal of trouble configuring the MergeRecord processor to deliver reasonable performance, and I'm not sure where to look to correct it. One of my upstream processors requires a single record per flowfile, but I'd like to create larger flowfiles before passing to the next stage. The flowfiles are independent at this stage, so there's no special processing required for the merging. I'd like to create flowfiles of about 50 to 100 records.
> >>>>>>>>>>>
> >>>>>>>>>>> I have two tests, both running on the same nifi system. One uses synthetic data, the other the production data. The performance of the MergeRecord processor for the synthetic data is as I'd expect, and I can't figure out why the production data is so much slower. Here's the configuration:
> >>>>>>>>>>>
> >>>>>>>>>>> MergeRecord has the following settings: timer driven, 1 concurrent task, 5 second run schedule, bin-packing merge strategy, min records = 1, max records = 100, max bin age = 4.5 secs, maximum number of bins = 1.
> >>>>>>>>>>>
> >>>>>>>>>>> In the case of synthetic data the typical flowfile size is in the range 2 to 7 KB.
> >>>>>>>>>>>
> >>>>>>>>>>> The size of flowfiles for the production case is smaller - typically around 1 KB.
> >>>>>>>>>>>
> >>>>>>>>>>> The structure in the tests is slightly different. Synthetic is (note that I've removed the text part):
> >>>>>>>>>>>
> >>>>>>>>>>> [ {
> >>>>>>>>>>>   "sampleid" : 1075,
> >>>>>>>>>>>   "typeid" : 98,
> >>>>>>>>>>>   "dct" : "2020-01-25T21:40:25.515Z",
> >>>>>>>>>>>   "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
> >>>>>>>>>>>   "document" : "Text removed - typically a few hundred words",
> >>>>>>>>>>>   "docid" : "9"
> >>>>>>>>>>> } ]
> >>>>>>>>>>>
> >>>>>>>>>>> Production is:
> >>>>>>>>>>>
> >>>>>>>>>>> [ {
> >>>>>>>>>>>   "doc_id" : "5.60622895E8",
> >>>>>>>>>>>   "doc_text" : " Text deleted - typically a few hundred words",
> >>>>>>>>>>>   "processing_timestamp" : "2022-11-27T23:56:35.601Z",
> >>>>>>>>>>>   "metadata_x_ocr_applied" : true,
> >>>>>>>>>>>   "metadata_x_parsed_by" : "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
> >>>>>>>>>>>   "metadata_content_type" : "application/rtf",
> >>>>>>>>>>>   "metadata_page_count" : null,
> >>>>>>>>>>>   "metadata_creation_date" : null,
> >>>>>>>>>>>   "metadata_last_modified" : null
> >>>>>>>>>>> } ]
> >>>>>>>>>>>
> >>>>>>>>>>> I load up the queue feeding the MergeRecord processor with several hundred individual flowfiles and activate it.
> >>>>>>>>>>>
> >>>>>>>>>>> The synthetic data is nicely placed into chunks of 100, with any remainder being flushed in a smaller chunk.
> >>>>>>>>>>>
> >>>>>>>>>>> The production data is generally bundled into groups of 6 records, sometimes fewer. Certainly it never gets close to 100 records.
> >>>>>>>>>>>
> >>>>>>>>>>> Any ideas as to what I should look at to track down the difference?
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks
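
Pulling the advice in this thread together, a reasonable MergeRecord starting configuration (a distillation of Joe's and Mark's suggestions above rather than official guidance - tune to your own data and hardware) looks like:

  Run Schedule: 0 sec (a positive run schedule throttles MergeRecord)
  Merge Strategy: Bin-Packing Algorithm
  Minimum Number of Records: 50
  Maximum Number of Records: 100
  Max Bin Age: 5 sec (try 30 sec if bins still expire before filling)
  Maximum Number of Bins: 2 or more
  Concurrent Tasks: more than 1 if throughput is a concern
  Backpressure on feeding connections: the default 10,000 flowfiles, not 40,000 / 4 GB

Data provenance on the JOIN events (as Mark describes above) shows whether bins are closing because "Bin is full" or because "Bin has reached Max Bin Age", which tells you whether to adjust the bin settings or the upstream flow.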
