Can you show your configuration of JoltTransformRecord and AvroRecordSetWriter?
On Wed, Dec 21, 2022 at 2:51 AM Lars Winderling <[email protected]> wrote:
> Hi Richard,
>
> it's not related, but for the logical type timestamp-millis you should use a "long" instead of a "string" (cf. https://avro.apache.org/docs/1.11.1/specification/#timestamp-millisecond-precision) afaik.
>
> Best, Lars
>
> On 21 December 2022 08:29:54 CET, Richard Beare <[email protected]> wrote:
>>
>> I have found a way to force the schema to be used, but I've missed something in my configuration. When I use a default generic avro writer in my jolttransformrecord processor the queue of 259 entries (about 1.8M) is processed instantly.
>> If I configure my avrowriter to use the schema text property and paste the following into the schema text field, the performance is terrible, taking tens of minutes to empty the same queue. Both are on the same 25ms run duration. I notice that even a "run once" of that processor is slow. I did not notice the same behavior on my laptop. Is there likely to be some sort of querying of remote sites going on behind the scenes that my server is failing to access due to firewalls etc? It seems really strange to me that it should be so slow with such tiny files, and the only commonality I can find is the custom schema. Is there something odd about it?
>>
>> {
>>   "type": "record",
>>   "namespace": "cogstack",
>>   "name": "document",
>>   "fields":
>>   [
>>     { "name": "doc_id", "type": "string" },
>>     { "name": "doc_text", "type": "string", "default": "" },
>>     { "name": "processing_timestamp", "type": { "type" : "string", "logicalType" : "timestamp-millis" } },
>>     { "name": "metadata_x_ocr_applied", "type": "boolean" },
>>     { "name": "metadata_x_parsed_by", "type": "string" },
>>     { "name": "metadata_content_type", "type": ["null", "string"], "default": null },
>>     { "name": "metadata_page_count", "type": ["null", "int"], "default": null },
>>     { "name": "metadata_creation_date", "type": ["null", { "type" : "string", "logicalType" : "timestamp-millis" }], "default": null },
>>     { "name": "metadata_last_modified", "type": ["null", { "type" : "string", "logicalType" : "timestamp-millis" }], "default": null }
>>   ]
>> }
>>
>> On Wed, Dec 21, 2022 at 2:05 PM Richard Beare <[email protected]> wrote:
>>
>>> I've made progress with Jolt and I think I'm close to achieving what I'm after. I am missing one conceptual step, I think.
>>>
>>> I rearrange my json so that it conforms to the desired structure and I can then write the results as avro. However, that is generic avro. How do I ensure that I conform to the schema that has been defined for that part of the flow?
>>>
>>> i.e. The part of the flow I'm replacing with jolt was a groovy script that created a flowfile according to a schema. That schema is below. Is there a way to utilise this definition in the jolttransformrecord processor, either via specification of data types in the transform definition or by telling the avro writer to use that specification? Or am I overthinking things here?
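Lars's point above applies to the timestamp fields in the schema shown here. A minimal sketch of just those fields redeclared with the timestamp-millis logical type on top of an Avro long (the remaining fields unchanged; whether the rest of the flow expects epoch-millis values rather than ISO-8601 strings is an assumption to verify):

{
  "type": "record",
  "namespace": "cogstack",
  "name": "document",
  "fields": [
    { "name": "processing_timestamp", "type": { "type": "long", "logicalType": "timestamp-millis" } },
    { "name": "metadata_creation_date", "type": ["null", { "type": "long", "logicalType": "timestamp-millis" }], "default": null },
    { "name": "metadata_last_modified", "type": ["null", { "type": "long", "logicalType": "timestamp-millis" }], "default": null }
  ]
}

With this declaration the writer expects epoch-millisecond values for those fields, not strings like "2022-12-20T09:02:27.902Z", unless the record reader/writer is configured with a matching timestamp format to do the conversion.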
>>> Thanks
>>>
>>> {
>>>   "type": "record",
>>>   "name": "document",
>>>   "fields":
>>>   [
>>>     { "name": "doc_id", "type": "string" },
>>>     { "name": "doc_text", "type": "string", "default": "" },
>>>     { "name": "processing_timestamp", "type": { "type" : "string", "logicalType" : "timestamp-millis" } },
>>>     { "name": "metadata_x_ocr_applied", "type": "boolean" },
>>>     { "name": "metadata_x_parsed_by", "type": "string" },
>>>     { "name": "metadata_content_type", "type": ["null", "string"], "default": null },
>>>     { "name": "metadata_page_count", "type": ["null", "int"], "default": null },
>>>     { "name": "metadata_creation_date", "type": ["null", { "type" : "string", "logicalType" : "timestamp-millis" }], "default": null },
>>>     { "name": "metadata_last_modified", "type": ["null", { "type" : "string", "logicalType" : "timestamp-millis" }], "default": null }
>>>   ]
>>> }
>>>
>>>
>>> On Wed, Dec 21, 2022 at 9:06 AM Richard Beare <[email protected]> wrote:
>>>
>>>> Thanks - I'll have a look at that. It is helpful to get guidance like this when the system is so large.
>>>>
>>>> On Wed, Dec 21, 2022 at 5:30 AM Matt Burgess <[email protected]> wrote:
>>>>
>>>>> Thanks Vijay! I agree those processors should do the trick, but there were things in the transformation between input and desired output whose origin I wasn't sure of. If you are setting constants you can use either a Shift or Default spec, if you are moving fields around you can use a Shift spec, and in general, whether you end up with one spec or multiple, I find it's easiest to use a Chain spec (an array of specs) in the processor configuration. You can play around with the spec(s) at the Jolt playground [1]
>>>>>
>>>>> An important difference to note between JoltTransformJSON and JoltTransformRecord is that for the former, the spec is applied to the entire input (and it is entirely read into memory), whereas in JoltTransformRecord the spec is applied to each record.
>>>>>
>>>>> Regards,
>>>>> Matt
>>>>>
>>>>> [1] http://jolt-demo.appspot.com/#inception
>>>>>
>>>>> On Tue, Dec 20, 2022 at 10:52 AM Vijay Chhipa <[email protected]> wrote:
>>>>> >
>>>>> > Hi Richard
>>>>> > Have you tried JoltTransformJSON or JoltTransformRecord?
>>>>> >
>>>>> > I believe you should be able to do this.
>>>>> >
>>>>> > Quick start here: https://community.cloudera.com/t5/Community-Articles/Jolt-quick-reference-for-Nifi-Jolt-Processors/ta-p/244350
>>>>> >
>>>>> >
>>>>> > On Dec 20, 2022, at 4:13 AM, Richard Beare <[email protected]> wrote:
>>>>> >
>>>>> > Hi Everyone,
>>>>> > Still struggling to fix this issue and may need to try some different things.
>>>>> >
>>>>> > What is the recommended way of transforming a record structure? At the moment I have a groovy script doing this but the downstream processing is very slow, as discussed in the preceding thread.
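For illustration only: a Jolt chain spec of the kind Matt describes (here just a single shift inside the chain array), flattening the Tika result structure shown in the quoted message just below into the flat document layout. The field mapping is inferred from the examples in this thread; doc_id is not present in the Tika JSON and would still have to come from elsewhere (e.g. a flowfile attribute), and taking element 0 of X-TIKA:Parsed-By keeps only the first parser rather than joining them all as the groovy script appears to do:

[
  {
    "operation": "shift",
    "spec": {
      "result": {
        "text": "doc_text",
        "timestamp": "processing_timestamp",
        "metadata": {
          "X-OCR-Applied": "metadata_x_ocr_applied",
          "X-TIKA:Parsed-By": { "0": "metadata_x_parsed_by" },
          "Content-Type": "metadata_content_type",
          "Page-Count": "metadata_page_count",
          "dcterms:created": "metadata_creation_date"
        }
      }
    }
  }
]

Fields that may be absent (metadata_creation_date, metadata_last_modified) could either be supplied by an additional default spec in the same chain or simply left to the nullable defaults in the writer's schema.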
>>>>> >
>>>>> > The transformation is very simple - starting structure is:
>>>>> >
>>>>> > {
>>>>> >   "result" : {
>>>>> >     "text" : " document text",
>>>>> >     "metadata" : {
>>>>> >       "X-TIKA:Parsed-By": [
>>>>> >         "org.apache.tika.parser.pdf.PDFParser"
>>>>> >       ],
>>>>> >       "X-OCR-Applied" : true,
>>>>> >       "dcterms:created": "2018-07-24T15:04:51Z",
>>>>> >       "Content-Type" : "application/pdf",
>>>>> >       "Page-Count" : 2
>>>>> >     },
>>>>> >     "success" : true,
>>>>> >     "timestamp" : "2022-12-20T09:02:27.902Z",
>>>>> >     "processingElapsedTime" : 6
>>>>> >   }
>>>>> > }
>>>>> >
>>>>> > final structure is:
>>>>> >
>>>>> > [ {
>>>>> >   "doc_id" : 58,
>>>>> >   "doc_text" : " ",
>>>>> >   "processing_timestamp" : "2022-12-20T09:02:27.902Z",
>>>>> >   "metadata_x_ocr_applied" : true,
>>>>> >   "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser",
>>>>> >   "metadata_content_type" : "application/pdf",
>>>>> >   "metadata_page_count" : 1,
>>>>> >   "metadata_creation_date" : null,
>>>>> >   "metadata_last_modified" : null
>>>>> > } ]
>>>>> >
>>>>> > So a kind of flattening of the structure. Is there a processor I should be using to do this instead of a groovy script?
>>>>> >
>>>>> > Thanks
>>>>> >
>>>>> > On Wed, Dec 14, 2022 at 7:57 AM Richard Beare <[email protected]> wrote:
>>>>> >>
>>>>> >> Any thoughts on this? Are there some extra steps required when creating an avro file from a user defined schema?
>>>>> >>
>>>>> >> On Thu, Dec 8, 2022 at 2:56 PM Richard Beare <[email protected]> wrote:
>>>>> >>>
>>>>> >>> Here's another result that I think suggests there's something wrong with the avro files created by the groovy script, although I can't see what the problem might be.
>>>>> >>>
>>>>> >>> The test is as follows. Output of the groovy script creating avro files is passed to convertrecord, configured with an avro reader and json writer. This is slow. The json output is then converted back to avro with another convertrecord processor, configured with a jsontreereader and an avro writer - this is fast, instantly emptying the queue. The result of that is fed into the previously problematic merge processor, which works exactly as expected, producing flowfiles with 100 records each.
>>>>> >>>
>>>>> >>> The difference I can see between the two flow files is the way in which the schema is specified. Perhaps some extras are required in the groovy file to set that up?
>>>>> >>>
>>>>> >>> The slow one has:
>>>>> >>>
>>>>> >>> {"type":"record","name":"document", "fields":[{
>>>>> >>>
>>>>> >>> The fast one:
>>>>> >>>
>>>>> >>> {"type":"record","name":"nifiRecord","namespace":"org.apache.nifi","fields":
>>>>> >>>
>>>>> >>> Initial characters are also slightly different.
>>>>> >>> Slow one:
>>>>> >>>
>>>>> >>> 0000000   O   b   j 001 002 026   a   v   r   o   .   s   c   h   e   m
>>>>> >>> 0000020   a 346  \n   {   "   t   y   p   e   "   :   "   r   e   c   o
>>>>> >>>
>>>>> >>> Fast one:
>>>>> >>>
>>>>> >>> 0000000   O   b   j 001 004 026   a   v   r   o   .
s c h >>>>> e m >>>>> >>> 0000020 a 362 \b { " t y p e " : " r e >>>>> c o >>>>> >>> >>>>> >>> >>>>> >>> The groovy script is >>>>> >>> CogStack-NiFi/parse-tika-result-json-to-avro.groovy at master · >>>>> CogStack/CogStack-NiFi · GitHub >>>>> >>> >>>>> >>> The schema is >>>>> >>> CogStack-NiFi/document.avsc at master · CogStack/CogStack-NiFi · >>>>> GitHub >>>>> >>> >>>>> >>> >>>>> >>> On Thu, Dec 8, 2022 at 1:59 PM Richard Beare < >>>>> [email protected]> wrote: >>>>> >>>> >>>>> >>>> I'm diving into the convertrecord tests a bit deeper on the >>>>> production server. >>>>> >>>> >>>>> >>>> The first test case - 259 documents, total of 1M when in avro >>>>> format in the input queue to the convert record processor. These avro >>>>> files >>>>> were not created by the groovy script - they start life as a database >>>>> query >>>>> and the text field is in one of the columns. The convertrecord processor >>>>> runs very quickly - click start, press refresh and it is done. The avro >>>>> ends up like this: >>>>> >>>> >>>>> >>>> [ { >>>>> >>>> "sampleid" : 1075, >>>>> >>>> "typeid" : 98, >>>>> >>>> "dct" : "2020-01-25T21:40:25.515Z", >>>>> >>>> "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt", >>>>> >>>> "document" : "Text removed", >>>>> >>>> "docid" : "9" >>>>> >>>> } ] >>>>> >>>> >>>>> >>>> In the second test, where the text fields are extracted from pdf >>>>> tika before avro files are created by the groovy script (from the tika >>>>> json >>>>> output), the total queue size for the 259 documents is larger - 1.77MB, >>>>> and >>>>> the performance is very different - press start, click refresh and only >>>>> two >>>>> flowfiles are processed. >>>>> >>>> >>>>> >>>> [ { >>>>> >>>> "doc_id" : "70", >>>>> >>>> "doc_text" : "text removed", >>>>> >>>> "processing_timestamp" : "2022-12-07T23:09:52.354Z", >>>>> >>>> "metadata_x_ocr_applied" : true, >>>>> >>>> "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser", >>>>> >>>> "metadata_content_type" : "application/pdf", >>>>> >>>> "metadata_page_count" : 1, >>>>> >>>> "metadata_creation_date" : null, >>>>> >>>> "metadata_last_modified" : null >>>>> >>>> } ] >>>>> >>>> >>>>> >>>> I've noticed that the second one has a content.type attribute of >>>>> 'application/json' which doesn't seem right and doesn't match the fast >>>>> case. I'll see what happens if I change that. >>>>> >>>> >>>>> >>>> On Thu, Dec 8, 2022 at 9:39 AM Richard Beare < >>>>> [email protected]> wrote: >>>>> >>>>> >>>>> >>>>> Hi All, >>>>> >>>>> Some progress on debugging options. I've found a flow that >>>>> exhibits the problem using synthetic data. However the results are host >>>>> dependent. On my laptop a "run-once" click of merge record gives me two >>>>> flowfiles of 100 records, while the same flow on the production server >>>>> produces several much smaller flowfiles. This makes me think that >>>>> something >>>>> funny is happening with my storage setup that I'll need to chase. >>>>> >>>>> >>>>> >>>>> I had tested the convertrecord option (simply >>>>> avroreader->avrowriter) and it did seem slow, but I'll investigate this >>>>> further as it may be related to my storage issue. >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> On Thu, Dec 8, 2022 at 1:23 AM Mark Payne <[email protected]> >>>>> wrote: >>>>> >>>>>> >>>>> >>>>>> > Is there something about this structure that is likely to be >>>>> causing the problem? Could there be other issues with the avro generated >>>>> by >>>>> the script? 
>>>>> >>>>>> >>>>> >>>>>> I don’t think the structure should matter. And as long as the >>>>> avro produced is proper Avro, I don’t think it should matter. Unless >>>>> perhaps there’s some issue with the Avro library itself that’s causing it >>>>> to take a really long time to parse the Avro or something? I’d be curious >>>>> - >>>>> if you take the output of your script and then you run it through a >>>>> ConvertRecord (Avro Reader -> Json Writer) is the ConvertRecord fast? Or >>>>> is >>>>> it really slow to process it? >>>>> >>>>>> >>>>> >>>>>> On Dec 5, 2022, at 5:58 AM, Richard Beare < >>>>> [email protected]> wrote: >>>>> >>>>>> >>>>> >>>>>> Further - I performed another test in which I replaced the >>>>> custom json to avro script with a ConvertRecord processor - merge record >>>>> appears to work as expected in that case. >>>>> >>>>>> >>>>> >>>>>> Output of convertrecord looks like this: >>>>> >>>>>> >>>>> >>>>>> [ { >>>>> >>>>>> "text" : " No Alert Found \n\n", >>>>> >>>>>> "metadata" : { >>>>> >>>>>> "X_TIKA_Parsed_By" : null, >>>>> >>>>>> "X_OCR_Applied" : null, >>>>> >>>>>> "Content_Type" : null >>>>> >>>>>> }, >>>>> >>>>>> "success" : true, >>>>> >>>>>> "timestamp" : "2022-12-05T10:49:18.568Z", >>>>> >>>>>> "processingElapsedTime" : 0, >>>>> >>>>>> "doc_id" : "5.60178607E8" >>>>> >>>>>> } ] >>>>> >>>>>> >>>>> >>>>>> while the output of the script looks like: >>>>> >>>>>> >>>>> >>>>>> [ { >>>>> >>>>>> "doc_id" : "5.61996505E8", >>>>> >>>>>> "doc_text" : " No Alert Found \n\n", >>>>> >>>>>> "processing_timestamp" : "2022-11-28T01:16:46.775Z", >>>>> >>>>>> "metadata_x_ocr_applied" : true, >>>>> >>>>>> "metadata_x_parsed_by" : >>>>> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser", >>>>> >>>>>> "metadata_content_type" : "application/rtf", >>>>> >>>>>> "metadata_page_count" : null, >>>>> >>>>>> "metadata_creation_date" : null, >>>>> >>>>>> "metadata_last_modified" : null >>>>> >>>>>> } ] >>>>> >>>>>> >>>>> >>>>>> Is there something about this structure that is likely to be >>>>> causing the problem? Could there be other issues with the avro generated >>>>> by >>>>> the script? >>>>> >>>>>> >>>>> >>>>>> On Mon, Dec 5, 2022 at 9:31 PM Richard Beare < >>>>> [email protected]> wrote: >>>>> >>>>>>> >>>>> >>>>>>> I've reset the backpressure to the default >>>>> >>>>>>> >>>>> >>>>>>> This remains something of a mystery. The merge with synthetic >>>>> data happily creates flowfiles with 100 records, and the join says >>>>> "Records >>>>> merged due to: Bin is full" or "Records merged due to: Bin is full >>>>> enough". >>>>> No timeouts in that case, even with the max bin age at 4.5 seconds. The >>>>> resulting flowfiles were about 300K. >>>>> >>>>>>> >>>>> >>>>>>> The real data is doing much the same as before, producing >>>>> flowfiles of about 30K, with 7 records or so. If I increase the maximum >>>>> bin >>>>> age to 30 seconds the size and record count is higher - 12 to 15. Nothing >>>>> like the behaviour with synthetic data, where the 100 record flowfiles are >>>>> created almost instantly. Joins are always due to bin age. >>>>> >>>>>>> >>>>> >>>>>>> Can the problem relate to the structure of the avro files? Any >>>>> way to dive into that? Everything else about the mergerecord settings >>>>> appear the same, so I can't see an explanation as to why the behaviour is >>>>> different on the same hardware. 
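On "any way to dive into that?": one way to compare the two kinds of files outside NiFi is Avro's command-line tools, which can print the embedded schema, the container metadata (e.g. avro.schema, avro.codec) and the decoded records. A sketch, assuming a local copy of the avro-tools jar; the file names are placeholders:

  # schema embedded in the container file
  java -jar avro-tools-1.11.1.jar getschema slow.avro

  # container metadata entries
  java -jar avro-tools-1.11.1.jar getmeta slow.avro

  # dump records as JSON to confirm they deserialize cleanly
  java -jar avro-tools-1.11.1.jar tojson slow.avro

Running the same commands against one of the "fast" files would make it easy to diff exactly what the groovy script is writing differently.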
>>>>> >>>>>>>
>>>>> >>>>>>> On Mon, Dec 5, 2022 at 2:09 AM Mark Payne <[email protected]> wrote:
>>>>> >>>>>>>>
>>>>> >>>>>>>> Hey Richard,
>>>>> >>>>>>>>
>>>>> >>>>>>>> So a few things that I've done/looked at.
>>>>> >>>>>>>>
>>>>> >>>>>>>> I generated some Avro data (random JSON that I downloaded from a Random JSON Generator and then converted to Avro).
>>>>> >>>>>>>>
>>>>> >>>>>>>> I then ran this avro data into both the MergeRecord processors.
>>>>> >>>>>>>>
>>>>> >>>>>>>> Firstly, I noticed that both are very slow. Found that was because Run Schedule was set to 5 seconds. This should *ALWAYS* be 0 secs for MergeRecord. And really for basically all processors except for the first one in the flow.
>>>>> >>>>>>>>
>>>>> >>>>>>>> I also notice that you have backpressure set on your connections to 40,000 FlowFiles and 4 GB. This can significantly slow things down. If you have performance concerns you definitely want backpressure set back to the default of 10,000 FlowFiles. Otherwise, as the queues fill up they start "swapping out" FlowFiles to disk, and this can significantly slow things down.
>>>>> >>>>>>>>
>>>>> >>>>>>>> I noticed that MergeRecord is set to 1 concurrent task. Probably worth considering increasing that, if performance is a concern.
>>>>> >>>>>>>>
>>>>> >>>>>>>> That said, I am seeing nice, full bins of 100 records merged from each of the MergeRecord processors.
>>>>> >>>>>>>> So it is certainly possible that if you're seeing smaller bins it's because you're timing out. The 4.5 seconds timeout is quite short. Have you tried increasing that to, say, 30 seconds to see if it gives you larger bins?
>>>>> >>>>>>>> I also recommend that you take a look at the data provenance to see why it's creating the bins.
>>>>> >>>>>>>>
>>>>> >>>>>>>> If unclear how to do that:
>>>>> >>>>>>>> Right-click on the MergeRecord processor
>>>>> >>>>>>>> Go to View data provenance
>>>>> >>>>>>>> Scroll down the list until you see a "JOIN" event type. You can ignore the ATTRIBUTES_MODIFIED and DROP events for now.
>>>>> >>>>>>>> Click the 'i' icon on the left-hand side.
>>>>> >>>>>>>> This will show you details about the merge. In the Details tab, if you scroll down, it will show you a Details field, which tells you why the data was merged. It should either say: "Records Merged due to: Bin has reached Max Bin Age" or "Records Merged due to: Bin is full"
>>>>> >>>>>>>>
>>>>> >>>>>>>> If it is due to Max Bin Age reached, then I'd recommend increasing the number of concurrent tasks, reducing backpressure to no more than 10,000 FlowFiles in the queue, and/or increasing the Max Bin Age.
>>>>> >>>>>>>> Also worth asking - what kind of machine is this running on? A 64 core VM with a 1 TB volume will, of course, run MUCH differently than a 4 core VM with a 10 GB volume, especially in the cloud.
>>>>> >>>>>>>>
>>>>> >>>>>>>> If still having trouble, let me know what the provenance tells you about the reason for merging the data, and we can go from there.
>>>>> >>>>>>>>
>>>>> >>>>>>>> Thanks!
>>>>> >>>>>>>> -Mark
>>>>> >>>>>>>>
>>>>> >>>>>>>>
>>>>> >>>>>>>> On Dec 3, 2022, at 4:38 PM, Mark Payne <[email protected]> wrote:
>>>>> >>>>>>>>
>>>>> >>>>>>>> Richard,
>>>>> >>>>>>>>
>>>>> >>>>>>>> I think just the flow structure should be sufficient.
>>>>> >>>>>>>> >>>>> >>>>>>>> Thanks >>>>> >>>>>>>> -Mark >>>>> >>>>>>>> >>>>> >>>>>>>> >>>>> >>>>>>>> On Dec 3, 2022, at 4:32 PM, Richard Beare < >>>>> [email protected]> wrote: >>>>> >>>>>>>> >>>>> >>>>>>>> Thanks for responding, >>>>> >>>>>>>> I re-tested with max bins = 2, but the behaviour remained the >>>>> same. I can easily share a version of the functioning workflow (and data), >>>>> which is part of a public project. The problem workflow (which shares many >>>>> of the same components) is part of a health research project, so more >>>>> difficult. I definitely can't share any data from that one. Do you need to >>>>> see the data or is the overall structure sufficient at this point? Happy >>>>> to >>>>> demonstrate via video conference too. >>>>> >>>>>>>> >>>>> >>>>>>>> Thanks >>>>> >>>>>>>> >>>>> >>>>>>>> On Sun, Dec 4, 2022 at 1:37 AM Mark Payne < >>>>> [email protected]> wrote: >>>>> >>>>>>>>> >>>>> >>>>>>>>> Hi Richard, >>>>> >>>>>>>>> >>>>> >>>>>>>>> Can you try increasing the Maximum Number of Bins? I think >>>>> there was an issue that was recently addressed in which the merge >>>>> processors had an issue when Max Number of Bins = 1. >>>>> >>>>>>>>> >>>>> >>>>>>>>> If you still see the same issue, please provide a copy of >>>>> the flow that can be used to replicate the issue. >>>>> >>>>>>>>> >>>>> >>>>>>>>> Thanks >>>>> >>>>>>>>> -Mark >>>>> >>>>>>>>> >>>>> >>>>>>>>> >>>>> >>>>>>>>> On Dec 3, 2022, at 5:21 AM, Richard Beare < >>>>> [email protected]> wrote: >>>>> >>>>>>>>> >>>>> >>>>>>>>> Hi, >>>>> >>>>>>>>> >>>>> >>>>>>>>> Pretty much the same - I seem to end up with flowfiles >>>>> containing about 7 records, presumably always triggered by the timeout. >>>>> >>>>>>>>> >>>>> >>>>>>>>> I had thought the timeout needed to be less than the run >>>>> schedule, but it looks like it can be the same. 
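For reference, the configuration being converged on in this thread (Mark's suggestions above plus Joe's further down) amounts to something like the following; the exact numbers are values to experiment with rather than known-good settings:

  MergeRecord
    Run Schedule              : 0 sec (never a 5 second run schedule for MergeRecord)
    Merge Strategy            : Bin-Packing Algorithm
    Minimum Number of Records : 50
    Maximum Number of Records : 100
    Max Bin Age               : 30 sec
    Maximum Number of Bins    : 2 or more
    Concurrent Tasks          : 2+ if throughput is still a concern

  Incoming connection
    Back Pressure Object Threshold : 10,000 FlowFiles (the default)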
>>>>> >>>>>>>>> >>>>> >>>>>>>>> Here's a debug dump >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:43 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Migrating id=1066297 to RecordBin[size=4, full=false, isComplete=false, >>>>> id=4021] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:43 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Transferred id=1066297 to RecordBin[size=5, full=false, isComplete=false, >>>>> id=4021] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:44 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got >>>>> Group ID >>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:44 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Migrating id=1066372 to RecordBin[size=5, full=false, isComplete=false, >>>>> id=4021] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:44 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Transferred id=1066372 to RecordBin[size=6, full=false, isComplete=false, >>>>> id=4021] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:45 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Transferred id=1066575 to RecordBin[size=7, full=false, isComplete=true, >>>>> id=4021] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:46 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got >>>>> Group ID >>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 
10:13:46 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Migrating id=1066612 to RecordBin[size=0, full=false, isComplete=false, >>>>> id=4022] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:46 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created >>>>> OutputStream using session StandardProcessSession[id=83204] for >>>>> RecordBin[size=0, full=false, isComplete=false, id=4022] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:46 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Transferred id=1066612 to RecordBin[size=1, full=false, isComplete=false, >>>>> id=4022] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:48 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Migrating id=1066896 to RecordBin[size=2, full=false, isComplete=false, >>>>> id=4022] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:48 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Transferred id=1066896 to RecordBin[size=3, full=false, isComplete=false, >>>>> id=4022] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:49 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got >>>>> Group ID >>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:49 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Migrating id=1067051 to RecordBin[size=3, full=false, isComplete=false, >>>>> id=4022] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:49 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Transferred id=1067051 to RecordBin[size=4, full=false, isComplete=false, >>>>> id=4022] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:52 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Transferred id=1067254 to RecordBin[size=7, full=false, isComplete=true, >>>>> id=4022] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:53 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> 
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got >>>>> Group ID >>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:53 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Migrating id=1067395 to RecordBin[size=0, full=false, isComplete=false, >>>>> id=4023] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:53 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created >>>>> OutputStream using session StandardProcessSession[id=83205] for >>>>> RecordBin[size=0, full=false, isComplete=false, id=4023] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:53 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Transferred id=1067395 to RecordBin[size=1, full=false, isComplete=false, >>>>> id=4023] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:54 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Migrating id=1068472 to RecordBin[size=1, full=false, isComplete=false, >>>>> id=4023] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:54 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Transferred id=1068472 to RecordBin[size=2, full=false, isComplete=false, >>>>> id=4023] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:55 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got >>>>> Group ID >>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:55 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> 
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Migrating id=1068597 to RecordBin[size=2, full=false, isComplete=false, >>>>> id=4023] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:55 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Transferred id=1068597 to RecordBin[size=3, full=false, isComplete=false, >>>>> id=4023] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:58 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> RecordBin[size=6, full=false, isComplete=false, id=4023] is now expired. >>>>> Completing bin. >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:58 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked >>>>> RecordBin[size=6, full=false, isComplete=true, id=4023] as complete >>>>> because >>>>> complete() was called >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:58 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed >>>>> Record Writer using session StandardProcessSession[id=83205] for >>>>> RecordBin[size=6, full=false, isComplete=true, id=4023] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:58 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Completed bin RecordBin[size=6, full=false, isComplete=true, id=4023] with >>>>> 6 records with Merged FlowFile >>>>> FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948] using input >>>>> FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663, id=1068800, >>>>> id=1068845] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:13:58 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Transferred id=1068845 to RecordBin[size=6, full=false, isComplete=true, >>>>> id=4023] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:14:01 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Migrating id=1069272 to RecordBin[size=2, full=false, isComplete=false, >>>>> id=4024] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:14:01 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Transferred id=1069272 to RecordBin[size=3, full=false, isComplete=false, >>>>> id=4024] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:14:02 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got >>>>> Group ID >>>>> 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:14:02 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Migrating id=1069316 to RecordBin[size=3, full=false, isComplete=false, >>>>> id=4024] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:14:02 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Transferred id=1069316 to RecordBin[size=4, full=false, isComplete=false, >>>>> id=4024] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:14:05 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> RecordBin[size=6, full=false, isComplete=false, id=4024] is now expired. >>>>> Completing bin. >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:14:05 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked >>>>> RecordBin[size=6, full=false, isComplete=true, id=4024] as complete >>>>> because >>>>> complete() was called >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:14:05 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed >>>>> Record Writer using session StandardProcessSession[id=83206] for >>>>> RecordBin[size=6, full=false, isComplete=true, id=4024] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:14:05 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Completed bin RecordBin[size=6, full=false, isComplete=true, id=4024] with >>>>> 6 records with Merged FlowFile >>>>> FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d] using input >>>>> FlowFiles [id=1069044, id=1069103, id=1069272, id=1069316, id=1069451, >>>>> id=1069492] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:14:05 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Transferred id=1069492 to RecordBin[size=6, full=false, isComplete=true, >>>>> id=4024] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:14:07 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Migrating id=1072118 to RecordBin[size=2, full=false, isComplete=false, >>>>> id=4025] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:14:07 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> 
>>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Transferred id=1072118 to RecordBin[size=3, full=false, isComplete=false, >>>>> id=4025] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:14:08 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got >>>>> Group ID >>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:14:08 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Migrating id=1072197 to RecordBin[size=3, full=false, isComplete=false, >>>>> id=4025] >>>>> >>>>>>>>> >>>>> >>>>>>>>> 10:14:08 UTC >>>>> >>>>>>>>> DEBUG >>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>> >>>>>>>>> >>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>> Transferred id=1072197 to RecordBin[size=4, full=false, isComplete=false, >>>>> id=4025] >>>>> >>>>>>>>> >>>>> >>>>>>>>> >>>>> >>>>>>>>> On Sat, Dec 3, 2022 at 4:21 PM Joe Witt <[email protected]> >>>>> wrote: >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> Hello >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> Run schedule should be 0. >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> 50 should be the min number of records >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> 5 seconds is the max bin age it sounds like you want. >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> Start with these changes and let us know what youre seeing. >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> Thanks >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> On Fri, Dec 2, 2022 at 10:12 PM Richard Beare < >>>>> [email protected]> wrote: >>>>> >>>>>>>>>>> >>>>> >>>>>>>>>>> Hi, >>>>> >>>>>>>>>>> I'm having a great deal of trouble configuring the >>>>> mergerecord processor to deliver reasonable performance and I'm not sure >>>>> where to look to correct it. One of my upstream processors requires a >>>>> single record per flowfile, but I'd like to create larger flowfiles before >>>>> passing to the next stage. The flowfiles are independent at this stage so >>>>> there's no special processing required of the merging. I'd like to create >>>>> flowfiles of about 50 to 100 records. >>>>> >>>>>>>>>>> >>>>> >>>>>>>>>>> I have two tests, both running on the same nifi system. >>>>> One uses synthetic data, the other the production data. The performance of >>>>> the mergerecord processor for the synthetic data is as I'd expect, and I >>>>> can't figure out why the production data is so much slower. Here's the >>>>> >>>>>>>>>>> configuration: >>>>> >>>>>>>>>>> >>>>> >>>>>>>>>>> mergerecord has the following settings. 
Timer driven, 1 >>>>> concurrent task, 5 second run schedule, bin packing merge strategy, min >>>>> records = 1, max records = 100, max bin age = 4.5 secs, maximum number of >>>>> bins = 1. >>>>> >>>>>>>>>>> >>>>> >>>>>>>>>>> In the case of synthetic data the typical flowfile size is >>>>> in the range 2 to 7KB. >>>>> >>>>>>>>>>> >>>>> >>>>>>>>>>> The size of flowfiles for the production case is smaller - >>>>> typically around 1KB. >>>>> >>>>>>>>>>> >>>>> >>>>>>>>>>> The structure in the tests is slightly different. >>>>> Synthetic is (note that I've removed the text part): >>>>> >>>>>>>>>>> >>>>> >>>>>>>>>>> [ { >>>>> >>>>>>>>>>> "sampleid" : 1075, >>>>> >>>>>>>>>>> "typeid" : 98, >>>>> >>>>>>>>>>> "dct" : "2020-01-25T21:40:25.515Z", >>>>> >>>>>>>>>>> "filename" : >>>>> "__tmp/txt/mtsamples-type-98-sample-1075.txt", >>>>> >>>>>>>>>>> "document" : "Text removed - typically a few hundred >>>>> words", >>>>> >>>>>>>>>>> "docid" : "9" >>>>> >>>>>>>>>>> } ] >>>>> >>>>>>>>>>> >>>>> >>>>>>>>>>> Production is: >>>>> >>>>>>>>>>> [ { >>>>> >>>>>>>>>>> "doc_id" : "5.60622895E8", >>>>> >>>>>>>>>>> "doc_text" : " Text deleted - typically a few hundred >>>>> words", >>>>> >>>>>>>>>>> "processing_timestamp" : "2022-11-27T23:56:35.601Z", >>>>> >>>>>>>>>>> "metadata_x_ocr_applied" : true, >>>>> >>>>>>>>>>> "metadata_x_parsed_by" : >>>>> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser", >>>>> >>>>>>>>>>> "metadata_content_type" : "application/rtf", >>>>> >>>>>>>>>>> "metadata_page_count" : null, >>>>> >>>>>>>>>>> "metadata_creation_date" : null, >>>>> >>>>>>>>>>> "metadata_last_modified" : null >>>>> >>>>>>>>>>> } ] >>>>> >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> >>>>>>>>>>> >>>>> >>>>>>>>>>> I load up the queue feeding the mergerecord processor with >>>>> several hundred individual flowfiles and activate it. >>>>> >>>>>>>>>>> >>>>> >>>>>>>>>>> The synthetic data is nicely placed into chunks of 100, >>>>> with any remainder being flushed in a smaller chunk. >>>>> >>>>>>>>>>> >>>>> >>>>>>>>>>> The production data is generally bundled into groups of 6 >>>>> records, sometimes less. Certainly it never gets close to 100 records. >>>>> >>>>>>>>>>> >>>>> >>>>>>>>>>> Any ideas as to what I should look at to track down the >>>>> difference? >>>>> >>>>>>>>>>> >>>>> >>>>>>>>>>> Thanks >>>>> >>>>>>>>> >>>>> >>>>>>>>> >>>>> >>>>>>>> >>>>> >>>>>>>> >>>>> >>>>>> >>>>> > >>>>> >>>>
