Thanks - I'll have a look at that. It is helpful to get guidance like
this when the system is so large.

On Wed, Dec 21, 2022 at 5:30 AM Matt Burgess <[email protected]> wrote:

> Thanks Vijay! I agree those processors should do the trick, but there
> were things in the transformation between input and desired output
> whose origin I wasn't sure of. If you are setting constants you
> can use either a Shift or Default spec; if you are moving fields
> around you can use a Shift spec; and in general, whether you end up
> with one spec or multiple, I find it's easiest to use a Chain spec (an
> array of specs) in the processor configuration. You can play around
> with the spec(s) at the Jolt playground [1]
>
> An important difference to note between JoltTransformJSON and
> JoltTransformRecord is that for the former, the spec is applied to the
> entire input (and it is entirely read into memory), whereas in
> JoltTransformRecord the spec is applied to each record.
>
> Regards,
> Matt
>
> [1] http://jolt-demo.appspot.com/#inception
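>
> A rough, untested first cut at a spec for the structure quoted below, in case
> it helps as a starting point for the playground (field names are taken from
> Richard's desired output; doc_id presumably comes from somewhere else, such as
> a FlowFile attribute, so it isn't covered here):
>
> [ {
>   "operation" : "shift",
>   "spec" : {
>     "result" : {
>       "text" : "doc_text",
>       "timestamp" : "processing_timestamp",
>       "metadata" : {
>         "X-OCR-Applied" : "metadata_x_ocr_applied",
>         "X-TIKA:Parsed-By" : { "0" : "metadata_x_parsed_by" },
>         "Content-Type" : "metadata_content_type",
>         "Page-Count" : "metadata_page_count",
>         "dcterms:created" : "metadata_creation_date"
>       }
>     }
>   }
> } ]
>
> The two date fields that are missing from the input could be filled in with a
> second Default operation in the chain or left to the record writer's schema,
> and joining several X-TIKA:Parsed-By entries into one semicolon-separated
> string is where Jolt gets awkward, so a small script step may still be needed
> for that.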
>
> On Tue, Dec 20, 2022 at 10:52 AM Vijay Chhipa <[email protected]> wrote:
> >
> > Hi Richard
> > Have you tried JoltTransformJSON or JoltTransformRecord
> >
> > I believe you should be able to do this
> >
> > Quick start here:
> https://community.cloudera.com/t5/Community-Articles/Jolt-quick-reference-for-Nifi-Jolt-Processors/ta-p/244350
> >
> >
> > On Dec 20, 2022, at 4:13 AM, Richard Beare <[email protected]>
> wrote:
> >
> > Hi Everyone,
> > Still struggling to fix this issue and may need to try some different
> things.
> >
> > What is the recommended way of transforming a record structure? At the
> moment I have a groovy script doing this but the downstream processing is
> very slow, as discussed in the preceding thread.
> >
> > The transformation is very simple - starting structure is:
> >
> > {
> >   "result" : {
> >     "text" : " document text",
> >     "metadata" : {
> >       "X-TIKA:Parsed-By" : [
> >         "org.apache.tika.parser.pdf.PDFParser"
> >       ],
> >       "X-OCR-Applied" : true,
> >       "dcterms:created" : "2018-07-24T15:04:51Z",
> >       "Content-Type" : "application/pdf",
> >       "Page-Count" : 2
> >     },
> >     "success" : true,
> >     "timestamp" : "2022-12-20T09:02:27.902Z",
> >     "processingElapsedTime" : 6
> >   }
> > }
> >
> >
> > The final structure is:
> >
> > [ {
> >   "doc_id" : 58,
> >   "doc_text" : "   ",
> >   "processing_timestamp" : "2022-12-20T09:02:27.902Z",
> >   "metadata_x_ocr_applied" : true,
> >   "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser",
> >   "metadata_content_type" : "application/pdf",
> >   "metadata_page_count" : 1,
> >   "metadata_creation_date" : null,
> >   "metadata_last_modified" : null
> > } ]
> >
> > So a kind of flattening of the structure. Is there a processor I should
> be using to do this instead of a groovy script?
> >
> > Thanks
> >
> > On Wed, Dec 14, 2022 at 7:57 AM Richard Beare <[email protected]>
> wrote:
> >>
> >> Any thoughts on this? Are there some extra steps required when creating
> an avro file from a user defined schema?
> >>
> >> On Thu, Dec 8, 2022 at 2:56 PM Richard Beare <[email protected]>
> wrote:
> >>>
> >>> Here's another result that I think suggests there's something wrong
> with the avro files created by the groovy script, although I can't see what
> the problem might be.
> >>>
> >>> The test is as follows. Output of the groovy script creating avro
> files is passed to convertrecord, configured with an avro reader and json
> writer. This is slow. The json output is then converted back to avro with
> another convertrecord processor, configured with a jsontreereader and an
> avro writer - this is fast, instantly emptying the queue. The result of
> that is fed into the previously problematic merge processor which works
> exactly as expected, producing flowfiles with 100 records each.
> >>>
> >>> The difference I can see between the two flow files is the way in
> which the schema is specified. Perhaps some extras are required in the
> groovy file to set that up?
> >>>
> >>> The slow one has:
> >>>
> >>> {"type":"record","name":"document", "fields":[{
> >>>
> >>> The fast one
> >>>
> {"type":"record","name":"nifiRecord","namespace":"org.apache.nifi","fields":
> >>>
> >>>
> >>> Initial characters are also slightly different.
> >>> Slow one:
> >>>
> >>> 0000000   O   b   j 001 002 026   a   v   r   o   .   s   c   h   e   m
> >>> 0000020   a 346  \n   {   "   t   y   p   e   "   :   "   r   e   c   o
> >>>
> >>> Fast one
> >>>
> >>> 0000000   O   b   j 001 004 026   a   v   r   o   .   s   c   h   e   m
> >>> 0000020   a 362  \b   {   "   t   y   p   e   "   :   "   r   e   c   o
> >>>
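> >>> For what it's worth, the bytes after "Obj" are the container's file-metadata
> >>> map, so the 002 vs 004 difference most likely just means the fast file
> >>> carries two metadata entries (avro.schema plus avro.codec) while the slow
> >>> one carries only avro.schema. A small Groovy sketch for dumping that header
> >>> (illustrative only, not part of the existing script; assumes the Avro jar is
> >>> reachable via Grape):
> >>>
> >>> @Grab('org.apache.avro:avro:1.11.1')
> >>> import org.apache.avro.file.DataFileReader
> >>> import org.apache.avro.generic.GenericDatumReader
> >>>
> >>> // Print the schema and codec stored in each Avro container header
> >>> args.each { path ->
> >>>     def reader = new DataFileReader(new File(path), new GenericDatumReader())
> >>>     println "${path}:"
> >>>     println "  codec  : ${reader.getMetaString('avro.codec')}"
> >>>     println "  schema : ${reader.schema}"
> >>>     reader.close()
> >>> }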
> >>>
> >>> The groovy script is
> >>> CogStack-NiFi/parse-tika-result-json-to-avro.groovy at master ·
> CogStack/CogStack-NiFi · GitHub
> >>>
> >>> The schema is
> >>> CogStack-NiFi/document.avsc at master · CogStack/CogStack-NiFi · GitHub
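> >>>
> >>> And for reference, the usual Groovy pattern for writing an Avro container
> >>> from a parsed .avsc is roughly the following (a sketch of the general
> >>> approach, not the actual parse-tika-result-json-to-avro.groovy; the record
> >>> name and namespace that end up in the container header come straight from
> >>> whatever the .avsc declares, and out.avro is just a placeholder):
> >>>
> >>> @Grab('org.apache.avro:avro:1.11.1')
> >>> import org.apache.avro.Schema
> >>> import org.apache.avro.file.CodecFactory
> >>> import org.apache.avro.file.DataFileWriter
> >>> import org.apache.avro.generic.GenericData
> >>> import org.apache.avro.generic.GenericDatumWriter
> >>>
> >>> // Parse the schema shipped with the flow and build one record against it
> >>> def schema = new Schema.Parser().parse(new File('document.avsc'))
> >>> def record = new GenericData.Record(schema)
> >>> record.put('doc_id', '58')
> >>> record.put('doc_text', 'document text')
> >>> record.put('processing_timestamp', '2022-12-20T09:02:27.902Z')
> >>> record.put('metadata_x_ocr_applied', true)
> >>> record.put('metadata_x_parsed_by', 'org.apache.tika.parser.pdf.PDFParser')
> >>>
> >>> // Write the container file; the header metadata (avro.schema, and
> >>> // avro.codec if a codec is set) is produced here
> >>> def writer = new DataFileWriter(new GenericDatumWriter(schema))
> >>> // writer.setCodec(CodecFactory.deflateCodec(5))   // optional extra
> >>> writer.create(schema, new File('out.avro'))
> >>> writer.append(record)
> >>> writer.close()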
> >>>
> >>>
> >>> On Thu, Dec 8, 2022 at 1:59 PM Richard Beare <[email protected]>
> wrote:
> >>>>
> >>>> I'm diving into the convertrecord tests a bit deeper on the
> production server.
> >>>>
> >>>> The first test case - 259 documents, totalling about 1 MB in avro format
> in the input queue to the convertrecord processor. These avro files were
> not created by the groovy script - they start life as a database query and
> the text field is in one of the columns. The convertrecord processor runs
> very quickly - click start, press refresh and it is done. The avro ends up
> like this:
> >>>>
> >>>> [ {
> >>>>   "sampleid" : 1075,
> >>>>   "typeid" : 98,
> >>>>   "dct" : "2020-01-25T21:40:25.515Z",
> >>>>   "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
> >>>>   "document" : "Text removed",
> >>>>   "docid" : "9"
> >>>> } ]
> >>>>
> >>>> In the second test, where the text fields are extracted from the PDFs by
> Tika before avro files are created by the groovy script (from the tika json
> output), the total queue size for the 259 documents is larger - 1.77MB, and
> the performance is very different - press start, click refresh and only two
> flowfiles are processed.
> >>>>
> >>>> [ {
> >>>>   "doc_id" : "70",
> >>>>   "doc_text" : "text removed",
> >>>>   "processing_timestamp" : "2022-12-07T23:09:52.354Z",
> >>>>   "metadata_x_ocr_applied" : true,
> >>>>   "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser",
> >>>>   "metadata_content_type" : "application/pdf",
> >>>>   "metadata_page_count" : 1,
> >>>>   "metadata_creation_date" : null,
> >>>>   "metadata_last_modified" : null
> >>>> } ]
> >>>>
> >>>> I've noticed that the second one has a content.type attribute of
> 'application/json' which doesn't seem right and doesn't match the fast
> case. I'll see what happens if I change that.
> >>>>
> >>>> On Thu, Dec 8, 2022 at 9:39 AM Richard Beare <[email protected]>
> wrote:
> >>>>>
> >>>>> Hi All,
> >>>>> Some progress on debugging options. I've found a flow that exhibits
> the problem using synthetic data. However the results are host dependent.
> On my laptop a "run-once" click of merge record gives me two flowfiles of
> 100 records, while the same flow on the production server produces several
> much smaller flowfiles. This makes me think that something funny is
> happening with my storage setup that I'll need to chase.
> >>>>>
> >>>>> I had tested the convertrecord option (simply
> avroreader->avrowriter) and it did seem slow, but I'll investigate this
> further as it may be related to my storage issue.
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Thu, Dec 8, 2022 at 1:23 AM Mark Payne <[email protected]>
> wrote:
> >>>>>>
> >>>>>> > Is there something about this structure that is likely to be
> causing the problem? Could there be other issues with the avro generated by
> the script?
> >>>>>>
> >>>>>> I don’t think the structure should matter. And as long as the avro
> produced is proper Avro, I don’t think it should matter. Unless perhaps
> there’s some issue with the Avro library itself that’s causing it to take a
> really long time to parse the Avro or something? I’d be curious - if you
> take the output of your script and then you run it through a ConvertRecord
> (Avro Reader -> Json Writer) is the ConvertRecord fast? Or is it really
> slow to process it?
> >>>>>>
> >>>>>> On Dec 5, 2022, at 5:58 AM, Richard Beare <[email protected]>
> wrote:
> >>>>>>
> >>>>>> Further - I performed another test in which I replaced the custom
> json to avro script with a ConvertRecord processor - merge record appears
> to work as expected in that case.
> >>>>>>
> >>>>>> Output of convertrecord looks like this:
> >>>>>>
> >>>>>> [ {
> >>>>>>   "text" : "  No Alert Found \n\n",
> >>>>>>   "metadata" : {
> >>>>>>     "X_TIKA_Parsed_By" : null,
> >>>>>>     "X_OCR_Applied" : null,
> >>>>>>     "Content_Type" : null
> >>>>>>   },
> >>>>>>   "success" : true,
> >>>>>>   "timestamp" : "2022-12-05T10:49:18.568Z",
> >>>>>>   "processingElapsedTime" : 0,
> >>>>>>   "doc_id" : "5.60178607E8"
> >>>>>> } ]
> >>>>>>
> >>>>>> while the output of the script looks like:
> >>>>>>
> >>>>>> [ {
> >>>>>>   "doc_id" : "5.61996505E8",
> >>>>>>   "doc_text" : "  No Alert Found \n\n",
> >>>>>>   "processing_timestamp" : "2022-11-28T01:16:46.775Z",
> >>>>>>   "metadata_x_ocr_applied" : true,
> >>>>>>   "metadata_x_parsed_by" :
> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
> >>>>>>   "metadata_content_type" : "application/rtf",
> >>>>>>   "metadata_page_count" : null,
> >>>>>>   "metadata_creation_date" : null,
> >>>>>>   "metadata_last_modified" : null
> >>>>>> } ]
> >>>>>>
> >>>>>> Is there something about this structure that is likely to be
> causing the problem? Could there be other issues with the avro generated by
> the script?
> >>>>>>
> >>>>>> On Mon, Dec 5, 2022 at 9:31 PM Richard Beare <
> [email protected]> wrote:
> >>>>>>>
> >>>>>>> I've reset the backpressure to the default
> >>>>>>>
> >>>>>>> This remains something of a mystery. The merge with synthetic data
> happily creates flowfiles with 100 records, and the join says "Records
> merged due to: Bin is full" or "Records merged due to: Bin is full enough".
> No timeouts in that case, even with the max bin age at 4.5 seconds. The
> resulting flowfiles were about 300K.
> >>>>>>>
> >>>>>>> The real data is doing much the same as before, producing
> flowfiles of about 30K, with 7 records or so. If I increase the maximum bin
> age to 30 seconds the size and record count is higher - 12 to 15. Nothing
> like the behaviour with synthetic data, where the 100 record flowfiles are
> created almost instantly. Joins are always due to bin age.
> >>>>>>>
> >>>>>>> Can the problem relate to the structure of the avro files? Any way
> to dive into that? Everything else about the mergerecord settings appears
> the same, so I can't see an explanation as to why the behaviour is
> different on the same hardware.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Dec 5, 2022 at 2:09 AM Mark Payne <[email protected]>
> wrote:
> >>>>>>>>
> >>>>>>>> Hey Richard,
> >>>>>>>>
> >>>>>>>> So a few things that I’ve done/looked at.
> >>>>>>>>
> >>>>>>>> I generated some Avro data (random JSON that I downloaded from a
> Random JSON Generator and then converted to Avro).
> >>>>>>>>
> >>>>>>>> I then ran this avro data into both the MergeRecord processors.
> >>>>>>>>
> >>>>>>>> Firstly, I noticed that both are very slow. Found that was
> because Run Schedule was set to 5 seconds. This should *ALWAYS* be 0 secs
> for MergeRecord. And really for basically all processors except for the
> first one in the flow.
> >>>>>>>>
> >>>>>>>> I also notice that you have backpressure set on your connections
> to 40,000 FlowFiles and 4 GB. This can significantly slow things down. If
> you have performance concerns you definitely want backpressure set back to
> the default of 10,000 FlowFiles. Otherwise, as the queues fill up they
> start “swapping out” FlowFiles to disk, and this can significantly slow
> things down.
> >>>>>>>>
> >>>>>>>> I noticed that MergeRecord is set to 1 concurrent task. Probably
> worth considering increasing that, if performance is a concern.
> >>>>>>>>
> >>>>>>>> That said, I am seeing nice, full bins of 100 records merged from
> each of the MergeRecord processors.
> >>>>>>>> So it is certainly possible that if you’re seeing smaller bins
> it’s because you’re timing out. The 4.5 second timeout is quite short.
> Have you tried increasing that to say 30 seconds to see if it gives you
> larger bins?
> >>>>>>>> I also recommend that you take a look at the data provenance to
> see why it’s creating the bins.
> >>>>>>>>
> >>>>>>>> If unclear how to do that:
> >>>>>>>> Right-click on the MergeRecord processor
> >>>>>>>> Go to View data provenance
> >>>>>>>> Scroll down the list until you see a “JOIN” event type. You can
> ignore the ATTRIBUTES_MODIFIED and DROP events for now.
> >>>>>>>> Click the ‘i’ icon on the left-hand side.
> >>>>>>>> This will show you details about the merge. In the Details tab,
> if you scroll down, it will show you a Details field, which tells you why
> the data was merged. It should either say: “Records Merged due to: Bin has
> reached Max Bin Age” or “Records Merged due to: Bin is full”
> >>>>>>>>
> >>>>>>>> If it is due to Max Bin Age reached, then I’d recommend
> increasing the number of concurrent tasks, reducing backpressure to no more
> than 10,000 FlowFiles in the queue, and/or increasing the Max Bin Age.
> >>>>>>>> Also worth asking - what kind of machines is this running on? A
> 64 core VM with 1 TB volume will, of course, run MUCH differently than a 4
> core VM with a 10 GB volume, especially in the cloud.
> >>>>>>>>
> >>>>>>>> If still having trouble, let me know what the provenance tells
> you about the reason for merging the data, and we can go from there.
> >>>>>>>>
> >>>>>>>> Thanks!
> >>>>>>>> -Mark
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Dec 3, 2022, at 4:38 PM, Mark Payne <[email protected]>
> wrote:
> >>>>>>>>
> >>>>>>>> Richard,
> >>>>>>>>
> >>>>>>>> I think just the flow structure should be sufficient.
> >>>>>>>>
> >>>>>>>> Thanks
> >>>>>>>> -Mark
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Dec 3, 2022, at 4:32 PM, Richard Beare <
> [email protected]> wrote:
> >>>>>>>>
> >>>>>>>> Thanks for responding,
> >>>>>>>> I re-tested with max bins = 2, but the behaviour remained the
> same. I can easily share a version of the functioning workflow (and data),
> which is part of a public project. The problem workflow (which shares many
> of the same components) is part of a health research project, so more
> difficult. I definitely can't share any data from that one. Do you need to
> see the data or is the overall structure sufficient at this point? Happy to
> demonstrate via video conference too.
> >>>>>>>>
> >>>>>>>> Thanks
> >>>>>>>>
> >>>>>>>> On Sun, Dec 4, 2022 at 1:37 AM Mark Payne <[email protected]>
> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Richard,
> >>>>>>>>>
> >>>>>>>>> Can you try increasing the Maximum Number of Bins? I think there
> was a recently addressed issue in which the merge processors misbehaved
> when Max Number of Bins = 1.
> >>>>>>>>>
> >>>>>>>>> If you still see the same issue, please provide a copy of the
> flow that can be used to replicate the issue.
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>> -Mark
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Dec 3, 2022, at 5:21 AM, Richard Beare <
> [email protected]> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> Pretty much the same - I seem to end up with flowfiles
> containing about 7 records, presumably always triggered by the timeout.
> >>>>>>>>>
> >>>>>>>>> I had thought the timeout needed to be less than the run
> schedule, but it looks like it can be the same.
> >>>>>>>>>
> >>>>>>>>> Here's a debug dump
> >>>>>>>>>
> >>>>>>>>> 10:13:43 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
> id=1066297 to RecordBin[size=4, full=false, isComplete=false, id=4021]
> >>>>>>>>>
> >>>>>>>>> 10:13:43 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
> id=1066297 to RecordBin[size=5, full=false, isComplete=false, id=4021]
> >>>>>>>>>
> >>>>>>>>> 10:13:44 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group
> ID
> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
> >>>>>>>>>
> >>>>>>>>> 10:13:44 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
> id=1066372 to RecordBin[size=5, full=false, isComplete=false, id=4021]
> >>>>>>>>>
> >>>>>>>>> 10:13:44 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
> id=1066372 to RecordBin[size=6, full=false, isComplete=false, id=4021]
> >>>>>>>>>
> >>>>>>>>> 10:13:45 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
> id=1066575 to RecordBin[size=7, full=false, isComplete=true, id=4021]
> >>>>>>>>>
> >>>>>>>>> 10:13:46 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group
> ID
> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
> >>>>>>>>>
> >>>>>>>>> 10:13:46 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
> id=1066612 to RecordBin[size=0, full=false, isComplete=false, id=4022]
> >>>>>>>>>
> >>>>>>>>> 10:13:46 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created
> OutputStream using session StandardProcessSession[id=83204] for
> RecordBin[size=0, full=false, isComplete=false, id=4022]
> >>>>>>>>>
> >>>>>>>>> 10:13:46 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
> id=1066612 to RecordBin[size=1, full=false, isComplete=false, id=4022]
> >>>>>>>>>
> >>>>>>>>> 10:13:48 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
> id=1066896 to RecordBin[size=2, full=false, isComplete=false, id=4022]
> >>>>>>>>>
> >>>>>>>>> 10:13:48 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
> id=1066896 to RecordBin[size=3, full=false, isComplete=false, id=4022]
> >>>>>>>>>
> >>>>>>>>> 10:13:49 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group
> ID
> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
> >>>>>>>>>
> >>>>>>>>> 10:13:49 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
> id=1067051 to RecordBin[size=3, full=false, isComplete=false, id=4022]
> >>>>>>>>>
> >>>>>>>>> 10:13:49 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
> id=1067051 to RecordBin[size=4, full=false, isComplete=false, id=4022]
> >>>>>>>>>
> >>>>>>>>> 10:13:52 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
> id=1067254 to RecordBin[size=7, full=false, isComplete=true, id=4022]
> >>>>>>>>>
> >>>>>>>>> 10:13:53 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group
> ID
> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
> >>>>>>>>>
> >>>>>>>>> 10:13:53 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
> id=1067395 to RecordBin[size=0, full=false, isComplete=false, id=4023]
> >>>>>>>>>
> >>>>>>>>> 10:13:53 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created
> OutputStream using session StandardProcessSession[id=83205] for
> RecordBin[size=0, full=false, isComplete=false, id=4023]
> >>>>>>>>>
> >>>>>>>>> 10:13:53 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
> id=1067395 to RecordBin[size=1, full=false, isComplete=false, id=4023]
> >>>>>>>>>
> >>>>>>>>> 10:13:54 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
> id=1068472 to RecordBin[size=1, full=false, isComplete=false, id=4023]
> >>>>>>>>>
> >>>>>>>>> 10:13:54 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
> id=1068472 to RecordBin[size=2, full=false, isComplete=false, id=4023]
> >>>>>>>>>
> >>>>>>>>> 10:13:55 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group
> ID
> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
> >>>>>>>>>
> >>>>>>>>> 10:13:55 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
> id=1068597 to RecordBin[size=2, full=false, isComplete=false, id=4023]
> >>>>>>>>>
> >>>>>>>>> 10:13:55 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
> id=1068597 to RecordBin[size=3, full=false, isComplete=false, id=4023]
> >>>>>>>>>
> >>>>>>>>> 10:13:58 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
> RecordBin[size=6, full=false, isComplete=false, id=4023] is now expired.
> Completing bin.
> >>>>>>>>>
> >>>>>>>>> 10:13:58 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked
> RecordBin[size=6, full=false, isComplete=true, id=4023] as complete because
> complete() was called
> >>>>>>>>>
> >>>>>>>>> 10:13:58 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed
> Record Writer using session StandardProcessSession[id=83205] for
> RecordBin[size=6, full=false, isComplete=true, id=4023]
> >>>>>>>>>
> >>>>>>>>> 10:13:58 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed
> bin RecordBin[size=6, full=false, isComplete=true, id=4023] with 6 records
> with Merged FlowFile
> FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948] using input
> FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663, id=1068800,
> id=1068845]
> >>>>>>>>>
> >>>>>>>>> 10:13:58 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
> id=1068845 to RecordBin[size=6, full=false, isComplete=true, id=4023]
> >>>>>>>>>
> >>>>>>>>> 10:14:01 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
> id=1069272 to RecordBin[size=2, full=false, isComplete=false, id=4024]
> >>>>>>>>>
> >>>>>>>>> 10:14:01 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
> id=1069272 to RecordBin[size=3, full=false, isComplete=false, id=4024]
> >>>>>>>>>
> >>>>>>>>> 10:14:02 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group
> ID
> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
> >>>>>>>>>
> >>>>>>>>> 10:14:02 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
> id=1069316 to RecordBin[size=3, full=false, isComplete=false, id=4024]
> >>>>>>>>>
> >>>>>>>>> 10:14:02 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
> id=1069316 to RecordBin[size=4, full=false, isComplete=false, id=4024]
> >>>>>>>>>
> >>>>>>>>> 10:14:05 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
> RecordBin[size=6, full=false, isComplete=false, id=4024] is now expired.
> Completing bin.
> >>>>>>>>>
> >>>>>>>>> 10:14:05 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked
> RecordBin[size=6, full=false, isComplete=true, id=4024] as complete because
> complete() was called
> >>>>>>>>>
> >>>>>>>>> 10:14:05 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed
> Record Writer using session StandardProcessSession[id=83206] for
> RecordBin[size=6, full=false, isComplete=true, id=4024]
> >>>>>>>>>
> >>>>>>>>> 10:14:05 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed
> bin RecordBin[size=6, full=false, isComplete=true, id=4024] with 6 records
> with Merged FlowFile
> FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d] using input
> FlowFiles [id=1069044, id=1069103, id=1069272, id=1069316, id=1069451,
> id=1069492]
> >>>>>>>>>
> >>>>>>>>> 10:14:05 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
> id=1069492 to RecordBin[size=6, full=false, isComplete=true, id=4024]
> >>>>>>>>>
> >>>>>>>>> 10:14:07 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
> id=1072118 to RecordBin[size=2, full=false, isComplete=false, id=4025]
> >>>>>>>>>
> >>>>>>>>> 10:14:07 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
> id=1072118 to RecordBin[size=3, full=false, isComplete=false, id=4025]
> >>>>>>>>>
> >>>>>>>>> 10:14:08 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group
> ID
> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
> >>>>>>>>>
> >>>>>>>>> 10:14:08 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
> id=1072197 to RecordBin[size=3, full=false, isComplete=false, id=4025]
> >>>>>>>>>
> >>>>>>>>> 10:14:08 UTC
> >>>>>>>>> DEBUG
> >>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
> >>>>>>>>>
> >>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
> id=1072197 to RecordBin[size=4, full=false, isComplete=false, id=4025]
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Sat, Dec 3, 2022 at 4:21 PM Joe Witt <[email protected]>
> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hello
> >>>>>>>>>>
> >>>>>>>>>> Run schedule should be 0.
> >>>>>>>>>>
> >>>>>>>>>> 50 should be the min number of records
> >>>>>>>>>>
> >>>>>>>>>> 5 seconds is the max bin age it sounds like you want.
> >>>>>>>>>>
> >>>>>>>>>> Start with these changes and let us know what you're seeing.
> >>>>>>>>>>
> >>>>>>>>>> Thanks
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Dec 2, 2022 at 10:12 PM Richard Beare <
> [email protected]> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>> I'm having a great deal of trouble configuring the mergerecord
> processor to deliver reasonable performance and I'm not sure where to look
> to correct it. One of my upstream processors requires a single record per
> flowfile, but I'd like to create larger flowfiles before passing to the
> next stage. The flowfiles are independent at this stage so there's no
> special processing required of the merging. I'd like to create flowfiles of
> about 50 to 100 records.
> >>>>>>>>>>>
> >>>>>>>>>>> I have two tests, both running on the same nifi system. One
> uses synthetic data, the other the production data. The performance of the
> mergerecord processor for the synthetic data is as I'd expect, and I can't
> figure out why the production data is so much slower. Here's the
> >>>>>>>>>>> configuration:
> >>>>>>>>>>>
> >>>>>>>>>>> mergerecord has the following settings. Timer driven, 1
> concurrent task, 5 second run schedule, bin packing merge strategy, min
> records = 1, max records = 100, max bin age = 4.5 secs, maximum number of
> bins = 1.
> >>>>>>>>>>>
> >>>>>>>>>>> In the case of synthetic data the typical flowfile size is in
> the range 2 to 7KB.
> >>>>>>>>>>>
> >>>>>>>>>>> The size of flowfiles for the production case is smaller -
> typically around 1KB.
> >>>>>>>>>>>
> >>>>>>>>>>> The structure in the tests is slightly different. Synthetic is
> (note that I've removed the text part):
> >>>>>>>>>>>
> >>>>>>>>>>> [ {
> >>>>>>>>>>>   "sampleid" : 1075,
> >>>>>>>>>>>   "typeid" : 98,
> >>>>>>>>>>>   "dct" : "2020-01-25T21:40:25.515Z",
> >>>>>>>>>>>   "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
> >>>>>>>>>>>   "document" : "Text removed - typically a few hundred words",
> >>>>>>>>>>>   "docid" : "9"
> >>>>>>>>>>> } ]
> >>>>>>>>>>>
> >>>>>>>>>>> Production is:
> >>>>>>>>>>> [ {
> >>>>>>>>>>>   "doc_id" : "5.60622895E8",
> >>>>>>>>>>>   "doc_text" : " Text deleted - typically a few hundred words",
> >>>>>>>>>>>   "processing_timestamp" : "2022-11-27T23:56:35.601Z",
> >>>>>>>>>>>   "metadata_x_ocr_applied" : true,
> >>>>>>>>>>>   "metadata_x_parsed_by" :
> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
> >>>>>>>>>>>   "metadata_content_type" : "application/rtf",
> >>>>>>>>>>>   "metadata_page_count" : null,
> >>>>>>>>>>>   "metadata_creation_date" : null,
> >>>>>>>>>>>   "metadata_last_modified" : null
> >>>>>>>>>>> } ]
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> I load up the queue feeding the mergerecord processor with
> several hundred individual flowfiles and activate it.
> >>>>>>>>>>>
> >>>>>>>>>>> The synthetic data is nicely placed into chunks of 100, with
> any remainder being flushed in a smaller chunk.
> >>>>>>>>>>>
> >>>>>>>>>>> The production data is generally bundled into groups of 6
> records, sometimes less. Certainly it never gets close to 100 records.
> >>>>>>>>>>>
> >>>>>>>>>>> Any ideas as to what I should look at to track down the
> difference?
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >
>
