Yes, embedded avro schema on both.

The part that mystifies me most is the different behaviour I see on
different test hosts, making me suspect there's some sort of network
timeout happening in the background (probably desperation on my part). The
problem host is sitting in a hospital datacentre with deliberately
restricted network access while my local laptop doesn't have the same
restrictions.

On Thu, Dec 22, 2022 at 1:46 AM Bryan Bende <[email protected]> wrote:

> That works. Can you elaborate on the changes you made to the
> AvroRecordSetWriter where it went from instant to slow?
>
> It sounds like the instant case had:
>   - Schema Access Strategy: Inherit Record Schema
>
> and slow case has:
>   - Schema Access Strategy: Use Schema Text
>
> What about Schema Write Strategy?
> Did both tests use Embedded Avro Schema?
>
>
> On Wed, Dec 21, 2022 at 5:03 AM Richard Beare <[email protected]>
> wrote:
>
>> I've converted to a template and attached - is that the best way to show
>> configuration?
>>
>> On Wed, Dec 21, 2022 at 7:40 PM Bryan Bende <[email protected]> wrote:
>>
>>> Can you show your configuration of JoltTransformRecord and
>>> AvroRecordSetWriter?
>>>
>>> On Wed, Dec 21, 2022 at 2:51 AM Lars Winderling <
>>> [email protected]> wrote:
>>>
>>>> Hi Richard,
>>>>
>>>> It's not related, but for the logical type timestamp-millis you should
>>>> use a "long" instead of a "string" (cf.
>>>> https://avro.apache.org/docs/1.11.1/specification/#timestamp-millisecond-precision)
>>>> afaik.
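>>>>
>>>> For example, a spec-conformant declaration of that field would look
>>>> something like:
>>>>
>>>>   { "name": "processing_timestamp",
>>>>     "type": { "type": "long", "logicalType": "timestamp-millis" } }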
>>>>
>>>> Best, Lars
>>>>
>>>> On 21 December 2022 08:29:54 CET, Richard Beare <
>>>> [email protected]> wrote:
>>>>>
>>>>> I have found a way to force the schema to be used, but I've missed
>>>>> something in my configuration. When I use a default generic avro writer in
>>>>> my JoltTransformRecord processor, the queue of 259 entries (about 1.8 MB) is
>>>>> processed instantly.
>>>>> If I configure my avro writer to use the schema text property and paste
>>>>> the following into the schema text field, the performance is terrible,
>>>>> taking tens of minutes to empty the same queue. Both are on the same 25 ms
>>>>> run duration. I notice that even a "run once" of that processor is slow. I
>>>>> did not notice the same behavior on my laptop. Is there likely to be some
>>>>> sort of querying of remote sites going on behind the scenes that my server
>>>>> is failing to access due to firewalls etc.? It seems really strange to me
>>>>> that it should be so slow with such tiny files, and the only commonality I
>>>>> can find is the custom schema. Is there something odd about it?
>>>>>
>>>>> {
>>>>>   "type": "record",
>>>>>   "namespace": "cogstack",
>>>>>   "name": "document",
>>>>>   "fields":
>>>>>   [
>>>>>     { "name": "doc_id", "type": "string" },
>>>>>     { "name": "doc_text", "type": "string", "default": "" },
>>>>>     { "name": "processing_timestamp", "type": { "type" : "string",
>>>>> "logicalType" : "timestamp-millis" } },
>>>>>     { "name": "metadata_x_ocr_applied", "type": "boolean" },
>>>>>     { "name": "metadata_x_parsed_by", "type": "string" },
>>>>>     { "name": "metadata_content_type", "type": ["null", "string"],
>>>>> "default": null },
>>>>>     { "name": "metadata_page_count", "type": ["null", "int"],
>>>>> "default": null },
>>>>>     { "name": "metadata_creation_date", "type": ["null", { "type" :
>>>>> "string", "logicalType" : "timestamp-millis" }], "default": null },
>>>>>     { "name": "metadata_last_modified", "type": ["null", { "type" :
>>>>> "string", "logicalType" : "timestamp-millis" }], "default": null }
>>>>>   ]
>>>>> }
>>>>>
>>>>> On Wed, Dec 21, 2022 at 2:05 PM Richard Beare <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I've made progress with Jolt and I think I'm close to achieving what
>>>>>> I'm after. I am missing one conceptual step, I think.
>>>>>>
>>>>>> I rearrange my json so that it conforms to the desired structure and
>>>>>> I can then write the results as avro. However, that is generic avro. How 
>>>>>> do
>>>>>> I ensure that I conform to the schema that has been defined for that part
>>>>>> of the flow?
>>>>>>
>>>>>> I.e., the part of the flow I'm replacing with Jolt was a groovy script
>>>>>> that created a flowfile according to a schema. That schema is below. Is
>>>>>> there a way to utilise this definition in the JoltTransformRecord
>>>>>> processor, either via specification of data types in the transform
>>>>>> definition or by telling the avro writer to use that specification? Or am
>>>>>> I overthinking things here?
>>>>>> Thanks
>>>>>>
>>>>>> {
>>>>>>   "type": "record",
>>>>>>   "name": "document",
>>>>>>   "fields":
>>>>>>   [
>>>>>>     { "name": "doc_id", "type": "string" },
>>>>>>     { "name": "doc_text", "type": "string", "default": "" },
>>>>>>     { "name": "processing_timestamp", "type": { "type" : "string",
>>>>>> "logicalType" : "timestamp-millis" } },
>>>>>>     { "name": "metadata_x_ocr_applied", "type": "boolean" },
>>>>>>     { "name": "metadata_x_parsed_by", "type": "string" },
>>>>>>     { "name": "metadata_content_type", "type": ["null", "string"],
>>>>>> "default": null },
>>>>>>     { "name": "metadata_page_count", "type": ["null", "int"],
>>>>>> "default": null },
>>>>>>     { "name": "metadata_creation_date", "type": ["null", { "type" :
>>>>>> "string", "logicalType" : "timestamp-millis" }], "default": null },
>>>>>>     { "name": "metadata_last_modified", "type": ["null", { "type" :
>>>>>> "string", "logicalType" : "timestamp-millis" }], "default": null }
>>>>>>   ]
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Dec 21, 2022 at 9:06 AM Richard Beare <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Thanks - I'll have a look at that. It is helpful to get guidance
>>>>>>> like this when the system is so large.
>>>>>>>
>>>>>>> On Wed, Dec 21, 2022 at 5:30 AM Matt Burgess <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Vijay! I agree those processors should do the trick but there
>>>>>>>> were things in the transformation between input and desired output
>>>>>>>> that I wasn't sure of their origin. If you are setting constants you
>>>>>>>> can use either a Shift or Default spec, if you are moving fields
>>>>>>>> around you can use a Shift spec, and in general whether you end up
>>>>>>>> with one spec or multiple, I find it's easiest to use a Chain spec
>>>>>>>> (an
>>>>>>>> array of specs) in the processor configuration. You can play around
>>>>>>>> with the spec(s) at the Jolt playground [1]
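>>>>>>>>
>>>>>>>> As a rough sketch (not tested against your data), a Chain spec that
>>>>>>>> flattens the structure you posted below might look like the following;
>>>>>>>> it only takes the first entry of X-TIKA:Parsed-By and leaves doc_id and
>>>>>>>> the null defaults to be populated elsewhere:
>>>>>>>>
>>>>>>>> [
>>>>>>>>   {
>>>>>>>>     "operation": "shift",
>>>>>>>>     "spec": {
>>>>>>>>       "result": {
>>>>>>>>         "text": "doc_text",
>>>>>>>>         "timestamp": "processing_timestamp",
>>>>>>>>         "metadata": {
>>>>>>>>           "X-TIKA:Parsed-By": { "0": "metadata_x_parsed_by" },
>>>>>>>>           "X-OCR-Applied": "metadata_x_ocr_applied",
>>>>>>>>           "Content-Type": "metadata_content_type",
>>>>>>>>           "Page-Count": "metadata_page_count"
>>>>>>>>         }
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> ]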
>>>>>>>>
>>>>>>>> An important difference to note between JoltTransformJSON and
>>>>>>>> JoltTransformRecord is that for the former, the spec is applied to
>>>>>>>> the
>>>>>>>> entire input (and it is entirely read into memory) where in
>>>>>>>> JoltTransformRecord the spec is applied to each record.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Matt
>>>>>>>>
>>>>>>>> [1] http://jolt-demo.appspot.com/#inception
>>>>>>>>
>>>>>>>> On Tue, Dec 20, 2022 at 10:52 AM Vijay Chhipa <[email protected]>
>>>>>>>> wrote:
>>>>>>>> >
>>>>>>>> > Hi Richard
>>>>>>>> > Have you tried JoltTransformJSON or JoltTransformRecord
>>>>>>>> >
>>>>>>>> > I believe you should be able to do this
>>>>>>>> >
>>>>>>>> > Quick start here:
>>>>>>>> https://community.cloudera.com/t5/Community-Articles/Jolt-quick-reference-for-Nifi-Jolt-Processors/ta-p/244350
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > On Dec 20, 2022, at 4:13 AM, Richard Beare <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >
>>>>>>>> > Hi Everyone,
>>>>>>>> > Still struggling to fix this issue and may need to try some
>>>>>>>> different things.
>>>>>>>> >
>>>>>>>> > What is the recommended way of transforming a record structure?
>>>>>>>> At the moment I have a groovy script doing this but the downstream
>>>>>>>> processing is very slow, as discussed in the preceding thread.
>>>>>>>> >
>>>>>>>> > The transformation is very simple - starting structure is:
>>>>>>>> >
>>>>>>>> > {
>>>>>>>> >   "result" : {
>>>>>>>> >     "text" : " document text",
>>>>>>>> >     "metadata" : {
>>>>>>>> >       "X-TIKA:Parsed-By" : [
>>>>>>>> >         "org.apache.tika.parser.pdf.PDFParser"
>>>>>>>> >       ],
>>>>>>>> >       "X-OCR-Applied" : true,
>>>>>>>> >       "dcterms:created" : "2018-07-24T15:04:51Z",
>>>>>>>> >       "Content-Type" : "application/pdf",
>>>>>>>> >       "Page-Count" : 2
>>>>>>>> >     },
>>>>>>>> >     "success" : true,
>>>>>>>> >     "timestamp" : "2022-12-20T09:02:27.902Z",
>>>>>>>> >     "processingElapsedTime" : 6
>>>>>>>> >   }
>>>>>>>> > }
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > The final structure is:
>>>>>>>> >
>>>>>>>> > [ {
>>>>>>>> >   "doc_id" : 58,
>>>>>>>> >   "doc_text" : "   ",
>>>>>>>> >   "processing_timestamp" : "2022-12-20T09:02:27.902Z",
>>>>>>>> >   "metadata_x_ocr_applied" : true,
>>>>>>>> >   "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser",
>>>>>>>> >   "metadata_content_type" : "application/pdf",
>>>>>>>> >   "metadata_page_count" : 1,
>>>>>>>> >   "metadata_creation_date" : null,
>>>>>>>> >   "metadata_last_modified" : null
>>>>>>>> > } ]
>>>>>>>> >
>>>>>>>> > So a kind of flattening of the structure. Is there a processor I
>>>>>>>> should be using to do this instead of a groovy script?
>>>>>>>> >
>>>>>>>> > Thanks
>>>>>>>> >
>>>>>>>> > On Wed, Dec 14, 2022 at 7:57 AM Richard Beare <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >>
>>>>>>>> >> Any thoughts on this? Are there some extra steps required when
>>>>>>>> creating an avro file from a user defined schema?
>>>>>>>> >>
>>>>>>>> >> On Thu, Dec 8, 2022 at 2:56 PM Richard Beare <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Here's another result that I think suggests there's something
>>>>>>>> wrong with the avro files created by the groovy script, although I 
>>>>>>>> can't
>>>>>>>> see what the problem might be.
>>>>>>>> >>>
>>>>>>>> >>> The test is as follows. Output of the groovy script creating
>>>>>>>> avro files is passed to convertrecord, configured with an avro reader 
>>>>>>>> and
>>>>>>>> json writer. This is slow. The json output is then converted back to 
>>>>>>>> avro
>>>>>>>> with another convertrecord processor, configured with a jsontreereader 
>>>>>>>> and
>>>>>>>> an avro writer - this is fast, instantly emptying the queue. The 
>>>>>>>> result of
>>>>>>>> that is fed into the previously problematic merge processor which works
>>>>>>>> exactly as expected, producing flowfiles with 100 records each.
>>>>>>>> >>>
>>>>>>>> >>> The difference I can see between the two flow files is the way
>>>>>>>> in which the schema is specified. Perhaps some extras are required in 
>>>>>>>> the
>>>>>>>> groovy file to set that up?
>>>>>>>> >>>
>>>>>>>> >>> The slow one has:
>>>>>>>> >>>
>>>>>>>> >>> {"type":"record","name":"document", "fields":[{
>>>>>>>> >>>
>>>>>>>> >>> The fast one
>>>>>>>> >>>
>>>>>>>> {"type":"record","name":"nifiRecord","namespace":"org.apache.nifi","fields":
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> Initial characters are also slightly different.
>>>>>>>> >>> Slow one:
>>>>>>>> >>>
>>>>>>>> >>> 0000000   O   b   j 001 002 026   a   v   r   o   .   s   c   h   e   m
>>>>>>>> >>> 0000020   a 346  \n   {   "   t   y   p   e   "   :   "   r   e   c   o
>>>>>>>> >>>
>>>>>>>> >>> Fast one:
>>>>>>>> >>>
>>>>>>>> >>> 0000000   O   b   j 001 004 026   a   v   r   o   .   s   c   h   e   m
>>>>>>>> >>> 0000020   a 362  \b   {   "   t   y   p   e   "   :   "   r   e   c   o
>>>>>>>> >>>
>>>>>>>> >>>
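>>>>>>>> >>> To compare the embedded schemas directly rather than via the od
>>>>>>>> >>> output, something like the following sketch (assuming the fastavro
>>>>>>>> >>> Python library is available; slow.avro and fast.avro are placeholder
>>>>>>>> >>> file names) should print the schema stored in each file header:
>>>>>>>> >>>
>>>>>>>> >>> import json
>>>>>>>> >>> from fastavro import reader
>>>>>>>> >>>
>>>>>>>> >>> for path in ("slow.avro", "fast.avro"):
>>>>>>>> >>>     with open(path, "rb") as fo:
>>>>>>>> >>>         # writer_schema is the schema embedded in the Avro file header
>>>>>>>> >>>         print(path, json.dumps(reader(fo).writer_schema, indent=2))
>>>>>>>> >>>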
>>>>>>>> >>> The groovy script is
>>>>>>>> >>> CogStack-NiFi/parse-tika-result-json-to-avro.groovy at master ·
>>>>>>>> CogStack/CogStack-NiFi · GitHub
>>>>>>>> >>>
>>>>>>>> >>> The schema is
>>>>>>>> >>> CogStack-NiFi/document.avsc at master · CogStack/CogStack-NiFi
>>>>>>>> · GitHub
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> On Thu, Dec 8, 2022 at 1:59 PM Richard Beare <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>> I'm diving into the convertrecord tests a bit deeper on the
>>>>>>>> production server.
>>>>>>>> >>>>
>>>>>>>> >>>> The first test case - 259 documents, total of 1M when in avro
>>>>>>>> format in the input queue to the convert record processor. These avro 
>>>>>>>> files
>>>>>>>> were not created by the groovy script - they start life as a database 
>>>>>>>> query
>>>>>>>> and the text field is in one of the columns. The convertrecord 
>>>>>>>> processor
>>>>>>>> runs very quickly - click start, press refresh and it is done. The avro
>>>>>>>> ends up like this:
>>>>>>>> >>>>
>>>>>>>> >>>> [ {
>>>>>>>> >>>>   "sampleid" : 1075,
>>>>>>>> >>>>   "typeid" : 98,
>>>>>>>> >>>>   "dct" : "2020-01-25T21:40:25.515Z",
>>>>>>>> >>>>   "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
>>>>>>>> >>>>   "document" : "Text removed",
>>>>>>>> >>>>   "docid" : "9"
>>>>>>>> >>>> } ]
>>>>>>>> >>>>
>>>>>>>> >>>> In the second test, where the text fields are extracted from
>>>>>>>> pdf tika before avro files are created by the groovy script (from the 
>>>>>>>> tika
>>>>>>>> json output), the total queue size for the 259 documents is larger -
>>>>>>>> 1.77MB, and the performance is very different - press start, click 
>>>>>>>> refresh
>>>>>>>> and only two flowfiles are processed.
>>>>>>>> >>>>
>>>>>>>> >>>> [ {
>>>>>>>> >>>>   "doc_id" : "70",
>>>>>>>> >>>>   "doc_text" : "text removed",
>>>>>>>> >>>>   "processing_timestamp" : "2022-12-07T23:09:52.354Z",
>>>>>>>> >>>>   "metadata_x_ocr_applied" : true,
>>>>>>>> >>>>   "metadata_x_parsed_by" :
>>>>>>>> "org.apache.tika.parser.pdf.PDFParser",
>>>>>>>> >>>>   "metadata_content_type" : "application/pdf",
>>>>>>>> >>>>   "metadata_page_count" : 1,
>>>>>>>> >>>>   "metadata_creation_date" : null,
>>>>>>>> >>>>   "metadata_last_modified" : null
>>>>>>>> >>>> } ]
>>>>>>>> >>>>
>>>>>>>> >>>> I've noticed that the second one has a content.type attribute
>>>>>>>> of 'application/json' which doesn't seem right and doesn't match the 
>>>>>>>> fast
>>>>>>>> case. I'll see what happens if I change that.
>>>>>>>> >>>>
>>>>>>>> >>>> On Thu, Dec 8, 2022 at 9:39 AM Richard Beare <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>> Hi All,
>>>>>>>> >>>>> Some progress on debugging options. I've found a flow that
>>>>>>>> exhibits the problem using synthetic data. However the results are host
>>>>>>>> dependent. On my laptop a "run-once" click of merge record gives me two
>>>>>>>> flowfiles of 100 records, while the same flow on the production server
>>>>>>>> produces several much smaller flowfiles. This makes me think that 
>>>>>>>> something
>>>>>>>> funny is happening with my storage setup that I'll need to chase.
>>>>>>>> >>>>>
>>>>>>>> >>>>> I had tested the convertrecord option (simply
>>>>>>>> avroreader->avrowriter) and it did seem slow, but I'll investigate this
>>>>>>>> further as it may be related to my storage issue.
>>>>>>>> >>>>>
>>>>>>>> >>>>>
>>>>>>>> >>>>>
>>>>>>>> >>>>> On Thu, Dec 8, 2022 at 1:23 AM Mark Payne <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> > Is there something about this structure that is likely to
>>>>>>>> be causing the problem? Could there be other issues with the avro 
>>>>>>>> generated
>>>>>>>> by the script?
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> I don’t think the structure should matter. And as long as
>>>>>>>> the avro produced is proper Avro, I don’t think it should matter. 
>>>>>>>> Unless
>>>>>>>> perhaps there’s some issue with the Avro library itself that’s causing 
>>>>>>>> it
>>>>>>>> to take a really long time to parse the Avro or something? I’d be 
>>>>>>>> curious -
>>>>>>>> if you take the output of your script and then you run it through a
>>>>>>>> ConvertRecord (Avro Reader -> Json Writer) is the ConvertRecord fast? 
>>>>>>>> Or is
>>>>>>>> it really slow to process it?
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> On Dec 5, 2022, at 5:58 AM, Richard Beare <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> Further - I performed another test in which I replaced the
>>>>>>>> custom json to avro script with a ConvertRecord processor - merge 
>>>>>>>> record
>>>>>>>> appears to work as expected in that case.
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> Output of convertrecord looks like this:
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> [ {
>>>>>>>> >>>>>>   "text" : "  No Alert Found \n\n",
>>>>>>>> >>>>>>   "metadata" : {
>>>>>>>> >>>>>>     "X_TIKA_Parsed_By" : null,
>>>>>>>> >>>>>>     "X_OCR_Applied" : null,
>>>>>>>> >>>>>>     "Content_Type" : null
>>>>>>>> >>>>>>   },
>>>>>>>> >>>>>>   "success" : true,
>>>>>>>> >>>>>>   "timestamp" : "2022-12-05T10:49:18.568Z",
>>>>>>>> >>>>>>   "processingElapsedTime" : 0,
>>>>>>>> >>>>>>   "doc_id" : "5.60178607E8"
>>>>>>>> >>>>>> } ]
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> while the output of the script looks like:
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> [ {
>>>>>>>> >>>>>>   "doc_id" : "5.61996505E8",
>>>>>>>> >>>>>>   "doc_text" : "  No Alert Found \n\n",
>>>>>>>> >>>>>>   "processing_timestamp" : "2022-11-28T01:16:46.775Z",
>>>>>>>> >>>>>>   "metadata_x_ocr_applied" : true,
>>>>>>>> >>>>>>   "metadata_x_parsed_by" :
>>>>>>>> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
>>>>>>>> >>>>>>   "metadata_content_type" : "application/rtf",
>>>>>>>> >>>>>>   "metadata_page_count" : null,
>>>>>>>> >>>>>>   "metadata_creation_date" : null,
>>>>>>>> >>>>>>   "metadata_last_modified" : null
>>>>>>>> >>>>>> } ]
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> Is there something about this structure that is likely to be
>>>>>>>> causing the problem? Could there be other issues with the avro 
>>>>>>>> generated by
>>>>>>>> the script?
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> On Mon, Dec 5, 2022 at 9:31 PM Richard Beare <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> I've reset the backpressure to the default
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> This remains something of a mystery. The merge with
>>>>>>>> synthetic data happily creates flowfiles with 100 records, and the join
>>>>>>>> says "Records merged due to: Bin is full" or "Records merged due to: 
>>>>>>>> Bin is
>>>>>>>> full enough". No timeouts in that case, even with the max bin age at 
>>>>>>>> 4.5
>>>>>>>> seconds. The resulting flowfiles were about 300K.
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> The real data is doing much the same as before, producing
>>>>>>>> flowfiles of about 30K, with 7 records or so. If I increase the 
>>>>>>>> maximum bin
>>>>>>>> age to 30 seconds the size and record count is higher - 12 to 15. 
>>>>>>>> Nothing
>>>>>>>> like the behaviour with synthetic data, where the 100 record flowfiles 
>>>>>>>> are
>>>>>>>> created almost instantly. Joins are always due to bin age.
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Can the problem relate to the structure of the avro files?
>>>>>>>> Any way to dive into that? Everything else about the mergerecord 
>>>>>>>> settings
>>>>>>>> appear the same, so I can't see an explanation as to why the behaviour 
>>>>>>>> is
>>>>>>>> different on the same hardware.
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> On Mon, Dec 5, 2022 at 2:09 AM Mark Payne <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> Hey Richard,
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> So a few things that I’ve done/looked at.
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> I generated some Avro data (random JSON that I downloaded
>>>>>>>> from a Random JSON Generator and then converted to Avro).
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> I then ran this avro data into both the MergeRecord
>>>>>>>> processors.
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> Firstly, I noticed that both are very slow. Found that was
>>>>>>>> because Run Schedule was set to 5 seconds. This should *ALWAYS* be 0 
>>>>>>>> secs
>>>>>>>> for MergeRecord. And really for basically all processors except for the
>>>>>>>> first one in the flow.
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> I also notice that you have backpressure set on your
>>>>>>>> connections to 40,000 FlowFiles and 4 GB. This can significantly slow
>>>>>>>> things down. If you have performance concerns you definitely want
>>>>>>>> backpressure set back to the default of 10,000 FlowFiles. Otherwise, 
>>>>>>>> as the
>>>>>>>> queues fill up they start “swapping out” FlowFiles to disk, and this 
>>>>>>>> can
>>>>>>>> significantly slow things down.
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> I noticed that MergeRecord is set to 1 concurrent task.
>>>>>>>> Probably worth considering increasing that, if performance is a 
>>>>>>>> concern.
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> That said, I am seeing nice, full bins of 100 records
>>>>>>>> merged from each of the MergeRecord processors.
>>>>>>>> So it is certainly possible that if you're seeing smaller
>>>>>>>> bins it's because you're timing out. The 4.5 second timeout is quite
>>>>>>>> short. Have you tried increasing that to, say, 30 seconds to see if it
>>>>>>>> gives you larger bins?
>>>>>>>> >>>>>>>> I also recommend that you take a look at the data
>>>>>>>> provenance to see why it’s creating the bins.
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> If unclear how to do that:
>>>>>>>> >>>>>>>> Right-click on the MergeRecord processor
>>>>>>>> >>>>>>>> Go to View data provenance
>>>>>>>> >>>>>>>> Scroll down the list until you see a “JOIN” event type.
>>>>>>>> You can ignore the ATTRIBUTES_MODIFIED and DROP events for now.
>>>>>>>> >>>>>>>> Click the ‘i’ icon on the left-hand side.
>>>>>>>> >>>>>>>> This will show you details about the merge. In the Details
>>>>>>>> tab, if you scroll down, it will show you a Details field, which tells 
>>>>>>>> you
>>>>>>>> why the data was merged. It should either say: "Records Merged due to: 
>>>>>>>> Bin
>>>>>>>> has reached Max Bin Age” or “ Records Merged due to: Bin is full”
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> If it is due to Max Bin Age reached, then I’d recommend
>>>>>>>> increasing number of concurrent tasks, reducing backpressure to no more
>>>>>>>> than 10,000 FlowFiles in the queue, and/or increasing the Max Bin Age.
>>>>>>>> >>>>>>>> Also worth asking - what kind of machines is this running
>>>>>>>> on? A 64 core VM with 1 TB volume will, of course, run MUCH differently
>>>>>>>> than a 4 core VM with a 10 GB volume, especially in the cloud.
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> If still having trouble, let me know what the provenance
>>>>>>>> tells you about the reason for merging the data, and we can go from 
>>>>>>>> there.
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> Thanks!
>>>>>>>> >>>>>>>> -Mark
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> On Dec 3, 2022, at 4:38 PM, Mark Payne <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> Richard,
>>>>>>>> >>>>>>>>
>>>>>>>> I think just the flow structure should be sufficient.
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> Thanks
>>>>>>>> >>>>>>>> -Mark
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> On Dec 3, 2022, at 4:32 PM, Richard Beare <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> Thanks for responding,
>>>>>>>> >>>>>>>> I re-tested with max bins = 2, but the behaviour remained
>>>>>>>> the same. I can easily share a version of the functioning workflow (and
>>>>>>>> data), which is part of a public project. The problem workflow (which
>>>>>>>> shares many of the same components) is part of a health research 
>>>>>>>> project,
>>>>>>>> so more difficult. I definitely can't share any data from that one. Do 
>>>>>>>> you
>>>>>>>> need to see the data or is the overall structure sufficient at this 
>>>>>>>> point?
>>>>>>>> Happy to demonstrate via video conference too.
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> Thanks
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>> On Sun, Dec 4, 2022 at 1:37 AM Mark Payne <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> Hi Richard,
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> Can you try increasing the Maximum Number of Bins? I
>>>>>>>> think there was an issue that was recently addressed in which the merge
>>>>>>>> processors had an issue when Max Number of Bins = 1.
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> If you still see the same issue, please provide a copy of
>>>>>>>> the flow that can be used to replicate the issue.
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> Thanks
>>>>>>>> >>>>>>>>> -Mark
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> On Dec 3, 2022, at 5:21 AM, Richard Beare <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> Hi,
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> Pretty much the same - I seem to end up with flowfiles
>>>>>>>> containing about 7 records, presumably always triggered by the timeout.
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> I had thought the timeout needed to be less than the run
>>>>>>>> schedule, but it looks like it can be the same.
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> Here's a debug dump
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:43 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Migrating id=1066297 to RecordBin[size=4, full=false, isComplete=false,
>>>>>>>> id=4021]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:43 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Transferred id=1066297 to RecordBin[size=5, full=false, 
>>>>>>>> isComplete=false,
>>>>>>>> id=4021]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:44 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got
>>>>>>>> Group ID
>>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:44 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Migrating id=1066372 to RecordBin[size=5, full=false, isComplete=false,
>>>>>>>> id=4021]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:44 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Transferred id=1066372 to RecordBin[size=6, full=false, 
>>>>>>>> isComplete=false,
>>>>>>>> id=4021]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:45 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Transferred id=1066575 to RecordBin[size=7, full=false, 
>>>>>>>> isComplete=true,
>>>>>>>> id=4021]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:46 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got
>>>>>>>> Group ID
>>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:46 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Migrating id=1066612 to RecordBin[size=0, full=false, isComplete=false,
>>>>>>>> id=4022]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:46 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Created OutputStream using session StandardProcessSession[id=83204] for
>>>>>>>> RecordBin[size=0, full=false, isComplete=false, id=4022]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:46 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Transferred id=1066612 to RecordBin[size=1, full=false, 
>>>>>>>> isComplete=false,
>>>>>>>> id=4022]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:48 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Migrating id=1066896 to RecordBin[size=2, full=false, isComplete=false,
>>>>>>>> id=4022]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:48 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Transferred id=1066896 to RecordBin[size=3, full=false, 
>>>>>>>> isComplete=false,
>>>>>>>> id=4022]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:49 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got
>>>>>>>> Group ID
>>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:49 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Migrating id=1067051 to RecordBin[size=3, full=false, isComplete=false,
>>>>>>>> id=4022]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:49 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Transferred id=1067051 to RecordBin[size=4, full=false, 
>>>>>>>> isComplete=false,
>>>>>>>> id=4022]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:52 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Transferred id=1067254 to RecordBin[size=7, full=false, 
>>>>>>>> isComplete=true,
>>>>>>>> id=4022]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:53 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got
>>>>>>>> Group ID
>>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:53 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Migrating id=1067395 to RecordBin[size=0, full=false, isComplete=false,
>>>>>>>> id=4023]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:53 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Created OutputStream using session StandardProcessSession[id=83205] for
>>>>>>>> RecordBin[size=0, full=false, isComplete=false, id=4023]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:53 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Transferred id=1067395 to RecordBin[size=1, full=false, 
>>>>>>>> isComplete=false,
>>>>>>>> id=4023]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:54 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Migrating id=1068472 to RecordBin[size=1, full=false, isComplete=false,
>>>>>>>> id=4023]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:54 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Transferred id=1068472 to RecordBin[size=2, full=false, 
>>>>>>>> isComplete=false,
>>>>>>>> id=4023]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:55 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got
>>>>>>>> Group ID
>>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:55 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Migrating id=1068597 to RecordBin[size=2, full=false, isComplete=false,
>>>>>>>> id=4023]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:55 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Transferred id=1068597 to RecordBin[size=3, full=false, 
>>>>>>>> isComplete=false,
>>>>>>>> id=4023]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:58 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> RecordBin[size=6, full=false, isComplete=false, id=4023] is now 
>>>>>>>> expired.
>>>>>>>> Completing bin.
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:58 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Marked RecordBin[size=6, full=false, isComplete=true, id=4023] as 
>>>>>>>> complete
>>>>>>>> because complete() was called
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:58 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Closed Record Writer using session StandardProcessSession[id=83205] for
>>>>>>>> RecordBin[size=6, full=false, isComplete=true, id=4023]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:58 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Completed bin RecordBin[size=6, full=false, isComplete=true, id=4023] 
>>>>>>>> with
>>>>>>>> 6 records with Merged FlowFile
>>>>>>>> FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948] using input
>>>>>>>> FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663, id=1068800,
>>>>>>>> id=1068845]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:13:58 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Transferred id=1068845 to RecordBin[size=6, full=false, 
>>>>>>>> isComplete=true,
>>>>>>>> id=4023]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:14:01 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Migrating id=1069272 to RecordBin[size=2, full=false, isComplete=false,
>>>>>>>> id=4024]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:14:01 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Transferred id=1069272 to RecordBin[size=3, full=false, 
>>>>>>>> isComplete=false,
>>>>>>>> id=4024]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:14:02 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got
>>>>>>>> Group ID
>>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:14:02 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Migrating id=1069316 to RecordBin[size=3, full=false, isComplete=false,
>>>>>>>> id=4024]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:14:02 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Transferred id=1069316 to RecordBin[size=4, full=false, 
>>>>>>>> isComplete=false,
>>>>>>>> id=4024]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:14:05 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> RecordBin[size=6, full=false, isComplete=false, id=4024] is now 
>>>>>>>> expired.
>>>>>>>> Completing bin.
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:14:05 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Marked RecordBin[size=6, full=false, isComplete=true, id=4024] as 
>>>>>>>> complete
>>>>>>>> because complete() was called
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:14:05 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Closed Record Writer using session StandardProcessSession[id=83206] for
>>>>>>>> RecordBin[size=6, full=false, isComplete=true, id=4024]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:14:05 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Completed bin RecordBin[size=6, full=false, isComplete=true, id=4024] 
>>>>>>>> with
>>>>>>>> 6 records with Merged FlowFile
>>>>>>>> FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d] using input
>>>>>>>> FlowFiles [id=1069044, id=1069103, id=1069272, id=1069316, id=1069451,
>>>>>>>> id=1069492]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:14:05 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Transferred id=1069492 to RecordBin[size=6, full=false, 
>>>>>>>> isComplete=true,
>>>>>>>> id=4024]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:14:07 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Migrating id=1072118 to RecordBin[size=2, full=false, isComplete=false,
>>>>>>>> id=4025]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:14:07 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Transferred id=1072118 to RecordBin[size=3, full=false, 
>>>>>>>> isComplete=false,
>>>>>>>> id=4025]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:14:08 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got
>>>>>>>> Group ID
>>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:14:08 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Migrating id=1072197 to RecordBin[size=3, full=false, isComplete=false,
>>>>>>>> id=4025]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> 10:14:08 UTC
>>>>>>>> >>>>>>>>> DEBUG
>>>>>>>> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> Transferred id=1072197 to RecordBin[size=4, full=false, 
>>>>>>>> isComplete=false,
>>>>>>>> id=4025]
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>> On Sat, Dec 3, 2022 at 4:21 PM Joe Witt <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> Hello
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> Run schedule should be 0.
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> 50 should be the min number of records
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> 5 seconds is the max bin age it sounds like you want.
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> Start with these changes and let us know what youre
>>>>>>>> seeing.
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> Thanks
>>>>>>>> >>>>>>>>>>
>>>>>>>> >>>>>>>>>> On Fri, Dec 2, 2022 at 10:12 PM Richard Beare <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >>>>>>>>>>>
>>>>>>>> >>>>>>>>>>> Hi,
>>>>>>>> >>>>>>>>>>> I'm having a great deal of trouble configuring the
>>>>>>>> mergerecord processor to deliver reasonable performance and I'm not 
>>>>>>>> sure
>>>>>>>> where to look to correct it. One of my upstream processors requires a
>>>>>>>> single record per flowfile, but I'd like to create larger flowfiles 
>>>>>>>> before
>>>>>>>> passing to the next stage. The flowfiles are independent at this stage 
>>>>>>>> so
>>>>>>>> there's no special processing required of the merging. I'd like to 
>>>>>>>> create
>>>>>>>> flowfiles of about 50 to 100 records.
>>>>>>>> >>>>>>>>>>>
>>>>>>>> >>>>>>>>>>> I have two tests, both running on the same nifi system.
>>>>>>>> One uses synthetic data, the other the production data. The 
>>>>>>>> performance of
>>>>>>>> the mergerecord processor for the synthetic data is as I'd expect, and 
>>>>>>>> I
>>>>>>>> can't figure out why the  production data is so much slower. Here's the
>>>>>>>> >>>>>>>>>>> configuration:
>>>>>>>> >>>>>>>>>>>
>>>>>>>> >>>>>>>>>>> mergerecord has the following settings. Timer driven, 1
>>>>>>>> concurrent task, 5 second run schedule, bin packing merge strategy, min
>>>>>>>> records = 1, max records = 100, max bin age = 4.5 secs, maximum number 
>>>>>>>> of
>>>>>>>> bins = 1.
>>>>>>>> >>>>>>>>>>>
>>>>>>>> >>>>>>>>>>> In the case of synthetic data the typical flowfile size
>>>>>>>> is in the range 2 to 7KB.
>>>>>>>> >>>>>>>>>>>
>>>>>>>> >>>>>>>>>>> The size of flowfiles for the production case is
>>>>>>>> smaller - typically around 1KB.
>>>>>>>> >>>>>>>>>>>
>>>>>>>> >>>>>>>>>>> The structure in the tests is slightly different.
>>>>>>>> Synthetic is (note that I've removed the text part):
>>>>>>>> >>>>>>>>>>>
>>>>>>>> >>>>>>>>>>> [ {
>>>>>>>> >>>>>>>>>>>   "sampleid" : 1075,
>>>>>>>> >>>>>>>>>>>   "typeid" : 98,
>>>>>>>> >>>>>>>>>>>   "dct" : "2020-01-25T21:40:25.515Z",
>>>>>>>> >>>>>>>>>>>   "filename" :
>>>>>>>> "__tmp/txt/mtsamples-type-98-sample-1075.txt",
>>>>>>>> >>>>>>>>>>>   "document" : "Text removed - typically a few hundred
>>>>>>>> words",
>>>>>>>> >>>>>>>>>>>   "docid" : "9"
>>>>>>>> >>>>>>>>>>> } ]
>>>>>>>> >>>>>>>>>>>
>>>>>>>> >>>>>>>>>>> Production is:
>>>>>>>> >>>>>>>>>>> [ {
>>>>>>>> >>>>>>>>>>>   "doc_id" : "5.60622895E8",
>>>>>>>> >>>>>>>>>>>   "doc_text" : " Text deleted - typically a few hundred
>>>>>>>> words",
>>>>>>>> >>>>>>>>>>>   "processing_timestamp" : "2022-11-27T23:56:35.601Z",
>>>>>>>> >>>>>>>>>>>   "metadata_x_ocr_applied" : true,
>>>>>>>> >>>>>>>>>>>   "metadata_x_parsed_by" :
>>>>>>>> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
>>>>>>>> >>>>>>>>>>>   "metadata_content_type" : "application/rtf",
>>>>>>>> >>>>>>>>>>>   "metadata_page_count" : null,
>>>>>>>> >>>>>>>>>>>   "metadata_creation_date" : null,
>>>>>>>> >>>>>>>>>>>   "metadata_last_modified" : null
>>>>>>>> >>>>>>>>>>> } ]
>>>>>>>> >>>>>>>>>>>
>>>>>>>> >>>>>>>>>>>
>>>>>>>> >>>>>>>>>>>
>>>>>>>> >>>>>>>>>>> I load up the queue feeding the mergerecord processor
>>>>>>>> with several hundred individual flowfiles and activate it.
>>>>>>>> >>>>>>>>>>>
>>>>>>>> >>>>>>>>>>> The synthetic data is nicely placed into chunks of 100,
>>>>>>>> with any remainder being flushed in a smaller chunk.
>>>>>>>> >>>>>>>>>>>
>>>>>>>> >>>>>>>>>>> The production data is generally bundled into groups of
>>>>>>>> 6 records, sometimes less. Certainly it never gets close to 100 
>>>>>>>> records.
>>>>>>>> >>>>>>>>>>>
>>>>>>>> >>>>>>>>>>> Any ideas as to what I should look at to track down the
>>>>>>>> difference?
>>>>>>>> >>>>>>>>>>>
>>>>>>>> >>>>>>>>>>> Thanks
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>>
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>>>
>>>>>>>> >>>>>>
>>>>>>>> >
>>>>>>>>
>>>>>>>
