Hi,
Pretty much the same - I seem to end up with flowfiles containing about 7
records, presumably always triggered by the timeout.
I had thought the timeout needed to be less than the run schedule, but it
looks like it can be the same.
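
As a rough sanity check on the timeout explanation, here is a toy model of a single bin. It is only a sketch and not how MergeRecord is actually implemented: it assumes a bin closes either when it reaches the maximum record count or when its age reaches the max bin age, whichever comes first. The function name, its parameters, and the 800 ms arrival interval are illustrative assumptions, not measured values.

```python
def records_per_bin(arrival_interval_ms, max_bin_age_ms, max_records):
    """Toy model: count the records that land in one bin before it closes.

    Assumption (not taken from NiFi source): the first record opens the
    bin at t = 0, and the bin closes when it is full or when its age
    reaches max_bin_age_ms.
    """
    count = 0
    t_ms = 0
    while count < max_records and t_ms < max_bin_age_ms:
        count += 1                    # one record is added to the bin
        t_ms += arrival_interval_ms   # wait for the next record to arrive
    return count


# Hypothetical slow feed: one record every 800 ms, 5 s max bin age.
print(records_per_bin(800, 5000, 100))   # 7
# Hypothetical fast feed: one record every 10 ms fills the bin first.
print(records_per_bin(10, 5000, 100))    # 100
```

Under these assumptions the bin size is limited by how quickly records reach the bin rather than by the record limits, which would be consistent with bins of 6 or 7 records.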
Here's a debug dump:
10:13:43 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066297
to RecordBin[size=4, full=false, isComplete=false, id=4021]
10:13:43 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066297
to RecordBin[size=5, full=false, isComplete=false, id=4021]
10:13:44 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
10:13:44 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066372
to RecordBin[size=5, full=false, isComplete=false, id=4021]
10:13:44 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066372
to RecordBin[size=6, full=false, isComplete=false, id=4021]
10:13:45 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066575
to RecordBin[size=7, full=false, isComplete=true, id=4021]
10:13:46 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
10:13:46 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066612
to RecordBin[size=0, full=false, isComplete=false, id=4022]
10:13:46 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created OutputStream
using session StandardProcessSession[id=83204] for RecordBin[size=0,
full=false, isComplete=false, id=4022]
10:13:46 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066612
to RecordBin[size=1, full=false, isComplete=false, id=4022]
10:13:48 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066896
to RecordBin[size=2, full=false, isComplete=false, id=4022]
10:13:48 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066896
to RecordBin[size=3, full=false, isComplete=false, id=4022]
10:13:49 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
10:13:49 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1067051
to RecordBin[size=3, full=false, isComplete=false, id=4022]
10:13:49 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1067051
to RecordBin[size=4, full=false, isComplete=false, id=4022]
10:13:52 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1067254
to RecordBin[size=7, full=false, isComplete=true, id=4022]
10:13:53 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
10:13:53 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1067395
to RecordBin[size=0, full=false, isComplete=false, id=4023]
10:13:53 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created OutputStream
using session StandardProcessSession[id=83205] for RecordBin[size=0,
full=false, isComplete=false, id=4023]
10:13:53 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1067395
to RecordBin[size=1, full=false, isComplete=false, id=4023]
10:13:54 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1068472
to RecordBin[size=1, full=false, isComplete=false, id=4023]
10:13:54 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1068472
to RecordBin[size=2, full=false, isComplete=false, id=4023]
10:13:55 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
10:13:55 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1068597
to RecordBin[size=2, full=false, isComplete=false, id=4023]
10:13:55 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1068597
to RecordBin[size=3, full=false, isComplete=false, id=4023]
10:13:58 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6,
full=false, isComplete=false, id=4023] is now expired. Completing bin.
10:13:58 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked
RecordBin[size=6, full=false, isComplete=true, id=4023] as complete because
complete() was called
10:13:58 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record Writer
using session StandardProcessSession[id=83205] for RecordBin[size=6,
full=false, isComplete=true, id=4023]
10:13:58 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin
RecordBin[size=6, full=false, isComplete=true, id=4023] with 6 records with
Merged FlowFile FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948]
using input FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663,
id=1068800, id=1068845]
10:13:58 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1068845
to RecordBin[size=6, full=false, isComplete=true, id=4023]
10:14:01 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1069272
to RecordBin[size=2, full=false, isComplete=false, id=4024]
10:14:01 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1069272
to RecordBin[size=3, full=false, isComplete=false, id=4024]
10:14:02 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
10:14:02 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1069316
to RecordBin[size=3, full=false, isComplete=false, id=4024]
10:14:02 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1069316
to RecordBin[size=4, full=false, isComplete=false, id=4024]
10:14:05 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6,
full=false, isComplete=false, id=4024] is now expired. Completing bin.
10:14:05 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked
RecordBin[size=6, full=false, isComplete=true, id=4024] as complete because
complete() was called
10:14:05 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record Writer
using session StandardProcessSession[id=83206] for RecordBin[size=6,
full=false, isComplete=true, id=4024]
10:14:05 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin
RecordBin[size=6, full=false, isComplete=true, id=4024] with 6 records with
Merged FlowFile FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d]
using input FlowFiles [id=1069044, id=1069103, id=1069272, id=1069316,
id=1069451, id=1069492]
10:14:05 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1069492
to RecordBin[size=6, full=false, isComplete=true, id=4024]
10:14:07 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1072118
to RecordBin[size=2, full=false, isComplete=false, id=4025]
10:14:07 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1072118
to RecordBin[size=3, full=false, isComplete=false, id=4025]
10:14:08 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
10:14:08 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1072197
to RecordBin[size=3, full=false, isComplete=false, id=4025]
10:14:08 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1072197
to RecordBin[size=4, full=false, isComplete=false, id=4025]
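
A dump like this is easier to read once it is reduced to one line per bin. The following is a minimal sketch that does so, assuming the log text has been saved as a string and that the `RecordBin[size=..., full=..., isComplete=..., id=...]` layout is exactly as shown above. The helper name `summarize_bins` is hypothetical and not part of NiFi.

```python
import re

# Matches the RecordBin[...] fragments as they appear in the dump above.
BIN_RE = re.compile(
    r"RecordBin\[size=(\d+), full=\w+, isComplete=\w+, id=(\d+)\]"
)


def summarize_bins(log_text):
    """Return {bin_id: {"max_size": int, "expired": bool}} for a log dump."""
    # The mail client wrapped long log lines, so collapse all whitespace
    # into single spaces before matching.
    flat = " ".join(log_text.split())
    summary = {}
    for match in BIN_RE.finditer(flat):
        size, bin_id = int(match.group(1)), int(match.group(2))
        entry = summary.setdefault(bin_id, {"max_size": 0, "expired": False})
        entry["max_size"] = max(entry["max_size"], size)
        # Bins closed by the max bin age log "... is now expired."
        if flat.startswith(" is now expired", match.end()):
            entry["expired"] = True
    return summary
```

Calling `summarize_bins(dump)` on the text above gives the largest size logged for each bin and whether an expiry message was logged for it. Bins with no expiry line in the pasted excerpt are reported with `expired` set to `False`, which only means the message is absent from the excerpt.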
On Sat, Dec 3, 2022 at 4:21 PM Joe Witt wrote:
> Hello
>
> Run schedule should be 0.
>
> 50 should be the min number of records
>
> It sounds like 5 seconds is the max bin age you want.
>
> Start with these changes and let us know what you're seeing.
>
> Thanks
>
> On Fri, Dec 2, 2022 at 10:12 PM Richard Beare wrote:
>
>> Hi,
>> I'm having a great deal of trouble configuring the mergerecord processor
>> to deliver reasonable performance and I'm not sure where to look to correct
>> it. One of my upstream processors requires a single record per flowfile,
>> but I'd like to create larger flowfiles before passing to the next stage.
>> The flowfiles are independent at this stage so there's no special
>> processing required of the merging. I'd like to create flowfiles of about
>> 50 to 100 records.
>>
>> I have two tests, both running on the same nifi system. One uses
>> synthetic data, the other the production data. The performance of the
>> mergerecord processor for the synthetic data is as I'd expect, and I can't
>> figure out why the production data is so much slower. Here's the
>> configuration:
>>
>> MergeRecord has the following settings: timer driven, 1 concurrent task,
>> 5 second run schedule, bin packing merge strategy, min records = 1, max
>> records = 100, max bin age = 4.5 secs, maximum number of bins = 1.
>>
>> In the case of synthetic data the typical flowfile size is in the range 2
>> to 7KB.
>>
>> The size of flowfiles for the production case is smaller - typically
>> around 1KB.
>>
>> The structure in the tests is slightly different. Synthetic is (note that
>> I've removed the text part):
>>
>> [ {
>> "sampleid" : 1075,
>> "typeid" : 98,
>> "dct" : "2020-01-25T21:40:25.515Z",
>> "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
>> "document" : "Text removed - typically a few hundred words",
>> "docid" : "9"
>> } ]
>>
>> Production is:
>> [ {
>> "doc_id" : "5.60622895E8",
>> "doc_text" : " Text deleted - typically a few hundred words",
>> "processing_timestamp" : "2022-11-27T23:56:35.601Z",
>> "metadata_x_ocr_applied" : true,
>> "metadata_x_parsed_by" :
>> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
>> "metadata_content_type" : "application/rtf",
>> "metadata_page_count" : null,
>> "metadata_creation_date" : null,
>> "metadata_last_modified" : null
>> } ]
>>
>>
>>
>> I load up the queue feeding the mergerecord processor with several
>> hundred individual flowfiles and activate it.
>>
>> The synthetic data is nicely placed into chunks of 100, with any
>> remainder being flushed in a smaller chunk.
>>
>> The production data is generally bundled into groups of 6 records,
>> sometimes fewer. It certainly never gets close to 100 records.
>>
>> Any ideas as to what I should look at to track down the difference?
>>
>> Thanks
>>
>