Further to this, I performed another test in which I replaced the custom
JSON-to-Avro script with a ConvertRecord processor - MergeRecord appears to
work as expected in that case.
The output of ConvertRecord looks like this:
[ {
"text" : " No Alert Found \n\n",
"metadata" : {
"X_TIKA_Parsed_By" : null,
"X_OCR_Applied" : null,
"Content_Type" : null
},
"success" : true,
"timestamp" : "2022-12-05T10:49:18.568Z",
"processingElapsedTime" : 0,
"doc_id" : "5.60178607E8"
} ]
while the output of the script looks like:
[ {
"doc_id" : "5.61996505E8",
"doc_text" : " No Alert Found \n\n",
"processing_timestamp" : "2022-11-28T01:16:46.775Z",
"metadata_x_ocr_applied" : true,
"metadata_x_parsed_by" :
"org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
"metadata_content_type" : "application/rtf",
"metadata_page_count" : null,
"metadata_creation_date" : null,
"metadata_last_modified" : null
} ]
Is there something about this structure that is likely to be causing the
problem? Could there be other issues with the Avro generated by the script?
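For what it's worth, one way to dig into the Avro itself would be to dump the
writer schema embedded in each output file and diff the two. A rough sketch,
assuming the Python fastavro library (file names here are just placeholders):

import json
from fastavro import reader

def embedded_schema(path):
    # the writer schema is stored in the Avro container file header
    with open(path, "rb") as fo:
        return reader(fo).writer_schema

convertrecord_schema = embedded_schema("convertrecord_output.avro")
script_schema = embedded_schema("script_output.avro")

print(json.dumps(convertrecord_schema, indent=2))
print(json.dumps(script_schema, indent=2))
print("schemas identical:", convertrecord_schema == script_schema)

From the debug dump further down the thread, MergeRecord appears to use the
full record schema as the bin's group ID, so if the schema emitted by the
script ever varied between flowfiles, those records would land in separate
bins.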
On Mon, Dec 5, 2022 at 9:31 PM Richard Beare <[email protected]>
wrote:
> I've reset the backpressure to the default
>
> This remains something of a mystery. The merge with synthetic data happily
> creates flowfiles with 100 records, and the join says "Records merged due
> to: Bin is full" or "Records merged due to: Bin is full enough". No
> timeouts in that case, even with the max bin age at 4.5 seconds. The
> resulting flowfiles were about 300K.
>
> The real data is doing much the same as before, producing flowfiles of
> about 30K, with 7 records or so. If I increase the maximum bin age to 30
> seconds the size and record count are higher - 12 to 15. Nothing like the
> behaviour with synthetic data, where the 100-record flowfiles are created
> almost instantly. Joins are always due to bin age.
>
> Can the problem relate to the structure of the Avro files? Is there any way
> to dive into that? Everything else about the MergeRecord settings appears
> the same, so I can't see an explanation as to why the behaviour is different
> on the same hardware.
>
> On Mon, Dec 5, 2022 at 2:09 AM Mark Payne <[email protected]> wrote:
>
>> Hey Richard,
>>
>> So a few things that I’ve done/looked at.
>>
>> I generated some Avro data (random JSON that I downloaded from a Random
>> JSON Generator and then converted to Avro).
>>
>> I then ran this avro data into both the MergeRecord processors.
>>
>> Firstly, I noticed that both are very slow. I found that was because the Run
>> Schedule was set to 5 seconds. This should *ALWAYS* be 0 secs for
>> MergeRecord - and really for basically all processors except for the first
>> one in the flow.
>>
>> I also notice that you have backpressure set on your connections to
>> 40,000 FlowFiles and 4 GB. This can significantly slow things down. If you
>> have performance concerns you definitely want backpressure set back to the
>> default of 10,000 FlowFiles. Otherwise, as the queues fill up they start
>> “swapping out” FlowFiles to disk, and this can significantly slow things
>> down.
>>
>> I noticed that MergeRecord is set to 1 concurrent task. Probably worth
>> considering increasing that, if performance is a concern.
>>
>> That said, I am seeing nice, full bins of 100 records merged from each of
>> the MergeRecord processors.
>> So it is certainly possible that if you’re seeing smaller bins, it’s
>> because you’re timing out. The 4.5 second timeout is quite short. Have you
>> tried increasing that to say 30 seconds to see if it gives you larger bins?
>> I also recommend that you take a look at the data provenance to see why
>> it’s creating the bins.
>>
>> If unclear how to do that:
>> Right-click on the MergeRecord processor
>> Go to View data provenance
>> Scroll down the list until you see a “JOIN” event type. You can ignore
>> the ATTRIBUTES_MODIFIED and DROP events for now.
>> Click the ‘i’ icon on the left-hand side.
>> This will show you details about the merge. In the Details tab, if you
>> scroll down, it will show you a Details field, which tells you why the data
>> was merged. It should either say: “Records Merged due to: Bin has reached
>> Max Bin Age” or “Records Merged due to: Bin is full”.
>>
>> If it is due to Max Bin Age being reached, then I’d recommend increasing the
>> number of concurrent tasks, reducing backpressure to no more than 10,000
>> FlowFiles in the queue, and/or increasing the Max Bin Age.
>> Also worth asking - what kind of machine is this running on? A 64 core
>> VM with 1 TB volume will, of course, run MUCH differently than a 4 core VM
>> with a 10 GB volume, especially in the cloud.
>>
>> If still having trouble, let me know what the provenance tells you about
>> the reason for merging the data, and we can go from there.
>>
>> Thanks!
>> -Mark
>>
>>
>> On Dec 3, 2022, at 4:38 PM, Mark Payne <[email protected]> wrote:
>>
>> Richard,
>>
>> I think just the flow structure should be sufficient.
>>
>> Thanks
>> -Mark
>>
>>
>> On Dec 3, 2022, at 4:32 PM, Richard Beare <[email protected]>
>> wrote:
>>
>> Thanks for responding,
>> I re-tested with max bins = 2, but the behaviour remained the same. I can
>> easily share a version of the functioning workflow (and data), which is
>> part of a public project. The problem workflow (which shares many of the
>> same components) is part of a health research project, so it's more difficult. I
>> definitely can't share any data from that one. Do you need to see the data
>> or is the overall structure sufficient at this point? Happy to demonstrate
>> via video conference too.
>>
>> Thanks
>>
>> On Sun, Dec 4, 2022 at 1:37 AM Mark Payne <[email protected]> wrote:
>>
>>> Hi Richard,
>>>
>>> Can you try increasing the Maximum Number of Bins? I think there was an
>>> issue, recently addressed, in which the merge processors misbehaved when
>>> Max Number of Bins = 1.
>>>
>>> If you still see the same issue, please provide a copy of the flow that
>>> can be used to replicate the issue.
>>>
>>> Thanks
>>> -Mark
>>>
>>>
>>> On Dec 3, 2022, at 5:21 AM, Richard Beare <[email protected]>
>>> wrote:
>>>
>>> Hi,
>>>
>>> Pretty much the same - I seem to end up with flowfiles containing about
>>> 7 records, presumably always triggered by the timeout.
>>>
>>> I had thought the timeout needed to be less than the run schedule, but
>>> it looks like it can be the same.
>>>
>>> Here's a debug dump
>>>
>>> 10:13:43 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>> id=1066297 to RecordBin[size=4, full=false, isComplete=false, id=4021]
>>>
>>> 10:13:43 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>> id=1066297 to RecordBin[size=5, full=false, isComplete=false, id=4021]
>>>
>>> 10:13:44 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
>>>
>>> 10:13:44 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>> id=1066372 to RecordBin[size=5, full=false, isComplete=false, id=4021]
>>>
>>> 10:13:44 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>> id=1066372 to RecordBin[size=6, full=false, isComplete=false, id=4021]
>>>
>>> 10:13:45 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>> id=1066575 to RecordBin[size=7, full=false, isComplete=true, id=4021]
>>>
>>> 10:13:46 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
>>>
>>> 10:13:46 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>> id=1066612 to RecordBin[size=0, full=false, isComplete=false, id=4022]
>>>
>>> 10:13:46 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created
>>> OutputStream using session StandardProcessSession[id=83204] for
>>> RecordBin[size=0, full=false, isComplete=false, id=4022]
>>>
>>> 10:13:46 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>> id=1066612 to RecordBin[size=1, full=false, isComplete=false, id=4022]
>>>
>>> 10:13:48 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>> id=1066896 to RecordBin[size=2, full=false, isComplete=false, id=4022]
>>>
>>> 10:13:48 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>> id=1066896 to RecordBin[size=3, full=false, isComplete=false, id=4022]
>>>
>>> 10:13:49 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>
>>> 10:13:49 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>> id=1067051 to RecordBin[size=3, full=false, isComplete=false, id=4022]
>>>
>>> 10:13:49 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>> id=1067051 to RecordBin[size=4, full=false, isComplete=false, id=4022]
>>>
>>> 10:13:52 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>> id=1067254 to RecordBin[size=7, full=false, isComplete=true, id=4022]
>>>
>>> 10:13:53 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
>>>
>>> 10:13:53 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>> id=1067395 to RecordBin[size=0, full=false, isComplete=false, id=4023]
>>>
>>> 10:13:53 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created
>>> OutputStream using session StandardProcessSession[id=83205] for
>>> RecordBin[size=0, full=false, isComplete=false, id=4023]
>>>
>>> 10:13:53 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>> id=1067395 to RecordBin[size=1, full=false, isComplete=false, id=4023]
>>>
>>> 10:13:54 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>> id=1068472 to RecordBin[size=1, full=false, isComplete=false, id=4023]
>>>
>>> 10:13:54 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>> id=1068472 to RecordBin[size=2, full=false, isComplete=false, id=4023]
>>>
>>> 10:13:55 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>
>>> 10:13:55 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>> id=1068597 to RecordBin[size=2, full=false, isComplete=false, id=4023]
>>>
>>> 10:13:55 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>> id=1068597 to RecordBin[size=3, full=false, isComplete=false, id=4023]
>>>
>>> 10:13:58 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6,
>>> full=false, isComplete=false, id=4023] is now expired. Completing bin.
>>>
>>> 10:13:58 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked
>>> RecordBin[size=6, full=false, isComplete=true, id=4023] as complete because
>>> complete() was called
>>>
>>> 10:13:58 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record
>>> Writer using session StandardProcessSession[id=83205] for RecordBin[size=6,
>>> full=false, isComplete=true, id=4023]
>>>
>>> 10:13:58 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin
>>> RecordBin[size=6, full=false, isComplete=true, id=4023] with 6 records with
>>> Merged FlowFile FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948]
>>> using input FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663,
>>> id=1068800, id=1068845]
>>>
>>> 10:13:58 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>> id=1068845 to RecordBin[size=6, full=false, isComplete=true, id=4023]
>>>
>>> 10:14:01 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>> id=1069272 to RecordBin[size=2, full=false, isComplete=false, id=4024]
>>>
>>> 10:14:01 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>> id=1069272 to RecordBin[size=3, full=false, isComplete=false, id=4024]
>>>
>>> 10:14:02 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
>>>
>>> 10:14:02 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>> id=1069316 to RecordBin[size=3, full=false, isComplete=false, id=4024]
>>>
>>> 10:14:02 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>> id=1069316 to RecordBin[size=4, full=false, isComplete=false, id=4024]
>>>
>>> 10:14:05 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6,
>>> full=false, isComplete=false, id=4024] is now expired. Completing bin.
>>>
>>> 10:14:05 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked
>>> RecordBin[size=6, full=false, isComplete=true, id=4024] as complete because
>>> complete() was called
>>>
>>> 10:14:05 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record
>>> Writer using session StandardProcessSession[id=83206] for RecordBin[size=6,
>>> full=false, isComplete=true, id=4024]
>>>
>>> 10:14:05 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin
>>> RecordBin[size=6, full=false, isComplete=true, id=4024] with 6 records with
>>> Merged FlowFile FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d]
>>> using input FlowFiles [id=1069044, id=1069103, id=1069272, id=1069316,
>>> id=1069451, id=1069492]
>>>
>>> 10:14:05 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>> id=1069492 to RecordBin[size=6, full=false, isComplete=true, id=4024]
>>>
>>> 10:14:07 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>> id=1072118 to RecordBin[size=2, full=false, isComplete=false, id=4025]
>>>
>>> 10:14:07 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>> id=1072118 to RecordBin[size=3, full=false, isComplete=false, id=4025]
>>>
>>> 10:14:08 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>
>>> 10:14:08 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>> id=1072197 to RecordBin[size=3, full=false, isComplete=false, id=4025]
>>>
>>> 10:14:08 UTC
>>> DEBUG
>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>
>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>> id=1072197 to RecordBin[size=4, full=false, isComplete=false, id=4025]
>>>
>>>
>>> On Sat, Dec 3, 2022 at 4:21 PM Joe Witt <[email protected]> wrote:
>>>
>>>> Hello
>>>>
>>>> Run schedule should be 0.
>>>>
>>>> 50 should be the min number of records
>>>>
>>>> 5 seconds is the max bin age it sounds like you want.
>>>>
>>>> Start with these changes and let us know what you're seeing.
>>>>
>>>> Thanks
>>>>
>>>> On Fri, Dec 2, 2022 at 10:12 PM Richard Beare <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I'm having a great deal of trouble configuring the MergeRecord
>>>>> processor to deliver reasonable performance and I'm not sure where to look
>>>>> to correct it. One of my upstream processors requires a single record per
>>>>> flowfile, but I'd like to create larger flowfiles before passing to the
>>>>> next stage. The flowfiles are independent at this stage so there's no
>>>>> special processing required of the merging. I'd like to create flowfiles
>>>>> of about 50 to 100 records.
>>>>>
>>>>> I have two tests, both running on the same NiFi system. One uses
>>>>> synthetic data, the other the production data. The performance of the
>>>>> MergeRecord processor for the synthetic data is as I'd expect, and I can't
>>>>> figure out why the production data is so much slower. Here's the
>>>>> configuration:
>>>>>
>>>>> MergeRecord has the following settings: timer driven, 1 concurrent
>>>>> task, 5 second run schedule, bin-packing merge strategy, min records = 1,
>>>>> max records = 100, max bin age = 4.5 secs, maximum number of bins = 1.
>>>>>
>>>>> In the case of synthetic data the typical flowfile size is in the
>>>>> range 2 to 7KB.
>>>>>
>>>>> The size of flowfiles for the production case is smaller - typically
>>>>> around 1KB.
>>>>>
>>>>> The structure in the tests is slightly different. Synthetic is (note
>>>>> that I've removed the text part):
>>>>>
>>>>> [ {
>>>>> "sampleid" : 1075,
>>>>> "typeid" : 98,
>>>>> "dct" : "2020-01-25T21:40:25.515Z",
>>>>> "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
>>>>> "document" : "Text removed - typically a few hundred words",
>>>>> "docid" : "9"
>>>>> } ]
>>>>>
>>>>> Production is:
>>>>> [ {
>>>>> "doc_id" : "5.60622895E8",
>>>>> "doc_text" : " Text deleted - typically a few hundred words",
>>>>> "processing_timestamp" : "2022-11-27T23:56:35.601Z",
>>>>> "metadata_x_ocr_applied" : true,
>>>>> "metadata_x_parsed_by" :
>>>>> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
>>>>> "metadata_content_type" : "application/rtf",
>>>>> "metadata_page_count" : null,
>>>>> "metadata_creation_date" : null,
>>>>> "metadata_last_modified" : null
>>>>> } ]
>>>>>
>>>>>
>>>>>
>>>>> I load up the queue feeding the MergeRecord processor with several
>>>>> hundred individual flowfiles and activate it.
>>>>>
>>>>> The synthetic data is nicely placed into chunks of 100, with any
>>>>> remainder being flushed in a smaller chunk.
>>>>>
>>>>> The production data is generally bundled into groups of 6 records,
>>>>> sometimes fewer. It certainly never gets close to 100 records.
>>>>>
>>>>> Any ideas as to what I should look at to track down the difference?
>>>>>
>>>>> Thanks
>>>>>
>>>>
>>>
>>
>>