Thanks for responding. I re-tested with Maximum Number of Bins = 2, but the behaviour remained the same. I can easily share a version of the functioning workflow (and its data), which is part of a public project. The problem workflow (which shares many of the same components) is part of a health research project, so it is harder to share, and I definitely can't share any data from that one. Do you need to see the data, or is the overall structure sufficient at this point? I'm happy to demonstrate via video conference too.
Thanks On Sun, Dec 4, 2022 at 1:37 AM Mark Payne <[email protected]> wrote: > Hi Richard, > > Can you try increasing the Maximum Number of Bins? I think there was an > issue that was recently addressed in which the merge processors had an > issue when Max Number of Bins = 1. > > If you still see the same issue, please provide a copy of the flow that > can be used to replicate the issue. > > Thanks > -Mark > > > On Dec 3, 2022, at 5:21 AM, Richard Beare <[email protected]> wrote: > > Hi, > > Pretty much the same - I seem to end up with flowfiles containing about 7 > records, presumably always triggered by the timeout. > > I had thought the timeout needed to be less than the run schedule, but it > looks like it can be the same. > > Here's a debug dump > > 10:13:43 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066297 > to RecordBin[size=4, full=false, isComplete=false, id=4021] > > 10:13:43 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred > id=1066297 to RecordBin[size=5, full=false, isComplete=false, id=4021] > > 10:13:44 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID > {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} > 
for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00] > > 10:13:44 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066372 > to RecordBin[size=5, full=false, isComplete=false, id=4021] > > 10:13:44 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred > id=1066372 to RecordBin[size=6, full=false, isComplete=false, id=4021] > > 10:13:45 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred > id=1066575 to RecordBin[size=7, full=false, isComplete=true, id=4021] > > 10:13:46 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID > {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} > for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00] > > 10:13:46 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066612 > to RecordBin[size=0, full=false, isComplete=false, id=4022] > > 10:13:46 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created OutputStream > using session StandardProcessSession[id=83204] for RecordBin[size=0, > full=false, 
isComplete=false, id=4022] > > 10:13:46 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred > id=1066612 to RecordBin[size=1, full=false, isComplete=false, id=4022] > > 10:13:48 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066896 > to RecordBin[size=2, full=false, isComplete=false, id=4022] > > 10:13:48 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred > id=1066896 to RecordBin[size=3, full=false, isComplete=false, id=4022] > > 10:13:49 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID > {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} > for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e] > > 10:13:49 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1067051 > to RecordBin[size=3, full=false, isComplete=false, id=4022] > > 10:13:49 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred > id=1067051 to RecordBin[size=4, full=false, isComplete=false, id=4022] > > 10:13:52 UTC > DEBUG > 
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred > id=1067254 to RecordBin[size=7, full=false, isComplete=true, id=4022] > > 10:13:53 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID > {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} > for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8] > > 10:13:53 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1067395 > to RecordBin[size=0, full=false, isComplete=false, id=4023] > > 10:13:53 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created OutputStream > using session StandardProcessSession[id=83205] for RecordBin[size=0, > full=false, isComplete=false, id=4023] > > 10:13:53 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred > id=1067395 to RecordBin[size=1, full=false, isComplete=false, id=4023] > > 10:13:54 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1068472 > to RecordBin[size=1, full=false, isComplete=false, id=4023] > > 10:13:54 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 
> > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred > id=1068472 to RecordBin[size=2, full=false, isComplete=false, id=4023] > > 10:13:55 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID > {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} > for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e] > > 10:13:55 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1068597 > to RecordBin[size=2, full=false, isComplete=false, id=4023] > > 10:13:55 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred > id=1068597 to RecordBin[size=3, full=false, isComplete=false, id=4023] > > 10:13:58 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6, > full=false, isComplete=false, id=4023] is now expired. Completing bin. 
> > 10:13:58 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked > RecordBin[size=6, full=false, isComplete=true, id=4023] as complete because > complete() was called > > 10:13:58 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record Writer > using session StandardProcessSession[id=83205] for RecordBin[size=6, > full=false, isComplete=true, id=4023] > > 10:13:58 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin > RecordBin[size=6, full=false, isComplete=true, id=4023] with 6 records with > Merged FlowFile FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948] > using input FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663, > id=1068800, id=1068845] > > 10:13:58 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred > id=1068845 to RecordBin[size=6, full=false, isComplete=true, id=4023] > > 10:14:01 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1069272 > to RecordBin[size=2, full=false, isComplete=false, id=4024] > > 10:14:01 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred > id=1069272 to RecordBin[size=3, full=false, isComplete=false, id=4024] > > 10:14:02 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID > 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} > for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8] > > 10:14:02 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1069316 > to RecordBin[size=3, full=false, isComplete=false, id=4024] > > 10:14:02 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred > id=1069316 to RecordBin[size=4, full=false, isComplete=false, id=4024] > > 10:14:05 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6, > full=false, isComplete=false, id=4024] is now expired. Completing bin. 
> > 10:14:05 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked > RecordBin[size=6, full=false, isComplete=true, id=4024] as complete because > complete() was called > > 10:14:05 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record Writer > using session StandardProcessSession[id=83206] for RecordBin[size=6, > full=false, isComplete=true, id=4024] > > 10:14:05 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin > RecordBin[size=6, full=false, isComplete=true, id=4024] with 6 records with > Merged FlowFile FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d] > using input FlowFiles [id=1069044, id=1069103, id=1069272, id=1069316, > id=1069451, id=1069492] > > 10:14:05 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred > id=1069492 to RecordBin[size=6, full=false, isComplete=true, id=4024] > > 10:14:07 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1072118 > to RecordBin[size=2, full=false, isComplete=false, id=4025] > > 10:14:07 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred > id=1072118 to RecordBin[size=3, full=false, isComplete=false, id=4025] > > 10:14:08 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID > 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} > for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e] > > 10:14:08 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1072197 > to RecordBin[size=3, full=false, isComplete=false, id=4025] > > 10:14:08 UTC > DEBUG > 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 > > MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred > id=1072197 to RecordBin[size=4, full=false, isComplete=false, id=4025] > > > On Sat, Dec 3, 2022 at 4:21 PM Joe Witt <[email protected]> wrote: > >> Hello >> >> Run schedule should be 0. >> >> 50 should be the min number of records >> >> 5 seconds is the max bin age it sounds like you want. >> >> Start with these changes and let us know what youre seeing. >> >> Thanks >> >> On Fri, Dec 2, 2022 at 10:12 PM Richard Beare <[email protected]> >> wrote: >> >>> Hi, >>> I'm having a great deal of trouble configuring the mergerecord processor >>> to deliver reasonable performance and I'm not sure where to look to correct >>> it. One of my upstream processors requires a single record per flowfile, >>> but I'd like to create larger flowfiles before passing to the next stage. >>> The flowfiles are independent at this stage so there's no special >>> processing required of the merging. 
>>> I'd like to create flowfiles of about 50 to 100 records.
>>>
>>> I have two tests, both running on the same nifi system. One uses
>>> synthetic data, the other the production data. The performance of the
>>> mergerecord processor for the synthetic data is as I'd expect, and I can't
>>> figure out why the production data is so much slower. Here's the
>>> configuration:
>>>
>>> mergerecord has the following settings. Timer driven, 1 concurrent task,
>>> 5 second run schedule, bin packing merge strategy, min records = 1, max
>>> records = 100, max bin age = 4.5 secs, maximum number of bins = 1.
>>>
>>> In the case of synthetic data the typical flowfile size is in the range
>>> 2 to 7KB.
>>>
>>> The size of flowfiles for the production case is smaller - typically
>>> around 1KB.
>>>
>>> The structure in the tests is slightly different. Synthetic is (note
>>> that I've removed the text part):
>>>
>>> [ {
>>>   "sampleid" : 1075,
>>>   "typeid" : 98,
>>>   "dct" : "2020-01-25T21:40:25.515Z",
>>>   "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
>>>   "document" : "Text removed - typically a few hundred words",
>>>   "docid" : "9"
>>> } ]
>>>
>>> Production is:
>>> [ {
>>>   "doc_id" : "5.60622895E8",
>>>   "doc_text" : " Text deleted - typically a few hundred words",
>>>   "processing_timestamp" : "2022-11-27T23:56:35.601Z",
>>>   "metadata_x_ocr_applied" : true,
>>>   "metadata_x_parsed_by" : "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
>>>   "metadata_content_type" : "application/rtf",
>>>   "metadata_page_count" : null,
>>>   "metadata_creation_date" : null,
>>>   "metadata_last_modified" : null
>>> } ]
>>>
>>> I load up the queue feeding the mergerecord processor with several
>>> hundred individual flowfiles and activate it.
>>>
>>> The synthetic data is nicely placed into chunks of 100, with any
>>> remainder being flushed in a smaller chunk.
>>>
>>> The production data is generally bundled into groups of 6 records,
>>> sometimes less. Certainly it never gets close to 100 records.
>>>
>>> Any ideas as to what I should look at to track down the difference?
>>>
>>> Thanks
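For anyone reading a debug dump like the one quoted above, a small script can tally how large each bin got and whether it was closed by expiry. This is only a sketch in Python using the standard library. The function names (unquote, summarize_bins) are made up for illustration and are not part of NiFi, and it assumes the debug lines keep the RecordBin[size=..., full=..., isComplete=..., id=...] layout seen in the dump, with email quote markers at the start of each line.

```python
import re

# Layout copied from the debug dump; the trailing group is present only
# on the "is now expired. Completing bin." messages.
BIN_RE = re.compile(
    r"RecordBin\[size=(\d+), full=(?:true|false), "
    r"isComplete=(?:true|false), id=(\d+)\]( is now expired)?"
)


def unquote(text):
    """Drop leading email quote markers ('>', '>>', ...) and rejoin wrapped lines."""
    lines = (re.sub(r"^[>\s]+", "", line) for line in text.splitlines())
    return re.sub(r"\s+", " ", " ".join(lines))


def summarize_bins(log_text):
    """Return {bin_id: {"max_size": int, "expired": bool}} in first-seen order."""
    bins = {}
    for size, bin_id, expired in BIN_RE.findall(unquote(log_text)):
        entry = bins.setdefault(int(bin_id), {"max_size": 0, "expired": False})
        entry["max_size"] = max(entry["max_size"], int(size))
        entry["expired"] = entry["expired"] or bool(expired)
    return bins


# Two entries taken from the dump, quoted and wrapped as they arrive by email.
sample = """\
> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
> id=1068597 to RecordBin[size=3, full=false, isComplete=false, id=4023]
> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6,
> full=false, isComplete=false, id=4023] is now expired. Completing bin.
"""

for bin_id, info in summarize_bins(sample).items():
    print(bin_id, info["max_size"], "expired" if info["expired"] else "not expired")
```

Run over the full dump, a tally like this would show bins 4023 and 4024 closing at 6 records on expiry, which is consistent with the max bin age being the trigger rather than the record count.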
