Hi Richard,

Can you try increasing the Maximum Number of Bins? I think there was an issue 
that was recently addressed in which the merge processors had an issue when Max 
Number of Bins = 1.

If you still see the same issue, please provide a copy of the flow that can be 
used to replicate the issue.

Thanks
-Mark


On Dec 3, 2022, at 5:21 AM, Richard Beare <[email protected]> wrote:

Hi,

Pretty much the same - I seem to end up with flowfiles containing about 7 
records, presumably always triggered by the timeout.

I had thought the timeout needed to be less than the run schedule, but it looks 
like it can be the same.

Here's a debug dump

10:13:43 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066297 to 
RecordBin[size=4, full=false, isComplete=false, id=4021]

10:13:43 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066297 to 
RecordBin[size=5, full=false, isComplete=false, id=4021]

10:13:44 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
 for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]

10:13:44 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066372 to 
RecordBin[size=5, full=false, isComplete=false, id=4021]

10:13:44 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066372 to 
RecordBin[size=6, full=false, isComplete=false, id=4021]

10:13:45 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066575 to 
RecordBin[size=7, full=false, isComplete=true, id=4021]

10:13:46 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
 for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]

10:13:46 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066612 to 
RecordBin[size=0, full=false, isComplete=false, id=4022]

10:13:46 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created OutputStream using 
session StandardProcessSession[id=83204] for RecordBin[size=0, full=false, 
isComplete=false, id=4022]

10:13:46 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066612 to 
RecordBin[size=1, full=false, isComplete=false, id=4022]

10:13:48 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066896 to 
RecordBin[size=2, full=false, isComplete=false, id=4022]

10:13:48 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066896 to 
RecordBin[size=3, full=false, isComplete=false, id=4022]

10:13:49 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
 for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]

10:13:49 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1067051 to 
RecordBin[size=3, full=false, isComplete=false, id=4022]

10:13:49 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1067051 to 
RecordBin[size=4, full=false, isComplete=false, id=4022]

10:13:52 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1067254 to 
RecordBin[size=7, full=false, isComplete=true, id=4022]

10:13:53 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
 for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]

10:13:53 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1067395 to 
RecordBin[size=0, full=false, isComplete=false, id=4023]

10:13:53 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created OutputStream using 
session StandardProcessSession[id=83205] for RecordBin[size=0, full=false, 
isComplete=false, id=4023]

10:13:53 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1067395 to 
RecordBin[size=1, full=false, isComplete=false, id=4023]

10:13:54 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1068472 to 
RecordBin[size=1, full=false, isComplete=false, id=4023]

10:13:54 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1068472 to 
RecordBin[size=2, full=false, isComplete=false, id=4023]

10:13:55 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
 for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]

10:13:55 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1068597 to 
RecordBin[size=2, full=false, isComplete=false, id=4023]

10:13:55 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1068597 to 
RecordBin[size=3, full=false, isComplete=false, id=4023]

10:13:58 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6, 
full=false, isComplete=false, id=4023] is now expired. Completing bin.

10:13:58 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked RecordBin[size=6, 
full=false, isComplete=true, id=4023] as complete because complete() was called

10:13:58 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record Writer using 
session StandardProcessSession[id=83205] for RecordBin[size=6, full=false, 
isComplete=true, id=4023]

10:13:58 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin 
RecordBin[size=6, full=false, isComplete=true, id=4023] with 6 records with 
Merged FlowFile FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948] using 
input FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663, id=1068800, 
id=1068845]

10:13:58 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1068845 to 
RecordBin[size=6, full=false, isComplete=true, id=4023]

10:14:01 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1069272 to 
RecordBin[size=2, full=false, isComplete=false, id=4024]

10:14:01 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1069272 to 
RecordBin[size=3, full=false, isComplete=false, id=4024]

10:14:02 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
 for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]

10:14:02 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1069316 to 
RecordBin[size=3, full=false, isComplete=false, id=4024]

10:14:02 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1069316 to 
RecordBin[size=4, full=false, isComplete=false, id=4024]

10:14:05 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6, 
full=false, isComplete=false, id=4024] is now expired. Completing bin.

10:14:05 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked RecordBin[size=6, 
full=false, isComplete=true, id=4024] as complete because complete() was called

10:14:05 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record Writer using 
session StandardProcessSession[id=83206] for RecordBin[size=6, full=false, 
isComplete=true, id=4024]

10:14:05 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin 
RecordBin[size=6, full=false, isComplete=true, id=4024] with 6 records with 
Merged FlowFile FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d] using 
input FlowFiles [id=1069044, id=1069103, id=1069272, id=1069316, id=1069451, 
id=1069492]

10:14:05 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1069492 to 
RecordBin[size=6, full=false, isComplete=true, id=4024]

10:14:07 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1072118 to 
RecordBin[size=2, full=false, isComplete=false, id=4025]

10:14:07 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1072118 to 
RecordBin[size=3, full=false, isComplete=false, id=4025]

10:14:08 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
 for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]

10:14:08 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1072197 to 
RecordBin[size=3, full=false, isComplete=false, id=4025]

10:14:08 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1072197 to 
RecordBin[size=4, full=false, isComplete=false, id=4025]


On Sat, Dec 3, 2022 at 4:21 PM Joe Witt 
<[email protected]<mailto:[email protected]>> wrote:
Hello

Run schedule should be 0.

50 should be the min number of records

5 seconds is the max bin age it sounds like you want.

Start with these changes and let us know what youre seeing.

Thanks

On Fri, Dec 2, 2022 at 10:12 PM Richard Beare 
<[email protected]<mailto:[email protected]>> wrote:
Hi,
I'm having a great deal of trouble configuring the mergerecord processor to 
deliver reasonable performance and I'm not sure where to look to correct it. 
One of my upstream processors requires a single record per flowfile, but I'd 
like to create larger flowfiles before passing to the next stage. The flowfiles 
are independent at this stage so there's no special processing required of the 
merging. I'd like to create flowfiles of about 50 to 100 records.

I have two tests, both running on the same nifi system. One uses synthetic 
data, the other the production data. The performance of the mergerecord 
processor for the synthetic data is as I'd expect, and I can't figure out why 
the  production data is so much slower. Here's the
configuration:

mergerecord has the following settings. Timer driven, 1 concurrent task, 5 
second run schedule, bin packing merge strategy, min records = 1, max records = 
100, max bin age = 4.5 secs, maximum number of bins = 1.

In the case of synthetic data the typical flowfile size is in the range 2 to 
7KB.

The size of flowfiles for the production case is smaller - typically around 1KB.

The structure in the tests is slightly different. Synthetic is (note that I've 
removed the text part):

[ {
  "sampleid" : 1075,
  "typeid" : 98,
  "dct" : "2020-01-25T21:40:25.515Z",
  "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
  "document" : "Text removed - typically a few hundred words",
  "docid" : "9"
} ]

Production is:
[ {
  "doc_id" : "5.60622895E8",
  "doc_text" : " Text deleted - typically a few hundred words",
  "processing_timestamp" : "2022-11-27T23:56:35.601Z",
  "metadata_x_ocr_applied" : true,
  "metadata_x_parsed_by" : 
"org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
  "metadata_content_type" : "application/rtf",
  "metadata_page_count" : null,
  "metadata_creation_date" : null,
  "metadata_last_modified" : null
} ]



I load up the queue feeding the mergerecord processor with several hundred 
individual flowfiles and activate it.

The synthetic data is nicely placed into chunks of 100, with any remainder 
being flushed in a smaller chunk.

The production data is generally bundled into groups of 6 records, sometimes 
less. Certainly it never gets close to 100 records.

Any ideas as to what I should look at to track down the difference?

Thanks

Reply via email to