Yes, embedded Avro schema on both. The part that mystifies me most is the different behaviour I see on different test hosts, which makes me suspect some sort of network timeout is happening in the background (probably desperation on my part). The problem host sits in a hospital datacentre with deliberately restricted network access, while my local laptop doesn't have the same restrictions.
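For reference, the two writer setups being compared look roughly like this (property names as they appear on a stock AvroRecordSetWriter; this is my summary of the configs, not an exact export):

Fast case:
  Schema Write Strategy:  Embed Avro Schema
  Schema Access Strategy: Inherit Record Schema

Slow case:
  Schema Write Strategy:  Embed Avro Schema
  Schema Access Strategy: Use 'Schema Text' Property
  Schema Text:            the "cogstack.document" schema pasted further down the thread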
On Thu, Dec 22, 2022 at 1:46 AM Bryan Bende <[email protected]> wrote: > That works. Can elaborate on the changes you made to AvroRecordSetWriter > where it went from instant to slow? > > It sounds like the instant case had: > - Schema Access Strategy: Inherit Record Schema > > and slow case has: > - Schema Access Strategy: Use Schema Text > > What about Schema Write Strategy? > Did both tests use Embedded Avro Schema? > > > On Wed, Dec 21, 2022 at 5:03 AM Richard Beare <[email protected]> > wrote: > >> I've converted to a template and attached - is that the best way to show >> configuration? >> >> On Wed, Dec 21, 2022 at 7:40 PM Bryan Bende <[email protected]> wrote: >> >>> Can you show your configuration of JoltTransformRecord and >>> AvroRecordSetWriter? >>> >>> On Wed, Dec 21, 2022 at 2:51 AM Lars Winderling < >>> [email protected]> wrote: >>> >>>> Hi Richard, >>>> >>>> it's not related, but for the logical types timestamp-millis you should >>>> use a "long" instead of a "string" (cf >>>> https://avro.apache.org/docs/1.11.1/specification/#timestamp-millisecond-precision) >>>> afaik. >>>> >>>> Best, Lars >>>> >>>> On 21 December 2022 08:29:54 CET, Richard Beare < >>>> [email protected]> wrote: >>>>> >>>>> I have found a way to force the schema to be used, but I've missed >>>>> something in my configuration. When I use a default generic avro writer in >>>>> my jolttransformrecord processor the queue of 259 entries (about 1.8M) is >>>>> processed instantly. >>>>> If I configure my avrowriter to use the schema text property and paste >>>>> the following into the schema text field, the performance is terrible, >>>>> taking tens of minutes to empty the same queue. Both are on the same 25ms >>>>> run duration. I notice that even a "run once" of that processor. I did not >>>>> notice the same behavior on my laptop. Is there likely to be some sort of >>>>> querying remote sites going on behind the scenes that my server is failing >>>>> to access due to firewalls etc? It seems really strange to me that it >>>>> should be so slow with such tiny files, and the only commonality I can >>>>> find >>>>> is the custom schema. Is there something odd about it? >>>>> >>>>> { >>>>> "type": "record", >>>>> "namespace": "cogstack", >>>>> "name": "document", >>>>> "fields": >>>>> [ >>>>> { "name": "doc_id", "type": "string" }, >>>>> { "name": "doc_text", "type": "string", "default": "" }, >>>>> { "name": "processing_timestamp", "type": { "type" : "string", >>>>> "logicalType" : "timestamp-millis" } }, >>>>> { "name": "metadata_x_ocr_applied", "type": "boolean" }, >>>>> { "name": "metadata_x_parsed_by", "type": "string" }, >>>>> { "name": "metadata_content_type", "type": ["null", "string"], >>>>> "default": null }, >>>>> { "name": "metadata_page_count", "type": ["null", "int"], >>>>> "default": null }, >>>>> { "name": "metadata_creation_date", "type": ["null", { "type" : >>>>> "string", "logicalType" : "timestamp-millis" }], "default": null }, >>>>> { "name": "metadata_last_modified", "type": ["null", { "type" : >>>>> "string", "logicalType" : "timestamp-millis" }], "default": null } >>>>> ] >>>>> } >>>>> } >>>>> >>>>> On Wed, Dec 21, 2022 at 2:05 PM Richard Beare <[email protected]> >>>>> wrote: >>>>> >>>>>> I've made progress with Jolt and I think I'm close to achieving what >>>>>> I'm after. I am missing one conceptual step, I think. >>>>>> >>>>>> I rearrange my json so that it conforms to the desired structure and >>>>>> I can then write the results as avro. However, that is generic avro. 
How >>>>>> do >>>>>> I ensure that I conform to the schema that has been defined for that part >>>>>> of the flow? >>>>>> >>>>>> i.e. The part of the flow I'm replacing with jolt was a groovy script >>>>>> that created a flowfile according to a schema. That schema is below. Is >>>>>> there a way to utilise this definition in the jolttransformrecord >>>>>> processor, either via specification of data types in the transform >>>>>> definition or by telling the avro writer to use that specification. Or >>>>>> am I >>>>>> overthinking things here? >>>>>> Thanks >>>>>> >>>>>> { >>>>>> "type": "record", >>>>>> "name": "document", >>>>>> "fields": >>>>>> [ >>>>>> { "name": "doc_id", "type": "string" }, >>>>>> { "name": "doc_text", "type": "string", "default": "" }, >>>>>> { "name": "processing_timestamp", "type": { "type" : "string", >>>>>> "logicalType" : "timestamp-millis" } }, >>>>>> { "name": "metadata_x_ocr_applied", "type": "boolean" }, >>>>>> { "name": "metadata_x_parsed_by", "type": "string" }, >>>>>> { "name": "metadata_content_type", "type": ["null", "string"], >>>>>> "default": null }, >>>>>> { "name": "metadata_page_count", "type": ["null", "int"], >>>>>> "default": null }, >>>>>> { "name": "metadata_creation_date", "type": ["null", { "type" : >>>>>> "string", "logicalType" : "timestamp-millis" }], "default": null }, >>>>>> { "name": "metadata_last_modified", "type": ["null", { "type" : >>>>>> "string", "logicalType" : "timestamp-millis" }], "default": null } >>>>>> ] >>>>>> } >>>>>> >>>>>> >>>>>> >>>>>> On Wed, Dec 21, 2022 at 9:06 AM Richard Beare < >>>>>> [email protected]> wrote: >>>>>> >>>>>>> Thanks - I'll have a look at that. It is helpfully to get guidance >>>>>>> like this when the system is so large. >>>>>>> >>>>>>> On Wed, Dec 21, 2022 at 5:30 AM Matt Burgess <[email protected]> >>>>>>> wrote: >>>>>>> >>>>>>>> Thanks Vijay! I agree those processors should do the trick but there >>>>>>>> were things in the transformation between input and desired output >>>>>>>> that I wasn't sure of their origin. If you are setting constants you >>>>>>>> can use either a Shift or Default spec, if you are moving fields >>>>>>>> around you can use a Shift spec, and in general whether you end up >>>>>>>> with one spec or multiple, I find it's easiest to use a Chain spec >>>>>>>> (an >>>>>>>> array of specs) in the processor configuration. You can play around >>>>>>>> with the spec(s) at the Jolt playground [1] >>>>>>>> >>>>>>>> An important difference to note between JoltTransformJSON and >>>>>>>> JoltTransformRecord is that for the former, the spec is applied to >>>>>>>> the >>>>>>>> entire input (and it is entirely read into memory) where in >>>>>>>> JoltTransformRecord the spec is applied to each record. >>>>>>>> >>>>>>>> Regards, >>>>>>>> Matt >>>>>>>> >>>>>>>> [1] http://jolt-demo.appspot.com/#inception >>>>>>>> >>>>>>>> On Tue, Dec 20, 2022 at 10:52 AM Vijay Chhipa <[email protected]> >>>>>>>> wrote: >>>>>>>> > >>>>>>>> > Hi Richard >>>>>>>> > Have you tried JoltTransformJSON or JoltTransformRecord >>>>>>>> > >>>>>>>> > I believe you should be able to do this >>>>>>>> > >>>>>>>> > Quick start here: >>>>>>>> https://community.cloudera.com/t5/Community-Articles/Jolt-quick-reference-for-Nifi-Jolt-Processors/ta-p/244350 >>>>>>>> > >>>>>>>> > >>>>>>>> > On Dec 20, 2022, at 4:13 AM, Richard Beare < >>>>>>>> [email protected]> wrote: >>>>>>>> > >>>>>>>> > Hi Everyone, >>>>>>>> > Still struggling to fix this issue and may need to try some >>>>>>>> different things. 
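Not tested against the real data, but along the lines Matt describes above, a shift spec for the flattening shown in the older message further down (Tika result JSON to the flat document record) might look something like the following. Field names come from the schema above; doc_id is assumed to be set elsewhere in the flow, and only the first entry of X-TIKA:Parsed-By is kept, whereas the groovy script joins multiple parsers with ";":

[
  {
    "operation": "shift",
    "spec": {
      "result": {
        "text": "doc_text",
        "timestamp": "processing_timestamp",
        "metadata": {
          "X-OCR-Applied": "metadata_x_ocr_applied",
          "X-TIKA:Parsed-By": { "0": "metadata_x_parsed_by" },
          "Content-Type": "metadata_content_type",
          "Page-Count": "metadata_page_count",
          "dcterms:created": "metadata_creation_date"
        }
      }
    }
  }
]

Fields the spec doesn't produce (e.g. metadata_last_modified) should come out as the null defaults declared in the writer schema. Separately, if the schema is reworked per Lars's note, the timestamp fields would be declared on a long rather than a string, e.g. { "name": "processing_timestamp", "type": { "type": "long", "logicalType": "timestamp-millis" } }.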
>>>>>>>> > >>>>>>>> > What is the recommended way of transforming a record structure? >>>>>>>> At the moment I have a groovy script doing this but the downstream >>>>>>>> processing is very slow, as discussed in the preceding thread. >>>>>>>> > >>>>>>>> > The transformation is very simple - starting structure is: >>>>>>>> > >>>>>>>> > { >>>>>>>> > "result" : { >>>>>>>> > "text" : " document text", >>>>>>>> > "metadata" : { >>>>>>>> > "X-TIKA:Parsed-By": [ >>>>>>>> > "org.apache.tika.parser.pdf.PDFParser" >>>>>>>> > ], >>>>>>>> > "X-OCR-Applied" : true, >>>>>>>> > "dcterms:created": "2018;07-24T15:04:51Z", >>>>>>>> > "Content-Type" : "application/pdf", >>>>>>>> > "Page-Count" : 2, >>>>>>>> > }, >>>>>>>> > "success" : true, >>>>>>>> > "timestamp" : "2022-12-20T09:02:27.902Z", >>>>>>>> > "processingElapsedTime" : 6 >>>>>>>> > } >>>>>>>> > } >>>>>>>> > >>>>>>>> > >>>>>>>> > final structure is': >>>>>>>> > >>>>>>>> > [ { >>>>>>>> > "doc_id" : 58, >>>>>>>> > "doc_text" : " ", >>>>>>>> > "processing_timestamp" : "2022-12-20T09:02:27.902Z", >>>>>>>> > "metadata_x_ocr_applies" : true, >>>>>>>> > "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser", >>>>>>>> > "metadata_content_type" : "application/pdf", >>>>>>>> > "metadata_page_count" : 1 >>>>>>>> > "metadata_creation_date": null, >>>>>>>> > "metadata_last_modified: nill >>>>>>>> > }] >>>>>>>> > >>>>>>>> > So a kind of flattening of the structure. Is there a processor I >>>>>>>> should be using to do this instead of a groovy script? >>>>>>>> > >>>>>>>> > Thanks >>>>>>>> > >>>>>>>> > On Wed, Dec 14, 2022 at 7:57 AM Richard Beare < >>>>>>>> [email protected]> wrote: >>>>>>>> >> >>>>>>>> >> Any thoughts on this? Are there some extra steps required when >>>>>>>> creating an avro file from a user defined schema? >>>>>>>> >> >>>>>>>> >> On Thu, Dec 8, 2022 at 2:56 PM Richard Beare < >>>>>>>> [email protected]> wrote: >>>>>>>> >>> >>>>>>>> >>> Here's another result that I think suggests there's something >>>>>>>> wrong with the avro files created by the groovy script, although I >>>>>>>> can't >>>>>>>> see what the problem might be. >>>>>>>> >>> >>>>>>>> >>> The test is as follows. Output of the groovy script creating >>>>>>>> avro files is passed to convertrecord, configured with an avro reader >>>>>>>> and >>>>>>>> json writer. This is slow. The json output is then converted back to >>>>>>>> avro >>>>>>>> with another convertrecord processor, configured with a jsontreereader >>>>>>>> and >>>>>>>> an avro writer - this is fast, instantly emptying the queue. The >>>>>>>> result of >>>>>>>> that is fed into the previously problematic merge processor which works >>>>>>>> exactly as expected, producing flowfiles with 100 records each. >>>>>>>> >>> >>>>>>>> >>> The difference I can see between the two flow files is the way >>>>>>>> in which the schema is specified. Perhaps some extras are required in >>>>>>>> the >>>>>>>> groovy file to set that up? >>>>>>>> >>> >>>>>>>> >>> The slow one has: >>>>>>>> >>> >>>>>>>> >>> {"type":"record","name":"document", "fields":[{ >>>>>>>> >>> >>>>>>>> >>> The fast one >>>>>>>> >>> >>>>>>>> {"type":"record","name":"nifiRecord","namespace":"org.apache.nifi","fields": >>>>>>>> >>> >>>>>>>> >>> >>>>>>>> >>> Initial characters are also slightly different. >>>>>>>> >>> Slow one: >>>>>>>> >>> >>>>>>>> >>> 0000000 O b j 001 002 026 a v r o . s c >>>>>>>> h e m >>>>>>>> >>> 0000020 a 346 \n { " t y p e " : " r >>>>>>>> e c o >>>>>>>> >>> >>>>>>>> >>> Fast one >>>>>>>> >>> >>>>>>>> >>> 0000000 O b j 001 004 026 a v r o . 
s c >>>>>>>> h e m >>>>>>>> >>> 0000020 a 362 \b { " t y p e " : " r >>>>>>>> e c o >>>>>>>> >>> >>>>>>>> >>> >>>>>>>> >>> The groovy script is >>>>>>>> >>> CogStack-NiFi/parse-tika-result-json-to-avro.groovy at master · >>>>>>>> CogStack/CogStack-NiFi · GitHub >>>>>>>> >>> >>>>>>>> >>> The schema is >>>>>>>> >>> CogStack-NiFi/document.avsc at master · CogStack/CogStack-NiFi >>>>>>>> · GitHub >>>>>>>> >>> >>>>>>>> >>> >>>>>>>> >>> On Thu, Dec 8, 2022 at 1:59 PM Richard Beare < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>> >>>>>>>> >>>> I'm diving into the convertrecord tests a bit deeper on the >>>>>>>> production server. >>>>>>>> >>>> >>>>>>>> >>>> The first test case - 259 documents, total of 1M when in avro >>>>>>>> format in the input queue to the convert record processor. These avro >>>>>>>> files >>>>>>>> were not created by the groovy script - they start life as a database >>>>>>>> query >>>>>>>> and the text field is in one of the columns. The convertrecord >>>>>>>> processor >>>>>>>> runs very quickly - click start, press refresh and it is done. The avro >>>>>>>> ends up like this: >>>>>>>> >>>> >>>>>>>> >>>> [ { >>>>>>>> >>>> "sampleid" : 1075, >>>>>>>> >>>> "typeid" : 98, >>>>>>>> >>>> "dct" : "2020-01-25T21:40:25.515Z", >>>>>>>> >>>> "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt", >>>>>>>> >>>> "document" : "Text removed", >>>>>>>> >>>> "docid" : "9" >>>>>>>> >>>> } ] >>>>>>>> >>>> >>>>>>>> >>>> In the second test, where the text fields are extracted from >>>>>>>> pdf tika before avro files are created by the groovy script (from the >>>>>>>> tika >>>>>>>> json output), the total queue size for the 259 documents is larger - >>>>>>>> 1.77MB, and the performance is very different - press start, click >>>>>>>> refresh >>>>>>>> and only two flowfiles are processed. >>>>>>>> >>>> >>>>>>>> >>>> [ { >>>>>>>> >>>> "doc_id" : "70", >>>>>>>> >>>> "doc_text" : "text removed", >>>>>>>> >>>> "processing_timestamp" : "2022-12-07T23:09:52.354Z", >>>>>>>> >>>> "metadata_x_ocr_applied" : true, >>>>>>>> >>>> "metadata_x_parsed_by" : >>>>>>>> "org.apache.tika.parser.pdf.PDFParser", >>>>>>>> >>>> "metadata_content_type" : "application/pdf", >>>>>>>> >>>> "metadata_page_count" : 1, >>>>>>>> >>>> "metadata_creation_date" : null, >>>>>>>> >>>> "metadata_last_modified" : null >>>>>>>> >>>> } ] >>>>>>>> >>>> >>>>>>>> >>>> I've noticed that the second one has a content.type attribute >>>>>>>> of 'application/json' which doesn't seem right and doesn't match the >>>>>>>> fast >>>>>>>> case. I'll see what happens if I change that. >>>>>>>> >>>> >>>>>>>> >>>> On Thu, Dec 8, 2022 at 9:39 AM Richard Beare < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>> >>>>>>>> >>>>> Hi All, >>>>>>>> >>>>> Some progress on debugging options. I've found a flow that >>>>>>>> exhibits the problem using synthetic data. However the results are host >>>>>>>> dependent. On my laptop a "run-once" click of merge record gives me two >>>>>>>> flowfiles of 100 records, while the same flow on the production server >>>>>>>> produces several much smaller flowfiles. This makes me think that >>>>>>>> something >>>>>>>> funny is happening with my storage setup that I'll need to chase. >>>>>>>> >>>>> >>>>>>>> >>>>> I had tested the convertrecord option (simply >>>>>>>> avroreader->avrowriter) and it did seem slow, but I'll investigate this >>>>>>>> further as it may be related to my storage issue. 
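If I'm reading the Avro object-container layout correctly, the bytes after the magic are just the file-metadata map, so the header difference decodes roughly as:

  O b j 001          magic plus container format version
  002 vs 004         zigzag varint count of metadata entries (1 vs 2)
  026 a v r o ...    zigzag-encoded key length (11) followed by the key "avro.schema"
  346 \n vs 362 \b   zigzag varint length of the schema JSON that follows

i.e. the groovy-produced file carries only avro.schema while the NiFi-written one has a second metadata entry (typically avro.codec), on top of the different record names (document vs nifiRecord). Neither difference looks like it should explain the slowdown by itself.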
>>>>>>>> >>>>> >>>>>>>> >>>>> >>>>>>>> >>>>> >>>>>>>> >>>>> On Thu, Dec 8, 2022 at 1:23 AM Mark Payne < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>> >>>>>>>> >>>>>> > Is there something about this structure that is likely to >>>>>>>> be causing the problem? Could there be other issues with the avro >>>>>>>> generated >>>>>>>> by the script? >>>>>>>> >>>>>> >>>>>>>> >>>>>> I don’t think the structure should matter. And as long as >>>>>>>> the avro produced is proper Avro, I don’t think it should matter. >>>>>>>> Unless >>>>>>>> perhaps there’s some issue with the Avro library itself that’s causing >>>>>>>> it >>>>>>>> to take a really long time to parse the Avro or something? I’d be >>>>>>>> curious - >>>>>>>> if you take the output of your script and then you run it through a >>>>>>>> ConvertRecord (Avro Reader -> Json Writer) is the ConvertRecord fast? >>>>>>>> Or is >>>>>>>> it really slow to process it? >>>>>>>> >>>>>> >>>>>>>> >>>>>> On Dec 5, 2022, at 5:58 AM, Richard Beare < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>> >>>>>>>> >>>>>> Further - I performed another test in which I replaced the >>>>>>>> custom json to avro script with a ConvertRecord processor - merge >>>>>>>> record >>>>>>>> appears to work as expected in that case. >>>>>>>> >>>>>> >>>>>>>> >>>>>> Output of convertrecord looks like this: >>>>>>>> >>>>>> >>>>>>>> >>>>>> [ { >>>>>>>> >>>>>> "text" : " No Alert Found \n\n", >>>>>>>> >>>>>> "metadata" : { >>>>>>>> >>>>>> "X_TIKA_Parsed_By" : null, >>>>>>>> >>>>>> "X_OCR_Applied" : null, >>>>>>>> >>>>>> "Content_Type" : null >>>>>>>> >>>>>> }, >>>>>>>> >>>>>> "success" : true, >>>>>>>> >>>>>> "timestamp" : "2022-12-05T10:49:18.568Z", >>>>>>>> >>>>>> "processingElapsedTime" : 0, >>>>>>>> >>>>>> "doc_id" : "5.60178607E8" >>>>>>>> >>>>>> } ] >>>>>>>> >>>>>> >>>>>>>> >>>>>> while the output of the script looks like: >>>>>>>> >>>>>> >>>>>>>> >>>>>> [ { >>>>>>>> >>>>>> "doc_id" : "5.61996505E8", >>>>>>>> >>>>>> "doc_text" : " No Alert Found \n\n", >>>>>>>> >>>>>> "processing_timestamp" : "2022-11-28T01:16:46.775Z", >>>>>>>> >>>>>> "metadata_x_ocr_applied" : true, >>>>>>>> >>>>>> "metadata_x_parsed_by" : >>>>>>>> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser", >>>>>>>> >>>>>> "metadata_content_type" : "application/rtf", >>>>>>>> >>>>>> "metadata_page_count" : null, >>>>>>>> >>>>>> "metadata_creation_date" : null, >>>>>>>> >>>>>> "metadata_last_modified" : null >>>>>>>> >>>>>> } ] >>>>>>>> >>>>>> >>>>>>>> >>>>>> Is there something about this structure that is likely to be >>>>>>>> causing the problem? Could there be other issues with the avro >>>>>>>> generated by >>>>>>>> the script? >>>>>>>> >>>>>> >>>>>>>> >>>>>> On Mon, Dec 5, 2022 at 9:31 PM Richard Beare < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> I've reset the backpressure to the default >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> This remains something of a mystery. The merge with >>>>>>>> synthetic data happily creates flowfiles with 100 records, and the join >>>>>>>> says "Records merged due to: Bin is full" or "Records merged due to: >>>>>>>> Bin is >>>>>>>> full enough". No timeouts in that case, even with the max bin age at >>>>>>>> 4.5 >>>>>>>> seconds. The resulting flowfiles were about 300K. >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> The real data is doing much the same as before, producing >>>>>>>> flowfiles of about 30K, with 7 records or so. 
If I increase the >>>>>>>> maximum bin >>>>>>>> age to 30 seconds the size and record count is higher - 12 to 15. >>>>>>>> Nothing >>>>>>>> like the behaviour with synthetic data, where the 100 record flowfiles >>>>>>>> are >>>>>>>> created almost instantly. Joins are always due to bin age. >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> Can the problem relate to the structure of the avro files? >>>>>>>> Any way to dive into that? Everything else about the mergerecord >>>>>>>> settings >>>>>>>> appear the same, so I can't see an explanation as to why the behaviour >>>>>>>> is >>>>>>>> different on the same hardware. >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> >>>>>>>> >>>>>>> On Mon, Dec 5, 2022 at 2:09 AM Mark Payne < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Hey Richard, >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> So a few things that I’ve done/looked at. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> I generated some Avro data (random JSON that I downloaded >>>>>>>> from a Random JSON Generator and then converted to Avro). >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> I then ran this avro data into both the MergeRecord >>>>>>>> processors. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Firstly, I noticed that both are very slow. Found that was >>>>>>>> because Run Schedule was set to 5 seconds. This should *ALWAYS* be 0 >>>>>>>> secs >>>>>>>> for MergeRecord. And really for basically all processors except for the >>>>>>>> first one in the flow. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> I also notice that you have backpressure set on your >>>>>>>> connections to 40,000 FlowFiles and 4 GB. This can significantly slow >>>>>>>> things down. If you have performance concerns you definitely want >>>>>>>> backpressure set back to the default of 10,000 FlowFiles. Otherwise, >>>>>>>> as the >>>>>>>> queues fill up they start “swapping out” FlowFiles to disk, and this >>>>>>>> can >>>>>>>> significantly slow things down. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> I noticed that MergeRecord is set to 1 concurrent task. >>>>>>>> Probably worth considering increasing that, if performance is a >>>>>>>> concern. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> That said, I am seeing nice, full bins of 100 records >>>>>>>> merged from each of the MergeRecord processors. >>>>>>>> >>>>>>>> So it is certainly possible that if you’re seeing smaller >>>>>>>> bins it’s becuase you’re timing out. The 4.5 seconds timeout is quite >>>>>>>> short. Have you tried increasing that to say 30 seconds to see if it >>>>>>>> gives >>>>>>>> you larger bins? >>>>>>>> >>>>>>>> I also recommend that you take a look at the data >>>>>>>> provenance to see why it’s creating the bins. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> If unclear how to do that: >>>>>>>> >>>>>>>> Right-click on the MergeRecord processor >>>>>>>> >>>>>>>> Go to View data provenance >>>>>>>> >>>>>>>> Scroll down the list until you see a “JOIN” event type. >>>>>>>> You can ignore the ATTRIBUTES_MODIFIED and DROP events for now. >>>>>>>> >>>>>>>> Click the ‘i’ icon on the left-hand side. >>>>>>>> >>>>>>>> This will show you details about the merge. In the Details >>>>>>>> tab, if you scroll down, it will show you a Details field, which tells >>>>>>>> you >>>>>>>> why the data was merged. 
It should either say: "Records Merged due to: >>>>>>>> Bin >>>>>>>> has reached Max Bin Age” or “ Records Merged due to: Bin is full” >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> If it is due to Max Bin Age reached, then I’d recommend >>>>>>>> increasing number of concurrent tasks, reducing backpressure to no more >>>>>>>> than 10,000 FlowFiles in the queue, and/or increasing the Max Bin Age. >>>>>>>> >>>>>>>> Also worth asking - what kind of machines is this running >>>>>>>> on? A 64 core VM with 1 TB volume will, of course, run MUCH differently >>>>>>>> than a 4 core VM with a 10 GB volume, especially in the cloud. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> If still having trouble, let me know what the provenance >>>>>>>> tells you about the reason for merging the data, and we can go from >>>>>>>> there. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Thanks! >>>>>>>> >>>>>>>> -Mark >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Dec 3, 2022, at 4:38 PM, Mark Payne < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Richard, >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> I think just the flow structure shoudl be sufficient. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Thanks >>>>>>>> >>>>>>>> -Mark >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Dec 3, 2022, at 4:32 PM, Richard Beare < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Thanks for responding, >>>>>>>> >>>>>>>> I re-tested with max bins = 2, but the behaviour remained >>>>>>>> the same. I can easily share a version of the functioning workflow (and >>>>>>>> data), which is part of a public project. The problem workflow (which >>>>>>>> shares many of the same components) is part of a health research >>>>>>>> project, >>>>>>>> so more difficult. I definitely can't share any data from that one. Do >>>>>>>> you >>>>>>>> need to see the data or is the overall structure sufficient at this >>>>>>>> point? >>>>>>>> Happy to demonstrate via video conference too. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Thanks >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Sun, Dec 4, 2022 at 1:37 AM Mark Payne < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> Hi Richard, >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> Can you try increasing the Maximum Number of Bins? I >>>>>>>> think there was an issue that was recently addressed in which the merge >>>>>>>> processors had an issue when Max Number of Bins = 1. >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> If you still see the same issue, please provide a copy of >>>>>>>> the flow that can be used to replicate the issue. >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> Thanks >>>>>>>> >>>>>>>>> -Mark >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> On Dec 3, 2022, at 5:21 AM, Richard Beare < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> Hi, >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> Pretty much the same - I seem to end up with flowfiles >>>>>>>> containing about 7 records, presumably always triggered by the timeout. >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> I had thought the timeout needed to be less than the run >>>>>>>> schedule, but it looks like it can be the same. 
>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> Here's a debug dump >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:43 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Migrating id=1066297 to RecordBin[size=4, full=false, isComplete=false, >>>>>>>> id=4021] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:43 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Transferred id=1066297 to RecordBin[size=5, full=false, >>>>>>>> isComplete=false, >>>>>>>> id=4021] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:44 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got >>>>>>>> Group ID >>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>>>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:44 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Migrating id=1066372 to RecordBin[size=5, full=false, isComplete=false, >>>>>>>> id=4021] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:44 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Transferred id=1066372 to RecordBin[size=6, full=false, >>>>>>>> isComplete=false, >>>>>>>> id=4021] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:45 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Transferred id=1066575 to RecordBin[size=7, full=false, >>>>>>>> isComplete=true, >>>>>>>> id=4021] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:46 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got >>>>>>>> Group ID >>>>>>>> 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>>>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:46 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Migrating id=1066612 to RecordBin[size=0, full=false, isComplete=false, >>>>>>>> id=4022] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:46 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Created OutputStream using session StandardProcessSession[id=83204] for >>>>>>>> RecordBin[size=0, full=false, isComplete=false, id=4022] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:46 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Transferred id=1066612 to RecordBin[size=1, full=false, >>>>>>>> isComplete=false, >>>>>>>> id=4022] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:48 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Migrating id=1066896 to RecordBin[size=2, full=false, isComplete=false, >>>>>>>> id=4022] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:48 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Transferred id=1066896 to RecordBin[size=3, full=false, >>>>>>>> isComplete=false, >>>>>>>> id=4022] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:49 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got >>>>>>>> Group ID >>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:49 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Migrating id=1067051 to RecordBin[size=3, full=false, isComplete=false, >>>>>>>> id=4022] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:49 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Transferred id=1067051 to RecordBin[size=4, full=false, >>>>>>>> isComplete=false, >>>>>>>> id=4022] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:52 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Transferred id=1067254 to RecordBin[size=7, full=false, >>>>>>>> isComplete=true, >>>>>>>> id=4022] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:53 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got >>>>>>>> Group ID >>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>>>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:53 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Migrating id=1067395 to RecordBin[size=0, full=false, isComplete=false, >>>>>>>> id=4023] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:53 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Created OutputStream using session StandardProcessSession[id=83205] for >>>>>>>> RecordBin[size=0, full=false, isComplete=false, id=4023] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:53 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Transferred id=1067395 to RecordBin[size=1, full=false, >>>>>>>> isComplete=false, >>>>>>>> id=4023] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:54 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Migrating id=1068472 to RecordBin[size=1, full=false, isComplete=false, >>>>>>>> id=4023] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:54 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Transferred id=1068472 to RecordBin[size=2, full=false, >>>>>>>> 
isComplete=false, >>>>>>>> id=4023] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:55 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got >>>>>>>> Group ID >>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:55 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Migrating id=1068597 to RecordBin[size=2, full=false, isComplete=false, >>>>>>>> id=4023] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:55 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Transferred id=1068597 to RecordBin[size=3, full=false, >>>>>>>> isComplete=false, >>>>>>>> id=4023] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:58 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> RecordBin[size=6, full=false, isComplete=false, id=4023] is now >>>>>>>> expired. >>>>>>>> Completing bin. 
>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:58 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Marked RecordBin[size=6, full=false, isComplete=true, id=4023] as >>>>>>>> complete >>>>>>>> because complete() was called >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:58 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Closed Record Writer using session StandardProcessSession[id=83205] for >>>>>>>> RecordBin[size=6, full=false, isComplete=true, id=4023] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:58 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Completed bin RecordBin[size=6, full=false, isComplete=true, id=4023] >>>>>>>> with >>>>>>>> 6 records with Merged FlowFile >>>>>>>> FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948] using input >>>>>>>> FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663, id=1068800, >>>>>>>> id=1068845] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:13:58 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Transferred id=1068845 to RecordBin[size=6, full=false, >>>>>>>> isComplete=true, >>>>>>>> id=4023] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:14:01 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Migrating id=1069272 to RecordBin[size=2, full=false, isComplete=false, >>>>>>>> id=4024] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:14:01 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Transferred id=1069272 to RecordBin[size=3, full=false, >>>>>>>> isComplete=false, >>>>>>>> id=4024] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:14:02 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got >>>>>>>> Group ID >>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>>>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:14:02 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Migrating id=1069316 to RecordBin[size=3, full=false, 
isComplete=false, >>>>>>>> id=4024] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:14:02 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Transferred id=1069316 to RecordBin[size=4, full=false, >>>>>>>> isComplete=false, >>>>>>>> id=4024] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:14:05 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> RecordBin[size=6, full=false, isComplete=false, id=4024] is now >>>>>>>> expired. >>>>>>>> Completing bin. >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:14:05 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Marked RecordBin[size=6, full=false, isComplete=true, id=4024] as >>>>>>>> complete >>>>>>>> because complete() was called >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:14:05 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Closed Record Writer using session StandardProcessSession[id=83206] for >>>>>>>> RecordBin[size=6, full=false, isComplete=true, id=4024] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:14:05 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Completed bin RecordBin[size=6, full=false, isComplete=true, id=4024] >>>>>>>> with >>>>>>>> 6 records with Merged FlowFile >>>>>>>> FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d] using input >>>>>>>> FlowFiles [id=1069044, id=1069103, id=1069272, id=1069316, id=1069451, >>>>>>>> id=1069492] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:14:05 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Transferred id=1069492 to RecordBin[size=6, full=false, >>>>>>>> isComplete=true, >>>>>>>> id=4024] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:14:07 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Migrating id=1072118 to RecordBin[size=2, full=false, isComplete=false, >>>>>>>> id=4025] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:14:07 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Transferred id=1072118 to RecordBin[size=3, full=false, >>>>>>>> isComplete=false, >>>>>>>> id=4025] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:14:08 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got >>>>>>>> Group ID >>>>>>>> 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>>>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:14:08 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Migrating id=1072197 to RecordBin[size=3, full=false, isComplete=false, >>>>>>>> id=4025] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> 10:14:08 UTC >>>>>>>> >>>>>>>>> DEBUG >>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] >>>>>>>> Transferred id=1072197 to RecordBin[size=4, full=false, >>>>>>>> isComplete=false, >>>>>>>> id=4025] >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> On Sat, Dec 3, 2022 at 4:21 PM Joe Witt < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> Hello >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> Run schedule should be 0. >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> 50 should be the min number of records >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> 5 seconds is the max bin age it sounds like you want. >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> Start with these changes and let us know what youre >>>>>>>> seeing. >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> Thanks >>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>>>> On Fri, Dec 2, 2022 at 10:12 PM Richard Beare < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> Hi, >>>>>>>> >>>>>>>>>>> I'm having a great deal of trouble configuring the >>>>>>>> mergerecord processor to deliver reasonable performance and I'm not >>>>>>>> sure >>>>>>>> where to look to correct it. One of my upstream processors requires a >>>>>>>> single record per flowfile, but I'd like to create larger flowfiles >>>>>>>> before >>>>>>>> passing to the next stage. The flowfiles are independent at this stage >>>>>>>> so >>>>>>>> there's no special processing required of the merging. I'd like to >>>>>>>> create >>>>>>>> flowfiles of about 50 to 100 records. >>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> I have two tests, both running on the same nifi system. >>>>>>>> One uses synthetic data, the other the production data. The >>>>>>>> performance of >>>>>>>> the mergerecord processor for the synthetic data is as I'd expect, and >>>>>>>> I >>>>>>>> can't figure out why the production data is so much slower. Here's the >>>>>>>> >>>>>>>>>>> configuration: >>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> mergerecord has the following settings. Timer driven, 1 >>>>>>>> concurrent task, 5 second run schedule, bin packing merge strategy, min >>>>>>>> records = 1, max records = 100, max bin age = 4.5 secs, maximum number >>>>>>>> of >>>>>>>> bins = 1. >>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> In the case of synthetic data the typical flowfile size >>>>>>>> is in the range 2 to 7KB. 
>>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> The size of flowfiles for the production case is >>>>>>>> smaller - typically around 1KB. >>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> The structure in the tests is slightly different. >>>>>>>> Synthetic is (note that I've removed the text part): >>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> [ { >>>>>>>> >>>>>>>>>>> "sampleid" : 1075, >>>>>>>> >>>>>>>>>>> "typeid" : 98, >>>>>>>> >>>>>>>>>>> "dct" : "2020-01-25T21:40:25.515Z", >>>>>>>> >>>>>>>>>>> "filename" : >>>>>>>> "__tmp/txt/mtsamples-type-98-sample-1075.txt", >>>>>>>> >>>>>>>>>>> "document" : "Text removed - typically a few hundred >>>>>>>> words", >>>>>>>> >>>>>>>>>>> "docid" : "9" >>>>>>>> >>>>>>>>>>> } ] >>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> Production is: >>>>>>>> >>>>>>>>>>> [ { >>>>>>>> >>>>>>>>>>> "doc_id" : "5.60622895E8", >>>>>>>> >>>>>>>>>>> "doc_text" : " Text deleted - typically a few hundred >>>>>>>> words", >>>>>>>> >>>>>>>>>>> "processing_timestamp" : "2022-11-27T23:56:35.601Z", >>>>>>>> >>>>>>>>>>> "metadata_x_ocr_applied" : true, >>>>>>>> >>>>>>>>>>> "metadata_x_parsed_by" : >>>>>>>> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser", >>>>>>>> >>>>>>>>>>> "metadata_content_type" : "application/rtf", >>>>>>>> >>>>>>>>>>> "metadata_page_count" : null, >>>>>>>> >>>>>>>>>>> "metadata_creation_date" : null, >>>>>>>> >>>>>>>>>>> "metadata_last_modified" : null >>>>>>>> >>>>>>>>>>> } ] >>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> I load up the queue feeding the mergerecord processor >>>>>>>> with several hundred individual flowfiles and activate it. >>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> The synthetic data is nicely placed into chunks of 100, >>>>>>>> with any remainder being flushed in a smaller chunk. >>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> The production data is generally bundled into groups of >>>>>>>> 6 records, sometimes less. Certainly it never gets close to 100 >>>>>>>> records. >>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> Any ideas as to what I should look at to track down the >>>>>>>> difference? >>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>>>>>>> Thanks >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>> >>>>>>>> > >>>>>>>> >>>>>>>
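Pulling together the advice from Joe and Mark above in the thread, the suggested MergeRecord settings (as opposed to the ones described in the original message) come out roughly as:

  Run Schedule:               0 sec
  Concurrent Tasks:           more than 1 if throughput matters
  Merge Strategy:             Bin-Packing Algorithm
  Minimum Number of Records:  50
  Maximum Number of Records:  100
  Max Bin Age:                5 sec (or 30 sec while diagnosing)
  Maximum Number of Bins:     greater than 1

with back-pressure on the feeding connections left at the default 10,000 FlowFiles, and data provenance on the JOIN events used to check whether bins complete because they are full or because they age out.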
