Any thoughts on this? Are there some extra steps required when creating an
Avro file from a user-defined schema?
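
In case it helps to make the question concrete, this is roughly the
shape of the write path (a minimal sketch only, not the exact script -
the schema path, the field values and the commented-out codec line are
placeholders):

@Grab('org.apache.avro:avro:1.11.1')
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericDatumWriter

// parse the user-defined schema
def schema = new Schema.Parser().parse(new File('user-schemas/document.avsc'))

// build one record; the nullable metadata_* fields can stay null
def record = new GenericData.Record(schema)
record.put('doc_id', '70')
record.put('doc_text', 'text removed')
record.put('processing_timestamp', '2022-12-07T23:09:52.354Z')
record.put('metadata_x_ocr_applied', true)
record.put('metadata_x_parsed_by', 'org.apache.tika.parser.pdf.PDFParser')

// write an Avro object-container file with the schema embedded
def writer = new DataFileWriter(new GenericDatumWriter(schema))
// writer.setCodec(CodecFactory.deflateCodec(6))  // needs org.apache.avro.file.CodecFactory;
                                                  // setting a codec adds an avro.codec entry
writer.create(schema, new File('out.avro'))
writer.append(record)
writer.close()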

On Thu, Dec 8, 2022 at 2:56 PM Richard Beare <[email protected]>
wrote:

> Here's another result that I think suggests there's something wrong with
> the Avro files created by the Groovy script, although I can't see what
> the problem might be.
>
> The test is as follows. Output of the Groovy script creating Avro files
> is passed to ConvertRecord, configured with an Avro reader and a JSON
> writer. This is slow. The JSON output is then converted back to Avro
> with another ConvertRecord processor, configured with a JsonTreeReader
> and an Avro writer - this is fast, instantly emptying the queue. The
> result of that is fed into the previously problematic MergeRecord
> processor, which works exactly as expected, producing flowfiles with
> 100 records each.
>
> The difference I can see between the two flowfiles is the way in which
> the schema is specified. Perhaps some extra steps are required in the
> Groovy script to set that up?
>
> The slow one has:
>
> {"type":"record","name":"document", "fields":[{
>
> The fast one has:
>
> {"type":"record","name":"nifiRecord","namespace":"org.apache.nifi","fields":
>
>
> Initial characters are also slightly different.
> Slow one:
>
> 0000000   O   b   j 001 002 026   a   v   r   o   .   s   c   h   e   m
> 0000020   a 346  \n   {   "   t   y   p   e   "   :   "   r   e   c   o
>
> Fast one:
>
> 0000000   O   b   j 001 004 026   a   v   r   o   .   s   c   h   e   m
> 0000020   a 362  \b   {   "   t   y   p   e   "   :   "   r   e   c   o
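>
> If I'm reading the container header correctly, the byte after the
> "Obj 001" magic is the count of file-metadata entries: 002 decodes to
> one entry (just avro.schema) and 004 to two (presumably avro.schema
> plus avro.codec). The varint after the "avro.schema" key is the byte
> length of the schema text (346 \n and 362 \b decode to 691 and 569
> bytes, if I've got the zigzag right), so the embedded schemas also
> differ in size. A quick way to dump that metadata for both files (a
> sketch, assuming the avro jar is on the classpath; the file path
> argument is a placeholder):
>
> @Grab('org.apache.avro:avro:1.11.1')
> import org.apache.avro.file.DataFileReader
> import org.apache.avro.generic.GenericDatumReader
>
> def f = new DataFileReader(new File(args[0]), new GenericDatumReader())
> println "schema: ${f.getMetaString('avro.schema')}"
> println "codec : ${f.getMetaString('avro.codec')}"  // null if no codec entry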
>
>
> The Groovy script is
> CogStack-NiFi/parse-tika-result-json-to-avro.groovy at master ·
> CogStack/CogStack-NiFi · GitHub
> <https://github.com/CogStack/CogStack-NiFi/blob/master/nifi/user-scripts/parse-tika-result-json-to-avro.groovy>
>
> The schema is
> CogStack-NiFi/document.avsc at master · CogStack/CogStack-NiFi · GitHub
> <https://github.com/CogStack/CogStack-NiFi/blob/master/nifi/user-schemas/document.avsc>
>
>
> On Thu, Dec 8, 2022 at 1:59 PM Richard Beare <[email protected]>
> wrote:
>
>> I'm diving into the ConvertRecord tests a bit deeper on the production
>> server.
>>
>> The first test case: 259 documents, totalling 1 MB in Avro format in the
>> input queue to the ConvertRecord processor. These Avro files were not
>> created by the Groovy script - they start life as a database query and
>> the text field is in one of the columns. The ConvertRecord processor
>> runs very quickly - click start, press refresh and it is done. The Avro
>> ends up like this:
>>
>> [ {
>>   "sampleid" : 1075,
>>   "typeid" : 98,
>>   "dct" : "2020-01-25T21:40:25.515Z",
>>   "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
>>   "document" : "Text removed",
>>   "docid" : "9"
>> } ]
>>
>> In the second test, where the text fields are extracted from PDFs by
>> Tika before the Avro files are created by the Groovy script (from the
>> Tika JSON output), the total queue size for the 259 documents is larger
>> (1.77 MB) and the performance is very different: press start, click
>> refresh and only two flowfiles are processed.
>>
>> [ {
>>   "doc_id" : "70",
>>   "doc_text" : "text removed",
>>   "processing_timestamp" : "2022-12-07T23:09:52.354Z",
>>   "metadata_x_ocr_applied" : true,
>>   "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser",
>>   "metadata_content_type" : "application/pdf",
>>   "metadata_page_count" : 1,
>>   "metadata_creation_date" : null,
>>   "metadata_last_modified" : null
>> } ]
>>
>> I've noticed that the second one has a content.type attribute of
>> 'application/json', which doesn't seem right and doesn't match the fast
>> case. I'll see what happens if I change that.
>>
>> On Thu, Dec 8, 2022 at 9:39 AM Richard Beare <[email protected]>
>> wrote:
>>
>>> Hi All,
>>> Some progress on debugging options. I've found a flow that exhibits the
>>> problem using synthetic data. However, the results are host-dependent.
>>> On my laptop a "run-once" click of MergeRecord gives me two flowfiles of
>>> 100 records, while the same flow on the production server produces
>>> several much smaller flowfiles. This makes me think that something funny
>>> is happening with my storage setup that I'll need to chase.
>>>
>>> I had tested the ConvertRecord option (simply AvroReader -> AvroWriter)
>>> and it did seem slow, but I'll investigate this further as it may be
>>> related to my storage issue.
>>>
>>>
>>>
>>> On Thu, Dec 8, 2022 at 1:23 AM Mark Payne <[email protected]> wrote:
>>>
>>>> > Is there something about this structure that is likely to be causing
>>>> the problem? Could there be other issues with the avro generated by the
>>>> script?
>>>>
>>>> I don’t think the structure should matter - as long as the Avro
>>>> produced is valid Avro. Unless perhaps there’s some issue with the
>>>> Avro library itself that’s causing it to take a really long time to
>>>> parse? I’d be curious - if you take the output of your script and run
>>>> it through a ConvertRecord (Avro Reader -> JSON Writer), is the
>>>> ConvertRecord fast, or is it really slow to process it?
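>>>>
>>>> (If you want to take NiFi out of the picture entirely, a rough
>>>> standalone sketch like this - the avro version and file path here are
>>>> placeholders - would time the raw Avro decode of one of your files:)
>>>>
>>>> @Grab('org.apache.avro:avro:1.11.1')
>>>> import org.apache.avro.file.DataFileReader
>>>> import org.apache.avro.generic.GenericDatumReader
>>>>
>>>> def t0 = System.nanoTime()
>>>> def n = 0
>>>> new DataFileReader(new File(args[0]), new GenericDatumReader()).each { n++ }
>>>> println "decoded ${n} records in ${(System.nanoTime() - t0) / 1e6} ms"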
>>>>
>>>> On Dec 5, 2022, at 5:58 AM, Richard Beare <[email protected]>
>>>> wrote:
>>>>
>>>> Further - I performed another test in which I replaced the custom
>>>> JSON-to-Avro script with a ConvertRecord processor - MergeRecord
>>>> appears to work as expected in that case.
>>>>
>>>> Output of ConvertRecord looks like this:
>>>>
>>>> [ {
>>>>   "text" : "  No Alert Found \n\n",
>>>>   "metadata" : {
>>>>     "X_TIKA_Parsed_By" : null,
>>>>     "X_OCR_Applied" : null,
>>>>     "Content_Type" : null
>>>>   },
>>>>   "success" : true,
>>>>   "timestamp" : "2022-12-05T10:49:18.568Z",
>>>>   "processingElapsedTime" : 0,
>>>>   "doc_id" : "5.60178607E8"
>>>> } ]
>>>>
>>>> while the output of the script looks like:
>>>>
>>>> [ {
>>>>   "doc_id" : "5.61996505E8",
>>>>   "doc_text" : "  No Alert Found \n\n",
>>>>   "processing_timestamp" : "2022-11-28T01:16:46.775Z",
>>>>   "metadata_x_ocr_applied" : true,
>>>>   "metadata_x_parsed_by" :
>>>> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
>>>>   "metadata_content_type" : "application/rtf",
>>>>   "metadata_page_count" : null,
>>>>   "metadata_creation_date" : null,
>>>>   "metadata_last_modified" : null
>>>> } ]
>>>>
>>>> Is there something about this structure that is likely to be causing
>>>> the problem? Could there be other issues with the avro generated by the
>>>> script?
>>>>
>>>> On Mon, Dec 5, 2022 at 9:31 PM Richard Beare <[email protected]>
>>>> wrote:
>>>>
>>>>> I've reset the backpressure to the default.
>>>>>
>>>>> This remains something of a mystery. The merge with synthetic data
>>>>> happily creates flowfiles with 100 records, and the join says "Records
>>>>> merged due to: Bin is full" or "Records merged due to: Bin is full
>>>>> enough". No timeouts in that case, even with the max bin age at 4.5
>>>>> seconds. The resulting flowfiles were about 300 KB.
>>>>>
>>>>> The real data is doing much the same as before, producing flowfiles of
>>>>> about 30 KB, with 7 records or so. If I increase the maximum bin age to
>>>>> 30 seconds the size and record count are higher - 12 to 15. Nothing
>>>>> like the behaviour with synthetic data, where the 100-record flowfiles
>>>>> are created almost instantly. Joins are always due to bin age.
>>>>>
>>>>> Can the problem relate to the structure of the Avro files? Any way to
>>>>> dive into that? Everything else about the MergeRecord settings appears
>>>>> the same, so I can't see an explanation as to why the behaviour is
>>>>> different on the same hardware.
>>>>>
>>>>> On Mon, Dec 5, 2022 at 2:09 AM Mark Payne <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hey Richard,
>>>>>>
>>>>>> So a few things that I’ve done/looked at.
>>>>>>
>>>>>> I generated some Avro data (random JSON that I downloaded from a
>>>>>> Random JSON Generator and then converted to Avro).
>>>>>>
>>>>>> I then ran this avro data into both the MergeRecord processors.
>>>>>>
>>>>>> Firstly, I noticed that both are very slow. I found that was because
>>>>>> the Run Schedule was set to 5 seconds. This should *ALWAYS* be 0 secs
>>>>>> for MergeRecord - and really for basically all processors except the
>>>>>> first one in the flow.
>>>>>>
>>>>>> I also notice that you have backpressure set on your connections to
>>>>>> 40,000 FlowFiles and 4 GB. This can significantly slow things down. If 
>>>>>> you
>>>>>> have performance concerns you definitely want backpressure set back to 
>>>>>> the
>>>>>> default of 10,000 FlowFiles. Otherwise, as the queues fill up they start
>>>>>> “swapping out” FlowFiles to disk, and this can significantly slow things
>>>>>> down.
>>>>>>
>>>>>> I noticed that MergeRecord is set to 1 concurrent task. Probably
>>>>>> worth considering increasing that, if performance is a concern.
>>>>>>
>>>>>> That said, I am seeing nice, full bins of 100 records merged from
>>>>>> each of the MergeRecord processors.
>>>>>> So it is certainly possible that if you’re seeing smaller bins it’s
>>>>>> because you’re timing out. The 4.5 second timeout is quite short. Have
>>>>>> you tried increasing that to, say, 30 seconds to see if it gives you
>>>>>> larger bins?
>>>>>> I also recommend that you take a look at the data provenance to see
>>>>>> why it’s creating the bins.
>>>>>>
>>>>>> If unclear how to do that:
>>>>>> Right-click on the MergeRecord processor
>>>>>> Go to View data provenance
>>>>>> Scroll down the list until you see a “JOIN” event type. You can
>>>>>> ignore the ATTRIBUTES_MODIFIED and DROP events for now.
>>>>>> Click the ‘i’ icon on the left-hand side.
>>>>>> This will show you details about the merge. In the Details tab, if
>>>>>> you scroll down, it will show you a Details field, which tells you why
>>>>>> the data was merged. It should either say “Records Merged due to: Bin
>>>>>> has reached Max Bin Age” or “Records Merged due to: Bin is full”.
>>>>>>
>>>>>> If it is due to Max Bin Age reached, then I’d recommend increasing
>>>>>> number of concurrent tasks, reducing backpressure to no more than 10,000
>>>>>> FlowFiles in the queue, and/or increasing the Max Bin Age.
>>>>>> Also worth asking - what kind of machine is this running on? A 64-core
>>>>>> VM with a 1 TB volume will, of course, run MUCH differently than a
>>>>>> 4-core VM with a 10 GB volume, especially in the cloud.
>>>>>>
>>>>>> If still having trouble, let me know what the provenance tells you
>>>>>> about the reason for merging the data, and we can go from there.
>>>>>>
>>>>>> Thanks!
>>>>>> -Mark
>>>>>>
>>>>>>
>>>>>> On Dec 3, 2022, at 4:38 PM, Mark Payne <[email protected]> wrote:
>>>>>>
>>>>>> Richard,
>>>>>>
>>>>>> I think just the flow structure should be sufficient.
>>>>>>
>>>>>> Thanks
>>>>>> -Mark
>>>>>>
>>>>>>
>>>>>> On Dec 3, 2022, at 4:32 PM, Richard Beare <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>> Thanks for responding,
>>>>>> I re-tested with max bins = 2, but the behaviour remained the same. I
>>>>>> can easily share a version of the functioning workflow (and data),
>>>>>> which is part of a public project. The problem workflow (which shares
>>>>>> many of the same components) is part of a health research project, so
>>>>>> sharing it is more difficult - I definitely can't share any data from
>>>>>> that one. Do you need to see the data, or is the overall structure
>>>>>> sufficient at this point? Happy to demonstrate via video conference
>>>>>> too.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Sun, Dec 4, 2022 at 1:37 AM Mark Payne <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Richard,
>>>>>>>
>>>>>>> Can you try increasing the Maximum Number of Bins? I think there was
>>>>>>> an issue that was recently addressed in which the merge processors had 
>>>>>>> an
>>>>>>> issue when Max Number of Bins = 1.
>>>>>>>
>>>>>>> If you still see the same issue, please provide a copy of the flow
>>>>>>> that can be used to replicate the issue.
>>>>>>>
>>>>>>> Thanks
>>>>>>> -Mark
>>>>>>>
>>>>>>>
>>>>>>> On Dec 3, 2022, at 5:21 AM, Richard Beare <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Pretty much the same - I seem to end up with flowfiles containing
>>>>>>> about 7 records, presumably always triggered by the timeout.
>>>>>>>
>>>>>>> I had thought the timeout needed to be less than the run schedule,
>>>>>>> but it looks like it can be the same.
>>>>>>>
>>>>>>> Here's a debug dump
>>>>>>>
>>>>>>> 10:13:43 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>> id=1066297 to RecordBin[size=4, full=false, isComplete=false, id=4021]
>>>>>>>
>>>>>>> 10:13:43 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>> id=1066297 to RecordBin[size=5, full=false, isComplete=false, id=4021]
>>>>>>>
>>>>>>> 10:13:44 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
>>>>>>>
>>>>>>> 10:13:44 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>> id=1066372 to RecordBin[size=5, full=false, isComplete=false, id=4021]
>>>>>>>
>>>>>>> 10:13:44 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>> id=1066372 to RecordBin[size=6, full=false, isComplete=false, id=4021]
>>>>>>>
>>>>>>> 10:13:45 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>> id=1066575 to RecordBin[size=7, full=false, isComplete=true, id=4021]
>>>>>>>
>>>>>>> 10:13:46 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
>>>>>>>
>>>>>>> 10:13:46 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>> id=1066612 to RecordBin[size=0, full=false, isComplete=false, id=4022]
>>>>>>>
>>>>>>> 10:13:46 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created
>>>>>>> OutputStream using session StandardProcessSession[id=83204] for
>>>>>>> RecordBin[size=0, full=false, isComplete=false, id=4022]
>>>>>>>
>>>>>>> 10:13:46 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>> id=1066612 to RecordBin[size=1, full=false, isComplete=false, id=4022]
>>>>>>>
>>>>>>> 10:13:48 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>> id=1066896 to RecordBin[size=2, full=false, isComplete=false, id=4022]
>>>>>>>
>>>>>>> 10:13:48 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>> id=1066896 to RecordBin[size=3, full=false, isComplete=false, id=4022]
>>>>>>>
>>>>>>> 10:13:49 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>>>>
>>>>>>> 10:13:49 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>> id=1067051 to RecordBin[size=3, full=false, isComplete=false, id=4022]
>>>>>>>
>>>>>>> 10:13:49 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>> id=1067051 to RecordBin[size=4, full=false, isComplete=false, id=4022]
>>>>>>>
>>>>>>> 10:13:52 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>> id=1067254 to RecordBin[size=7, full=false, isComplete=true, id=4022]
>>>>>>>
>>>>>>> 10:13:53 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
>>>>>>>
>>>>>>> 10:13:53 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>> id=1067395 to RecordBin[size=0, full=false, isComplete=false, id=4023]
>>>>>>>
>>>>>>> 10:13:53 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created
>>>>>>> OutputStream using session StandardProcessSession[id=83205] for
>>>>>>> RecordBin[size=0, full=false, isComplete=false, id=4023]
>>>>>>>
>>>>>>> 10:13:53 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>> id=1067395 to RecordBin[size=1, full=false, isComplete=false, id=4023]
>>>>>>>
>>>>>>> 10:13:54 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>> id=1068472 to RecordBin[size=1, full=false, isComplete=false, id=4023]
>>>>>>>
>>>>>>> 10:13:54 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>> id=1068472 to RecordBin[size=2, full=false, isComplete=false, id=4023]
>>>>>>>
>>>>>>> 10:13:55 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>>>>
>>>>>>> 10:13:55 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>> id=1068597 to RecordBin[size=2, full=false, isComplete=false, id=4023]
>>>>>>>
>>>>>>> 10:13:55 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>> id=1068597 to RecordBin[size=3, full=false, isComplete=false, id=4023]
>>>>>>>
>>>>>>> 10:13:58 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>> RecordBin[size=6, full=false, isComplete=false, id=4023] is now expired.
>>>>>>> Completing bin.
>>>>>>>
>>>>>>> 10:13:58 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked
>>>>>>> RecordBin[size=6, full=false, isComplete=true, id=4023] as complete 
>>>>>>> because
>>>>>>> complete() was called
>>>>>>>
>>>>>>> 10:13:58 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record
>>>>>>> Writer using session StandardProcessSession[id=83205] for 
>>>>>>> RecordBin[size=6,
>>>>>>> full=false, isComplete=true, id=4023]
>>>>>>>
>>>>>>> 10:13:58 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin
>>>>>>> RecordBin[size=6, full=false, isComplete=true, id=4023] with 6 records 
>>>>>>> with
>>>>>>> Merged FlowFile FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948]
>>>>>>> using input FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663,
>>>>>>> id=1068800, id=1068845]
>>>>>>>
>>>>>>> 10:13:58 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>> id=1068845 to RecordBin[size=6, full=false, isComplete=true, id=4023]
>>>>>>>
>>>>>>> 10:14:01 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>> id=1069272 to RecordBin[size=2, full=false, isComplete=false, id=4024]
>>>>>>>
>>>>>>> 10:14:01 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>> id=1069272 to RecordBin[size=3, full=false, isComplete=false, id=4024]
>>>>>>>
>>>>>>> 10:14:02 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
>>>>>>>
>>>>>>> 10:14:02 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>> id=1069316 to RecordBin[size=3, full=false, isComplete=false, id=4024]
>>>>>>>
>>>>>>> 10:14:02 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>> id=1069316 to RecordBin[size=4, full=false, isComplete=false, id=4024]
>>>>>>>
>>>>>>> 10:14:05 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>> RecordBin[size=6, full=false, isComplete=false, id=4024] is now expired.
>>>>>>> Completing bin.
>>>>>>>
>>>>>>> 10:14:05 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked
>>>>>>> RecordBin[size=6, full=false, isComplete=true, id=4024] as complete 
>>>>>>> because
>>>>>>> complete() was called
>>>>>>>
>>>>>>> 10:14:05 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record
>>>>>>> Writer using session StandardProcessSession[id=83206] for 
>>>>>>> RecordBin[size=6,
>>>>>>> full=false, isComplete=true, id=4024]
>>>>>>>
>>>>>>> 10:14:05 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin
>>>>>>> RecordBin[size=6, full=false, isComplete=true, id=4024] with 6 records 
>>>>>>> with
>>>>>>> Merged FlowFile FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d]
>>>>>>> using input FlowFiles [id=1069044, id=1069103, id=1069272, id=1069316,
>>>>>>> id=1069451, id=1069492]
>>>>>>>
>>>>>>> 10:14:05 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>> id=1069492 to RecordBin[size=6, full=false, isComplete=true, id=4024]
>>>>>>>
>>>>>>> 10:14:07 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>> id=1072118 to RecordBin[size=2, full=false, isComplete=false, id=4025]
>>>>>>>
>>>>>>> 10:14:07 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>> id=1072118 to RecordBin[size=3, full=false, isComplete=false, id=4025]
>>>>>>>
>>>>>>> 10:14:08 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>>>>
>>>>>>> 10:14:08 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>> id=1072197 to RecordBin[size=3, full=false, isComplete=false, id=4025]
>>>>>>>
>>>>>>> 10:14:08 UTC
>>>>>>> DEBUG
>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>
>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>> id=1072197 to RecordBin[size=4, full=false, isComplete=false, id=4025]
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Dec 3, 2022 at 4:21 PM Joe Witt <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hello
>>>>>>>>
>>>>>>>> Run schedule should be 0.
>>>>>>>>
>>>>>>>> 50 should be the min number of records.
>>>>>>>>
>>>>>>>> 5 seconds is the max bin age it sounds like you want.
>>>>>>>>
>>>>>>>> Start with these changes and let us know what you're seeing.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> On Fri, Dec 2, 2022 at 10:12 PM Richard Beare <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> I'm having a great deal of trouble configuring the MergeRecord
>>>>>>>>> processor to deliver reasonable performance, and I'm not sure where
>>>>>>>>> to look to correct it. One of my upstream processors requires a
>>>>>>>>> single record per flowfile, but I'd like to create larger flowfiles
>>>>>>>>> before passing them to the next stage. The flowfiles are independent
>>>>>>>>> at this stage, so no special processing is required of the merge.
>>>>>>>>> I'd like to create flowfiles of about 50 to 100 records.
>>>>>>>>>
>>>>>>>>> I have two tests, both running on the same NiFi system. One uses
>>>>>>>>> synthetic data, the other the production data. The performance of
>>>>>>>>> the MergeRecord processor on the synthetic data is as I'd expect,
>>>>>>>>> and I can't figure out why the production data is so much slower.
>>>>>>>>> Here's the configuration:
>>>>>>>>>
>>>>>>>>> MergeRecord has the following settings: timer driven, 1 concurrent
>>>>>>>>> task, 5 second run schedule, bin-packing merge strategy, min records
>>>>>>>>> = 1, max records = 100, max bin age = 4.5 secs, maximum number of
>>>>>>>>> bins = 1.
>>>>>>>>>
>>>>>>>>> In the case of synthetic data the typical flowfile size is in the
>>>>>>>>> range of 2 to 7 KB.
>>>>>>>>>
>>>>>>>>> The size of the flowfiles for the production case is smaller -
>>>>>>>>> typically around 1 KB.
>>>>>>>>>
>>>>>>>>> The structure in the tests is slightly different. Synthetic is
>>>>>>>>> (note that I've removed the text part):
>>>>>>>>>
>>>>>>>>> [ {
>>>>>>>>>   "sampleid" : 1075,
>>>>>>>>>   "typeid" : 98,
>>>>>>>>>   "dct" : "2020-01-25T21:40:25.515Z",
>>>>>>>>>   "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
>>>>>>>>>   "document" : "Text removed - typically a few hundred words",
>>>>>>>>>   "docid" : "9"
>>>>>>>>> } ]
>>>>>>>>>
>>>>>>>>> Production is:
>>>>>>>>> [ {
>>>>>>>>>   "doc_id" : "5.60622895E8",
>>>>>>>>>   "doc_text" : " Text deleted - typically a few hundred words",
>>>>>>>>>   "processing_timestamp" : "2022-11-27T23:56:35.601Z",
>>>>>>>>>   "metadata_x_ocr_applied" : true,
>>>>>>>>>   "metadata_x_parsed_by" :
>>>>>>>>> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
>>>>>>>>>   "metadata_content_type" : "application/rtf",
>>>>>>>>>   "metadata_page_count" : null,
>>>>>>>>>   "metadata_creation_date" : null,
>>>>>>>>>   "metadata_last_modified" : null
>>>>>>>>> } ]
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I load up the queue feeding the MergeRecord processor with several
>>>>>>>>> hundred individual flowfiles and activate it.
>>>>>>>>>
>>>>>>>>> The synthetic data is nicely placed into chunks of 100, with any
>>>>>>>>> remainder being flushed in a smaller chunk.
>>>>>>>>>
>>>>>>>>> The production data is generally bundled into groups of 6 records,
>>>>>>>>> sometimes fewer. It certainly never gets close to 100 records.
>>>>>>>>>
>>>>>>>>> Any ideas as to what I should look at to track down the difference?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>
