A bit of progress. First up, firing a match_all at my index with 20M documents doesn't work, as you probably expected. Or more precisely, it is unlikely to be useful - I left it overnight and nothing appeared to have happened, so I guess it was madly fetching pages and filling up available storage.
So I tested with a query of the form

{ "query": { "range" : { "Visit_DateTime": { "gte" : "01/07/2020", "lte" : "02/07/2020", "format" : "dd/MM/yyyy" } } } }

i.e. a single day's worth of documents (38998 according to a curl _count version of the query). This did indeed produce 3900 flowfiles in the hits queue and consume the input. Including a size parameter as follows:

{ "size" : 1000, "query": { "range" : { "Visit_DateTime": { "gte" : "01/07/2020", "lte" : "02/07/2020", "format" : "dd/MM/yyyy" } } } }

leads to 39 flowfiles in the hits queue. So it looks like my best way forward for processing many years' worth of data is to generate a set of day-based queries. Is a python script the best option?

On Fri, Aug 18, 2023 at 4:03 PM Chris Sampson <chr...@apache.org> wrote:
> Ah, so these processors have all been written for Elasticsearch, and use the Elasticsearch low-level REST API library to form connections. They've not been tested against OpenSearch, although hopefully they should work for any interactions where the API is the same, but the two products continue to diverge, so there's an increasing chance that some things won't work.
>
> Any details of things that aren't working would be good to know about (e.g. raised as Jira tickets, containing as much detail as possible, like the query used and any log details of errors), so that the community could look into providing OpenSearch compatibility in the future.
>
> I've known a few people try with OpenSearch and things either work, or we don't hear about the errors that are received, so we don't know what needs looking at from a NiFi perspective.
>
> On 2023/08/18 04:37:10 Richard Beare wrote:
> > I did use the example and got errors. I'll revisit that (perhaps it is an opensearch idiosyncrasy). The per response option is probably my issue. I'll check that out and get back to you.
> >
> > Thanks again
> >
> > On Fri, Aug 18, 2023 at 2:30 PM Chris Sampson <chr...@apache.org> wrote:
> >
> > > Check the example in the processor's additional details docs [1] for how you could set size and sort fields for the query - size is used to determine the number of documents returned per page, sort is required if using a "search after" or "point in time" query type.
> > >
> > > If the Query property is set, the incoming FlowFile content should be ignored, i.e. it doesn't need to be empty.
> > >
> > > Use the "Search Results Split" property to determine how the results are output. This defaults to "per response", which outputs a flowfile for every page of results. As PaginatedJsonQueryElasticsearch takes an input flowfile, its internal "process session" remains active until the processor completes and commits its session - this happens when there are no more results to retrieve from Elasticsearch, at which point the input flowfile disappears from the input queue and all output flowfiles appear in the output queues. This is how the nifi framework handles sessions, but it can be confusing if you're not aware of that beforehand.
> > >
> > > SearchElasticsearch is different in this regard because its session ends after every iteration (determined by the "Search Results Split", e.g. this could be per page or per entire query), then it uses nifi state to set up the next iteration. This means you could start to see output flowfiles sooner.
> > >
> > > [1] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
> > >
> > > On 2023/08/17 22:13:22 Richard Beare wrote:
> > > > Thanks, that makes sense. I've had trouble getting a size parameter accepted, but will work on that later.
> > > >
> > > > However, I'm unsure what I should expect to see in the following test scenario.
> > > >
> > > > A fixed query in the Query parameter - a match all, i.e. nothing dynamic set by upstream processing.
> > > >
> > > > An empty input flowfile to trigger activity.
> > > >
> > > > The test index is large (20M docs).
> > > >
> > > > Do I expect the processor to begin filling the output queue as fast as it can, with one flowfile per received page, pausing as the queue fills? That was what I was anticipating, but at the moment I'm getting no output and the input flowfile isn't being consumed. I suspect one flag is wrong, but can't see it.
> > > >
> > > > On Fri, Aug 18, 2023 at 12:06 AM Chris Sampson <chr...@apache.org> wrote:
> > > >
> > > > > Again, sounds like it's working as documented [1] - an input is required to trigger the PaginatedJsonQueryElasticsearch processor, so something like GenerateFlowFile is a way to achieve that if you want to periodically execute a paginated query, e.g. by setting the Generate processor's schedule to run every hour, or use cron syntax, etc. The advantage of this processor is that you can use the output of another processor (e.g. build a query using the results of another processor, such as an initial query of Elasticsearch) to trigger the paginated query of Elasticsearch, but once the query is finished, the processor won't keep firing.
> > > > >
> > > > > Conversely, SearchElasticsearch does not allow incoming connections, but only triggers the same query on the defined schedule.
> > > > > If the query needs to use parameters (or some sort of variable), you need to figure out how to apply that in the Query parameter of the processor - it could be by Elasticsearch notation (e.g. "now/d" for the start of the current day in a date range filter), or something that can be achieved using NiFi Expression Language [2], but without the flexibility of providing inputs in FlowFile content, which could be the output of a previous query, or GenerateFlowFile, etc.
> > > > >
> > > > > You need to figure out what query you want to run, what input(s) are appropriate, and the schedule on which you want to execute it.
> > > > >
> > > > > The Search processor is aimed more at a use case of "I want to continually retrieve the contents of an Elasticsearch index/query as it is populated from an external source", whereas PaginatedQuery is more for "I want to retrieve data from Elasticsearch that match a query"; both processors are meant to "allow for the possibility of many documents to be retrieved".
> > > > >
> > > > > For various reasons, neither processor was designed to hold state between initiation of paginated queries, e.g. they don't follow the pattern of a "Consume" or "List" processor that attempts to retain the knowledge of the "last timestamp" within NiFi itself. That's something that could be considered, but would need a code change (feel free to raise a Jira ticket for the future [3] if you think that would be helpful). One of the reasons for this is that, unlike an S3 Bucket (for example), documents are not guaranteed to always be indexed within Elasticsearch in order/with such an "updated at" field, although one could design their system that way, of course.
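Because neither processor keeps a "last timestamp" between runs, that bookkeeping has to live outside the processor, for example in the separate audit index Chris suggests further down the thread. Below is a minimal Python sketch of the two pure pieces of that pattern: building the next query from the stored high-water mark, and computing the new mark from a page of hits. The field name `updated_date` follows Chris's example; `build_incremental_query` and `new_high_water_mark` are hypothetical helpers, and the sketch assumes timestamps are stored as ISO-8601 UTC strings in one fixed format, so they sort correctly as text. It does not talk to Elasticsearch or NiFi.

```python
import json
from typing import Any, Dict, List, Optional

# Hypothetical timestamp field, following the "updated_date" example in the thread.
TIMESTAMP_FIELD = "updated_date"


def build_incremental_query(last_seen: Optional[str], page_size: int = 1000) -> Dict[str, Any]:
    """Build the query for the next run from the stored high-water mark.

    last_seen is the value saved by the previous run (e.g. in a separate
    audit index); None means there is no previous run, so match everything.
    """
    if last_seen is None:
        criteria: Dict[str, Any] = {"match_all": {}}
    else:
        # "gt" so the document that set the previous mark is not fetched again.
        criteria = {"range": {TIMESTAMP_FIELD: {"gt": last_seen}}}
    return {
        "size": page_size,  # documents returned per page
        "query": criteria,
        # A sort is required for search_after / point-in-time pagination.
        "sort": [{TIMESTAMP_FIELD: "asc"}],
    }


def new_high_water_mark(hits: List[Dict[str, Any]], previous: Optional[str]) -> Optional[str]:
    """Return the latest timestamp among the hits and the previous mark.

    Assumes ISO-8601 UTC strings in one fixed format, which compare correctly as text.
    """
    values = [
        hit["_source"][TIMESTAMP_FIELD]
        for hit in hits
        if TIMESTAMP_FIELD in hit.get("_source", {})
    ]
    if previous is not None:
        values.append(previous)
    return max(values) if values else None


if __name__ == "__main__":
    query = build_incremental_query("2023-08-16T00:00:00Z")
    print(json.dumps(query))
    page = [{"_source": {TIMESTAMP_FIELD: "2023-08-17T01:30:00Z"}}]
    print(new_high_water_mark(page, "2023-08-16T00:00:00Z"))
```

Using `gt` avoids re-reading the boundary document, but it also skips any document stamped exactly at the stored value that arrives later; `gte` recovers those at the cost of re-reading the boundary ones. As Chris notes, documents are not guaranteed to be indexed in timestamp order, so neither choice catches a late document with an older timestamp.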
> > > > >
> > > > > [1] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html
> > > > > [2] https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html
> > > > > [3] https://issues.apache.org/jira/browse/NIFI
> > > > >
> > > > > On 2023/08/17 12:43:31 Richard Beare wrote:
> > > > > > I must be missing something simple. I've copied the parameters and query from the SearchElasticsearch processor and I'm not getting errors, but no flowfiles are produced.
> > > > > >
> > > > > > I'm forced to add an input connection, despite coding the query in the Query property. I have a GenerateFlowFile processor connected. I'm using a basic match all as a starting point:
> > > > > >
> > > > > > {
> > > > > >   "query" :
> > > > > >   {
> > > > > >     "match_all" : {}
> > > > > >   }
> > > > > > }
> > > > > >
> > > > > > Sending the query via curl appears to work OK - I get a page of stuff back. I'm using nifi 1.20.
> > > > > >
> > > > > > On Thu, Aug 17, 2023 at 2:24 PM Chris Sampson <chr...@apache.org> wrote:
> > > > > >
> > > > > > > Elasticsearch doesn't have a CDC-like capability (it doesn't maintain a transaction log or such), so that approach isn't possible.
> > > > > > >
> > > > > > > What I've done previously is to maintain an audit log in a separate index within elasticsearch to track what data I've previously posted, e.g. this might be the last "updated_date" value read from the data index in a previous run of the nifi processor.
> > > > > > > So your nifi Flow would be something like:
> > > > > > >
> > > > > > > Query for latest processed updated_date -> paginated query for all new data -> determine new latest updated_date (e.g. using QueryRecord) -> put new latest updated_date into elasticsearch, ready for the next run
> > > > > > >
> > > > > > > On 2023/08/16 23:15:19 Richard Beare wrote:
> > > > > > > > One further question - what is the recommended way of checking for updates in an index and fetching new records in a similar manner to GenerateTableFetch for an SQL DB?
> > > > > > > > Thanks
> > > > > > > >
> > > > > > > > On Thu, Aug 17, 2023 at 7:21 AM Richard Beare <richard.be...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Sounds perfect. Thanks
> > > > > > > > >
> > > > > > > > > On Thu, Aug 17, 2023 at 5:11 AM Chris Sampson <chr...@apache.org> wrote:
> > > > > > > > >
> > > > > > > > >> What you describe sounds like the processor is working as designed & documented, i.e. it will restart the same query once it has reached the end of the paginated scroll (or search_after, or point-in-time) query.
> > > > > > > > >>
> > > > > > > > >> Instead, it sounds like you want to try using the PaginatedJsonQueryElasticsearch [1] processor. This will execute the query given to it, either as the query property or the body of an incoming FlowFile, output the results, and then stop.
> > > > > > > > >>
> > > > > > > > >> [1] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html
> > > > > > > > >>
> > > > > > > > >> On 2023/08/16 07:57:43 Richard Beare wrote:
> > > > > > > > >> > Hi,
> > > > > > > > >> > I am using the SearchElasticsearch (1.20.0) processor to retrieve all documents (~20M) from an index, process them and eventually return results to a new index, although for this test I'm retrieving, processing, then discarding. I'm using opensearch.
> > > > > > > > >> >
> > > > > > > > >> > My problem is that the process restarts after completion - I discovered this, and the docs confirm it, after seeing warnings from my processing code (which reformats json ready for other work) being repeated for the same document ID.
> > > > > > > > >> >
> > > > > > > > >> > How do I configure the processor to stop after completing the first query?
> > > > > > > > >> >
> > > > > > > > >> > I've tried the following:
> > > > > > > > >> >
> > > > > > > > >> > Query: {"query" : {"match_all" :{}}}
> > > > > > > > >> >
> > > > > > > > >> > with pagination_type SCROLL
> > > > > > > > >> >
> > > > > > > > >> > I haven't found a combination of the properties that doesn't lead to repeated cycles through the index.
> > > > > > > > >> >
> > > > > > > > >> > I've also tried {"query" : {"match_all" :{}}, "sort" : [{"Visit_DateTime" : "asc"}]}
> > > > > > > > >> >
> > > > > > > > >> > and SEARCH_AFTER pagination type, with the same problem.
> > > > > > > > >> >
> > > > > > > > >> > What am I missing?
> > > > > > > > >> > Thanks
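On the question at the top of the thread, whether a Python script is the best way to generate a set of day-based queries: a short script is one reasonable option. Below is a minimal sketch that yields one query per day, reusing the `Visit_DateTime` field, the `dd/MM/yyyy` format and the `size` of 1000 from the tests at the top of the thread. `daily_queries` is a hypothetical helper. Feeding each generated query to PaginatedJsonQueryElasticsearch as FlowFile content (with the Query property left empty, since a set Query property makes the processor ignore the content) is an assumption about the wiring, not something tested in the thread.

```python
import json
from datetime import date, timedelta
from typing import Any, Dict, Iterator

# Python equivalent of the "dd/MM/yyyy" format used in the range query.
DATE_FORMAT = "%d/%m/%Y"


def daily_queries(start: date, end: date, field: str = "Visit_DateTime",
                  page_size: int = 1000) -> Iterator[Dict[str, Any]]:
    """Yield one range query per day, covering the half-open interval [start, end)."""
    day = start
    while day < end:
        next_day = day + timedelta(days=1)
        yield {
            "size": page_size,  # documents returned per page
            "query": {
                "range": {
                    field: {
                        "gte": day.strftime(DATE_FORMAT),
                        # "lt" (not "lte") keeps consecutive day windows from overlapping.
                        "lt": next_day.strftime(DATE_FORMAT),
                        "format": "dd/MM/yyyy",
                    }
                }
            },
        }
        day = next_day


if __name__ == "__main__":
    # One JSON query per line, e.g. to be split into one FlowFile per query.
    for query in daily_queries(date(2020, 7, 1), date(2020, 7, 4)):
        print(json.dumps(query))
```

The upper bound uses `lt` on the following day rather than `lte`: with `lte`, at least a document stamped exactly at midnight on the boundary date would match two consecutive day queries. Smaller per-day queries also mean each input FlowFile's session commits, and its output appears, after one day's results rather than after the whole index.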