Hi Richard,
Have you tried JoltTransformJSON or JoltTransformRecord?

I believe you should be able to do this.

Quick start here:  
https://community.cloudera.com/t5/Community-Articles/Jolt-quick-reference-for-Nifi-Jolt-Processors/ta-p/244350
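
For example, a shift spec along these lines (a rough sketch based on the field names in your samples - doc_id isn't in the Tika output so I've left it out, and I'm guessing metadata_last_modified comes from a dcterms:modified field) should get you most of the way there:

[ {
  "operation" : "shift",
  "spec" : {
    "result" : {
      "text" : "doc_text",
      "timestamp" : "processing_timestamp",
      "metadata" : {
        "X-TIKA:Parsed-By" : { "0" : "metadata_x_parsed_by" },
        "X-OCR-Applied" : "metadata_x_ocr_applied",
        "Content-Type" : "metadata_content_type",
        "Page-Count" : "metadata_page_count",
        "dcterms:created" : "metadata_creation_date",
        "dcterms:modified" : "metadata_last_modified"
      }
    }
  }
} ]

Note that this takes only the first entry of the Parsed-By array; if you want the entries joined with ';' the way your script does, you could add a modify operation, or just keep the field as an array in the record schema.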


> On Dec 20, 2022, at 4:13 AM, Richard Beare <[email protected]> wrote:
> 
> Hi Everyone,
> Still struggling to fix this issue and may need to try some different things.
> 
> What is the recommended way of transforming a record structure? At the moment 
> I have a groovy script doing this but the downstream processing is very slow, 
> as discussed in the preceding thread.
> 
> The transformation is very simple - starting structure is:
> 
> {
>   "result" : {
>     "text" : " document text",
>     "metadata" : {
>       "X-TIKA:Parsed-By" : [
>         "org.apache.tika.parser.pdf.PDFParser"
>       ],
>       "X-OCR-Applied" : true,
>       "dcterms:created" : "2018-07-24T15:04:51Z",
>       "Content-Type" : "application/pdf",
>       "Page-Count" : 2
>     },
>     "success" : true,
>     "timestamp" : "2022-12-20T09:02:27.902Z",
>     "processingElapsedTime" : 6
>   }
> }
> 
> 
> The final structure is:
> 
> [ {
>   "doc_id" : 58,
>   "doc_text" : "   ",
>   "processing_timestamp" : "2022-12-20T09:02:27.902Z",
>   "metadata_x_ocr_applied" : true,
>   "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser",
>   "metadata_content_type" : "application/pdf",
>   "metadata_page_count" : 1,
>   "metadata_creation_date" : null,
>   "metadata_last_modified" : null
> } ]
> 
> So a kind of flattening of the structure. Is there a processor I should be 
> using to do this instead of a groovy script?
> 
> Thanks
> 
> On Wed, Dec 14, 2022 at 7:57 AM Richard Beare <[email protected] 
> <mailto:[email protected]>> wrote:
> Any thoughts on this? Are there some extra steps required when creating an 
> avro file from a user defined schema?
> 
> On Thu, Dec 8, 2022 at 2:56 PM Richard Beare <[email protected] 
> <mailto:[email protected]>> wrote:
> Here's another result that I think suggests there's something wrong with the 
> avro files created by the groovy script, although I can't see what the 
> problem might be.
> 
> The test is as follows. Output of the groovy script creating avro files is 
> passed to convertrecord, configured with an avro reader and json writer. This 
> is slow. The json output is then converted back to avro with another 
> convertrecord processor, configured with a jsontreereader and an avro writer 
> - this is fast, instantly emptying the queue. The result of that is fed into 
> the previously problematic merge processor which works exactly as expected, 
> producing flowfiles with 100 records each.
> 
> The difference I can see between the two flow files is the way in which the 
> schema is specified. Perhaps some extras are required in the groovy file to 
> set that up?
> 
> The slow one has:
> 
> {"type":"record","name":"document", "fields":[{
> 
> The fast one has:
> {"type":"record","name":"nifiRecord","namespace":"org.apache.nifi","fields":
> 
> 
> Initial characters are also slightly different.
> Slow one:
> 
> 0000000   O   b   j 001 002 026   a   v   r   o   .   s   c   h   e   m
> 0000020   a 346  \n   {   "   t   y   p   e   "   :   "   r   e   c   o
> 
> Fast one:
> 
> 0000000   O   b   j 001 004 026   a   v   r   o   .   s   c   h   e   m
> 0000020   a 362  \b   {   "   t   y   p   e   "   :   "   r   e   c   o
> 
> 
> The groovy script is:
> https://github.com/CogStack/CogStack-NiFi/blob/master/nifi/user-scripts/parse-tika-result-json-to-avro.groovy
> 
> The schema is:
> https://github.com/CogStack/CogStack-NiFi/blob/master/nifi/user-schemas/document.avsc
> 
> 
> On Thu, Dec 8, 2022 at 1:59 PM Richard Beare <[email protected] 
> <mailto:[email protected]>> wrote:
> I'm diving into the convertrecord tests a bit deeper on the production server.
> 
> The first test case: 259 documents, totalling about 1 MB in Avro format in the 
> input queue to the ConvertRecord processor. These avro files were not created 
> by the groovy script - they start life as a database query and the text field 
> is in one of the columns. The ConvertRecord processor runs very quickly - 
> click start, press refresh and it is done. The avro ends up like this:
> 
> [ {
>   "sampleid" : 1075,
>   "typeid" : 98,
>   "dct" : "2020-01-25T21:40:25.515Z",
>   "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
>   "document" : "Text removed",
>   "docid" : "9"
> } ]
> 
> In the second test, where the text fields are extracted from the PDFs by Tika 
> before the avro files are created by the groovy script (from the Tika JSON 
> output), the total queue size for the 259 documents is larger - 1.77MB - and 
> the performance is very different: press start, click refresh, and only two 
> flowfiles are processed.
> 
> [ {
>   "doc_id" : "70",
>   "doc_text" : "text removed",
>   "processing_timestamp" : "2022-12-07T23:09:52.354Z",
>   "metadata_x_ocr_applied" : true,
>   "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser",
>   "metadata_content_type" : "application/pdf",
>   "metadata_page_count" : 1,
>   "metadata_creation_date" : null,
>   "metadata_last_modified" : null
> } ]
> 
> I've noticed that the second one has a content.type attribute of 
> 'application/json' which doesn't seem right and doesn't match the fast case. 
> I'll see what happens if I change that.
> 
> On Thu, Dec 8, 2022 at 9:39 AM Richard Beare <[email protected] 
> <mailto:[email protected]>> wrote:
> Hi All,
> Some progress on debugging options. I've found a flow that exhibits the 
> problem using synthetic data. However the results are host dependent. On my 
> laptop a "run-once" click of merge record gives me two flowfiles of 100 
> records, while the same flow on the production server produces several much 
> smaller flowfiles. This makes me think that something funny is happening with 
> my storage setup that I'll need to chase.
> 
> I had tested the convertrecord option (simply avroreader->avrowriter) and it 
> did seem slow, but I'll investigate this further as it may be related to my 
> storage issue.
> 
> 
> 
> On Thu, Dec 8, 2022 at 1:23 AM Mark Payne <[email protected] 
> <mailto:[email protected]>> wrote:
> > Is there something about this structure that is likely to be causing the 
> > problem? Could there be other issues with the avro generated by the script?
> 
> I don’t think the structure should matter. And as long as the avro produced 
> is proper Avro, I don’t think it should matter. Unless perhaps there’s some 
> issue with the Avro library itself that’s causing it to take a really long 
> time to parse the Avro or something? I’d be curious - if you take the output 
> of your script and then you run it through a ConvertRecord (Avro Reader -> 
> Json Writer) is the ConvertRecord fast? Or is it really slow to process it?
> 
>> On Dec 5, 2022, at 5:58 AM, Richard Beare <[email protected] 
>> <mailto:[email protected]>> wrote:
>> 
>> Further - I performed another test in which I replaced the custom json to 
>> avro script with a ConvertRecord processor - merge record appears to work as 
>> expected in that case.
>> 
>> Output of convertrecord looks like this:
>> 
>> [ {
>>   "text" : "  No Alert Found \n\n",
>>   "metadata" : {
>>     "X_TIKA_Parsed_By" : null,
>>     "X_OCR_Applied" : null,
>>     "Content_Type" : null
>>   },
>>   "success" : true,
>>   "timestamp" : "2022-12-05T10:49:18.568Z",
>>   "processingElapsedTime" : 0,
>>   "doc_id" : "5.60178607E8"
>> } ]
>> 
>> while the output of the script looks like:
>> 
>> [ {
>>   "doc_id" : "5.61996505E8",
>>   "doc_text" : "  No Alert Found \n\n",
>>   "processing_timestamp" : "2022-11-28T01:16:46.775Z",
>>   "metadata_x_ocr_applied" : true,
>>   "metadata_x_parsed_by" : 
>> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
>>   "metadata_content_type" : "application/rtf",
>>   "metadata_page_count" : null,
>>   "metadata_creation_date" : null,
>>   "metadata_last_modified" : null
>> } ]
>> 
>> Is there something about this structure that is likely to be causing the 
>> problem? Could there be other issues with the avro generated by the script?
>> 
>> On Mon, Dec 5, 2022 at 9:31 PM Richard Beare <[email protected] 
>> <mailto:[email protected]>> wrote:
>> I've reset the backpressure to the default
>> 
>> This remains something of a mystery. The merge with synthetic data happily 
>> creates flowfiles with 100 records, and the join says "Records merged due 
>> to: Bin is full" or "Records merged due to: Bin is full enough". No timeouts 
>> in that case, even with the max bin age at 4.5 seconds. The resulting 
>> flowfiles were about 300K.
>> 
>> The real data is doing much the same as before, producing flowfiles of about 
>> 30K, with 7 records or so. If I increase the maximum bin age to 30 seconds 
>> the size and record count is higher - 12 to 15. Nothing like the behaviour 
>> with synthetic data, where the 100 record flowfiles are created almost 
>> instantly. Joins are always due to bin age.
>> 
>> Can the problem relate to the structure of the avro files? Any way to dive 
>> into that? Everything else about the mergerecord settings appear the same, 
>> so I can't see an explanation as to why the behaviour is different on the 
>> same hardware.
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On Mon, Dec 5, 2022 at 2:09 AM Mark Payne <[email protected] 
>> <mailto:[email protected]>> wrote:
>> Hey Richard,
>> 
>> So a few things that I’ve done/looked at.
>> 
>> I generated some Avro data (random JSON that I downloaded from a Random JSON 
>> Generator and then converted to Avro).
>> 
>> I then ran this avro data into both the MergeRecord processors.
>> 
>> Firstly, I noticed that both are very slow. Found that was because Run 
>> Schedule was set to 5 seconds. This should *ALWAYS* be 0 secs for 
>> MergeRecord. And really for basically all processors except for the first 
>> one in the flow.
>> 
>> I also notice that you have backpressure set on your connections to 40,000 
>> FlowFiles and 4 GB. This can significantly slow things down. If you have 
>> performance concerns you definitely want backpressure set back to the 
>> default of 10,000 FlowFiles. Otherwise, as the queues fill up they start 
>> “swapping out” FlowFiles to disk, and this can significantly slow things 
>> down.
>> 
>> I noticed that MergeRecord is set to 1 concurrent task. Probably worth 
>> considering increasing that, if performance is a concern.
>> 
>> That said, I am seeing nice, full bins of 100 records merged from each of 
>> the MergeRecord processors.
>> So it is certainly possible that if you’re seeing smaller bins it’s because 
>> you’re timing out. The 4.5 seconds timeout is quite short. Have you tried 
>> increasing that to say 30 seconds to see if it gives you larger bins?
>> I also recommend that you take a look at the data provenance to see why it’s 
>> creating the bins.
>> 
>> If unclear how to do that:
>> 1. Right-click on the MergeRecord processor.
>> 2. Go to View data provenance.
>> 3. Scroll down the list until you see a “JOIN” event type. You can ignore the 
>> ATTRIBUTES_MODIFIED and DROP events for now.
>> 4. Click the ‘i’ icon on the left-hand side.
>> 5. This will show you details about the merge. In the Details tab, if you 
>> scroll down, it will show you a Details field, which tells you why the data 
>> was merged. It should either say: "Records Merged due to: Bin has reached 
>> Max Bin Age” or “Records Merged due to: Bin is full”
>> 
>> If it is due to Max Bin Age reached, then I’d recommend increasing number of 
>> concurrent tasks, reducing backpressure to no more than 10,000 FlowFiles in 
>> the queue, and/or increasing the Max Bin Age. 
>> Also worth asking - what kind of machines is this running on? A 64 core VM 
>> with 1 TB volume will, of course, run MUCH differently than a 4 core VM with 
>> a 10 GB volume, especially in the cloud.
>> 
>> If still having trouble, let me know what the provenance tells you about the 
>> reason for merging the data, and we can go from there.
>> 
>> Thanks!
>> -Mark
>> 
>> 
>>> On Dec 3, 2022, at 4:38 PM, Mark Payne <[email protected] 
>>> <mailto:[email protected]>> wrote:
>>> 
>>> Richard,
>>> 
>>> I think just the flow structure should be sufficient.
>>> 
>>> Thanks
>>> -Mark
>>> 
>>> 
>>>> On Dec 3, 2022, at 4:32 PM, Richard Beare <[email protected] 
>>>> <mailto:[email protected]>> wrote:
>>>> 
>>>> Thanks for responding,
>>>> I re-tested with max bins = 2, but the behaviour remained the same. I can 
>>>> easily share a version of the functioning workflow (and data), which is 
>>>> part of a public project. The problem workflow (which shares many of the 
>>>> same components) is part of a health research project, so more difficult. 
>>>> I definitely can't share any data from that one. Do you need to see the 
>>>> data or is the overall structure sufficient at this point? Happy to 
>>>> demonstrate via video conference too.
>>>> 
>>>> Thanks
>>>> 
>>>> On Sun, Dec 4, 2022 at 1:37 AM Mark Payne <[email protected] 
>>>> <mailto:[email protected]>> wrote:
>>>> Hi Richard,
>>>> 
>>>> Can you try increasing the Maximum Number of Bins? I think there was an 
>>>> issue that was recently addressed in which the merge processors had an 
>>>> issue when Max Number of Bins = 1.
>>>> 
>>>> If you still see the same issue, please provide a copy of the flow that 
>>>> can be used to replicate the issue.
>>>> 
>>>> Thanks
>>>> -Mark
>>>> 
>>>> 
>>>>> On Dec 3, 2022, at 5:21 AM, Richard Beare <[email protected] 
>>>>> <mailto:[email protected]>> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> Pretty much the same - I seem to end up with flowfiles containing about 7 
>>>>> records, presumably always triggered by the timeout.
>>>>> 
>>>>> I had thought the timeout needed to be less than the run schedule, but it 
>>>>> looks like it can be the same.
>>>>> 
>>>>> Here's a debug dump
>>>>> 
>>>>> 10:13:43 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066297 
>>>>> to RecordBin[size=4, full=false, isComplete=false, id=4021]
>>>>> 
>>>>> 10:13:43 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred 
>>>>> id=1066297 to RecordBin[size=5, full=false, isComplete=false, id=4021]
>>>>> 
>>>>> 10:13:44 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>  for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
>>>>> 
>>>>> 10:13:44 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066372 
>>>>> to RecordBin[size=5, full=false, isComplete=false, id=4021]
>>>>> 
>>>>> 10:13:44 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred 
>>>>> id=1066372 to RecordBin[size=6, full=false, isComplete=false, id=4021]
>>>>> 
>>>>> 10:13:45 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred 
>>>>> id=1066575 to RecordBin[size=7, full=false, isComplete=true, id=4021]
>>>>> 
>>>>> 10:13:46 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>  for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
>>>>> 
>>>>> 10:13:46 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066612 
>>>>> to RecordBin[size=0, full=false, isComplete=false, id=4022]
>>>>> 
>>>>> 10:13:46 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created OutputStream 
>>>>> using session StandardProcessSession[id=83204] for RecordBin[size=0, 
>>>>> full=false, isComplete=false, id=4022]
>>>>> 
>>>>> 10:13:46 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred 
>>>>> id=1066612 to RecordBin[size=1, full=false, isComplete=false, id=4022]
>>>>> 
>>>>> 10:13:48 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066896 
>>>>> to RecordBin[size=2, full=false, isComplete=false, id=4022]
>>>>> 
>>>>> 10:13:48 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred 
>>>>> id=1066896 to RecordBin[size=3, full=false, isComplete=false, id=4022]
>>>>> 
>>>>> 10:13:49 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>  for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>> 
>>>>> 10:13:49 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1067051 
>>>>> to RecordBin[size=3, full=false, isComplete=false, id=4022]
>>>>> 
>>>>> 10:13:49 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred 
>>>>> id=1067051 to RecordBin[size=4, full=false, isComplete=false, id=4022]
>>>>> 
>>>>> 10:13:52 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred 
>>>>> id=1067254 to RecordBin[size=7, full=false, isComplete=true, id=4022]
>>>>> 
>>>>> 10:13:53 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>  for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
>>>>> 
>>>>> 10:13:53 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1067395 
>>>>> to RecordBin[size=0, full=false, isComplete=false, id=4023]
>>>>> 
>>>>> 10:13:53 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created OutputStream 
>>>>> using session StandardProcessSession[id=83205] for RecordBin[size=0, 
>>>>> full=false, isComplete=false, id=4023]
>>>>> 
>>>>> 10:13:53 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred 
>>>>> id=1067395 to RecordBin[size=1, full=false, isComplete=false, id=4023]
>>>>> 
>>>>> 10:13:54 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1068472 
>>>>> to RecordBin[size=1, full=false, isComplete=false, id=4023]
>>>>> 
>>>>> 10:13:54 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred 
>>>>> id=1068472 to RecordBin[size=2, full=false, isComplete=false, id=4023]
>>>>> 
>>>>> 10:13:55 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>  for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>> 
>>>>> 10:13:55 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1068597 
>>>>> to RecordBin[size=2, full=false, isComplete=false, id=4023]
>>>>> 
>>>>> 10:13:55 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred 
>>>>> id=1068597 to RecordBin[size=3, full=false, isComplete=false, id=4023]
>>>>> 
>>>>> 10:13:58 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6, 
>>>>> full=false, isComplete=false, id=4023] is now expired. Completing bin.
>>>>> 
>>>>> 10:13:58 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked 
>>>>> RecordBin[size=6, full=false, isComplete=true, id=4023] as complete 
>>>>> because complete() was called
>>>>> 
>>>>> 10:13:58 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record Writer 
>>>>> using session StandardProcessSession[id=83205] for RecordBin[size=6, 
>>>>> full=false, isComplete=true, id=4023]
>>>>> 
>>>>> 10:13:58 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin 
>>>>> RecordBin[size=6, full=false, isComplete=true, id=4023] with 6 records 
>>>>> with Merged FlowFile 
>>>>> FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948] using input 
>>>>> FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663, id=1068800, 
>>>>> id=1068845]
>>>>> 
>>>>> 10:13:58 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred 
>>>>> id=1068845 to RecordBin[size=6, full=false, isComplete=true, id=4023]
>>>>> 
>>>>> 10:14:01 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1069272 
>>>>> to RecordBin[size=2, full=false, isComplete=false, id=4024]
>>>>> 
>>>>> 10:14:01 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred 
>>>>> id=1069272 to RecordBin[size=3, full=false, isComplete=false, id=4024]
>>>>> 
>>>>> 10:14:02 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>  for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
>>>>> 
>>>>> 10:14:02 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1069316 
>>>>> to RecordBin[size=3, full=false, isComplete=false, id=4024]
>>>>> 
>>>>> 10:14:02 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred 
>>>>> id=1069316 to RecordBin[size=4, full=false, isComplete=false, id=4024]
>>>>> 
>>>>> 10:14:05 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6, 
>>>>> full=false, isComplete=false, id=4024] is now expired. Completing bin.
>>>>> 
>>>>> 10:14:05 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked 
>>>>> RecordBin[size=6, full=false, isComplete=true, id=4024] as complete 
>>>>> because complete() was called
>>>>> 
>>>>> 10:14:05 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record Writer 
>>>>> using session StandardProcessSession[id=83206] for RecordBin[size=6, 
>>>>> full=false, isComplete=true, id=4024]
>>>>> 
>>>>> 10:14:05 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin 
>>>>> RecordBin[size=6, full=false, isComplete=true, id=4024] with 6 records 
>>>>> with Merged FlowFile 
>>>>> FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d] using input 
>>>>> FlowFiles [id=1069044, id=1069103, id=1069272, id=1069316, id=1069451, 
>>>>> id=1069492]
>>>>> 
>>>>> 10:14:05 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred 
>>>>> id=1069492 to RecordBin[size=6, full=false, isComplete=true, id=4024]
>>>>> 
>>>>> 10:14:07 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1072118 
>>>>> to RecordBin[size=2, full=false, isComplete=false, id=4025]
>>>>> 
>>>>> 10:14:07 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred 
>>>>> id=1072118 to RecordBin[size=3, full=false, isComplete=false, id=4025]
>>>>> 
>>>>> 10:14:08 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>  for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>> 
>>>>> 10:14:08 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1072197 
>>>>> to RecordBin[size=3, full=false, isComplete=false, id=4025]
>>>>> 
>>>>> 10:14:08 UTC
>>>>> DEBUG
>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>> 
>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred 
>>>>> id=1072197 to RecordBin[size=4, full=false, isComplete=false, id=4025]
>>>>> 
>>>>> 
>>>>> On Sat, Dec 3, 2022 at 4:21 PM Joe Witt <[email protected] 
>>>>> <mailto:[email protected]>> wrote:
>>>>> Hello
>>>>> 
>>>>> Run schedule should be 0.
>>>>> 
>>>>> 50 should be the min number of records
>>>>> 
>>>>> 5 seconds is the max bin age it sounds like you want.
>>>>> 
>>>>> Start with these changes and let us know what you're seeing.
>>>>> 
>>>>> Thanks
>>>>> 
>>>>> On Fri, Dec 2, 2022 at 10:12 PM Richard Beare <[email protected] 
>>>>> <mailto:[email protected]>> wrote:
>>>>> Hi,
>>>>> I'm having a great deal of trouble configuring the mergerecord processor 
>>>>> to deliver reasonable performance and I'm not sure where to look to 
>>>>> correct it. One of my upstream processors requires a single record per 
>>>>> flowfile, but I'd like to create larger flowfiles before passing to the 
>>>>> next stage. The flowfiles are independent at this stage so there's no 
>>>>> special processing required of the merging. I'd like to create flowfiles 
>>>>> of about 50 to 100 records.
>>>>> 
>>>>> I have two tests, both running on the same nifi system. One uses 
>>>>> synthetic data, the other the production data. The performance of the 
>>>>> mergerecord processor for the synthetic data is as I'd expect, and I 
>>>>> can't figure out why the  production data is so much slower. Here's the 
>>>>> configuration:
>>>>> 
>>>>> mergerecord has the following settings. Timer driven, 1 concurrent task, 
>>>>> 5 second run schedule, bin packing merge strategy, min records = 1, max 
>>>>> records = 100, max bin age = 4.5 secs, maximum number of bins = 1.
>>>>> 
>>>>> In the case of synthetic data the typical flowfile size is in the range 2 
>>>>> to 7KB.
>>>>> 
>>>>> The size of flowfiles for the production case is smaller - typically 
>>>>> around 1KB.
>>>>> 
>>>>> The structure in the tests is slightly different. Synthetic is (note that 
>>>>> I've removed the text part):
>>>>> 
>>>>> [ {
>>>>>   "sampleid" : 1075,
>>>>>   "typeid" : 98,
>>>>>   "dct" : "2020-01-25T21:40:25.515Z",
>>>>>   "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
>>>>>   "document" : "Text removed - typically a few hundred words",
>>>>>   "docid" : "9"
>>>>> } ]
>>>>> 
>>>>> Production is:
>>>>> [ {
>>>>>   "doc_id" : "5.60622895E8",
>>>>>   "doc_text" : " Text deleted - typically a few hundred words",
>>>>>   "processing_timestamp" : "2022-11-27T23:56:35.601Z",
>>>>>   "metadata_x_ocr_applied" : true,
>>>>>   "metadata_x_parsed_by" : 
>>>>> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
>>>>>   "metadata_content_type" : "application/rtf",
>>>>>   "metadata_page_count" : null,
>>>>>   "metadata_creation_date" : null,
>>>>>   "metadata_last_modified" : null
>>>>> } ]
>>>>> 
>>>>> 
>>>>> 
>>>>> I load up the queue feeding the mergerecord processor with several 
>>>>> hundred individual flowfiles and activate it.
>>>>> 
>>>>> The synthetic data is nicely placed into chunks of 100, with any 
>>>>> remainder being flushed in a smaller chunk.
>>>>> 
>>>>> The production data is generally bundled into groups of 6 records, 
>>>>> sometimes less. Certainly it never gets close to 100 records.
>>>>> 
>>>>> Any ideas as to what I should look at to track down the difference?
>>>>> 
>>>>> Thanks
>>>> 
>>> 
>> 
> 
