Thanks, that makes sense. I've had trouble getting a size parameter accepted, but will work on that later.
However, I'm unsure what I should expect to see in the following test scenario:

- A fixed query in the Query property: a match_all, i.e. nothing dynamic set by upstream processing.
- An empty input FlowFile to trigger activity.
- A large test index (20M docs).

Should I expect the processor to begin filling the output queue as fast as it can, with one FlowFile per received page, pausing as the queue fills? That was what I was anticipating, but at the moment I'm getting no output and the input FlowFile isn't being consumed. I suspect one setting is wrong, but I can't see which.

On Fri, Aug 18, 2023 at 12:06 AM Chris Sampson <chr...@apache.org> wrote:

> Again, sounds like it's working as documented [1] - an input is required
> to trigger the PaginatedJsonQueryElasticsearch processor, so something like
> GenerateFlowFile is a way to achieve that if you want to periodically
> execute a paginated query, e.g. by setting the Generate processor's
> schedule to run every hour, or use cron syntax, etc. The advantage with
> this processor is that you can use the output of another processor (e.g.
> build a query using the results of another processor, such as an initial
> query of Elasticsearch) to trigger the paginated query of Elasticsearch,
> but once the query is finished, the processor won't keep firing.
>
> Conversely, SearchElasticsearch does not allow incoming connections, but
> only triggers the same query on the defined schedule. If the query needs to
> use parameters (or some sort of variable), you need to figure out how to
> apply that in the Query parameter of the processor - it could be by
> Elasticsearch notation (e.g. "now/d" for the start of the current day in a
> date range filter), or something that can be achieved using NiFi Expression
> Language [2], but without the flexibility of providing inputs in FlowFile
> content, which could be the output of a previous query, or
> GenerateFlowFile, etc.
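For illustration, a fixed Query property that relies on Elasticsearch date math (the `now/d` notation mentioned above) rather than on FlowFile input could look like the JSON printed by this minimal Python sketch. The field name `updated_at` and the helper `build_todays_query` are hypothetical; only the `range` query and the `now/d` / `now+1d/d` date-math syntax come from Elasticsearch itself.

```python
import json


def build_todays_query(date_field: str = "updated_at") -> str:
    """Build a JSON query body that selects documents dated within the current day.

    `date_field` is a hypothetical field name; substitute your index's date field.
    """
    body = {
        "query": {
            "range": {
                date_field: {
                    # Elasticsearch date math: "now" rounded down to the start of today.
                    "gte": "now/d",
                    # Start of tomorrow; "lt" excludes this boundary value itself.
                    "lt": "now+1d/d",
                }
            }
        }
    }
    return json.dumps(body, indent=2)


if __name__ == "__main__":
    # The printed JSON is what would go into the processor's Query property.
    print(build_todays_query())
```

Because Elasticsearch resolves `now/d` when the query executes, the same fixed Query text selects a different day on each scheduled run of SearchElasticsearch, without needing an incoming FlowFile.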
>
> You need to figure out what query you want to run, what input(s) are
> appropriate, and the schedule to which you want to execute.
>
> The Search processor is aimed more at a use case of "I want to continually
> retrieve the contents of an Elasticsearch index/query as it is populated
> from an external source", PaginatedQuery is more for "I want to retrieve
> data from Elasticsearch that match a query"; both processors are meant to
> "allow for the possibility of many documents to be retrieved".
>
> For various reasons, neither processor was designed to hold state between
> initiation of paginated queries, e.g. they don't follow the pattern of a
> "Consume" or "List" processor that attempts to retain the knowledge of the
> "last timestamp" within NiFi itself. That's something that could be
> considered, but would need a code change (feel free to raise a Jira ticket
> for the future [3] if you think that would be helpful). One of the reasons
> for this is that, unlike an S3 Bucket (for example), documents are not
> guaranteed to always be indexed within Elasticsearch in order/with such an
> "updated at" field, although one could design their system that way, of
> course.
>
> [1] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html
> [2] https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html
> [3] https://issues.apache.org/jira/browse/NIFI
>
> On 2023/08/17 12:43:31 Richard Beare wrote:
> > I must be missing something simple. I've copied the parameters and query
> > from the SearchElasticsearch processor and I'm not getting errors, but no
> > FlowFiles are produced.
> >
> > I'm forced to add an input connection, despite coding the query in the
> > Query property. I have a GenerateFlowFile processor connected.
I'm > using.a > > basic match all as a starting point > > { > > "query" : > > { > > "match_all" : {} > > } > > } > > > > Sending the query via curl appears to work OK - I get a page of stuff > back. > > I'm using nifi 1.20. > > > > > > > > On Thu, Aug 17, 2023 at 2:24 PM Chris Sampson <chr...@apache.org> wrote: > > > > > Elasticsearch doesn't have a CDC-like capability (it doesn't maintain a > > > transaction log or such), so that approach isn't possible. > > > > > > What I've done previously is to maintain an audit log in a separate > index > > > within elasticsearch to track what data I've previously posted, e.g. > this > > > might be the last "updated_date" value read from the data index in a > > > previous run of the nifi processor. So your nifi Flow would be > something > > > like: > > > > > > Query for latest processed updated_date > paginated query for all new > data > > > > determine new latest updated_date (e.g. using QueryRecord) > put new > > > latest updated_date into elasticsearch, ready for the next run > > > > > > On 2023/08/16 23:15:19 Richard Beare wrote: > > > > One further question - what is the recommended way of checking for > > > updates > > > > in an index and fetching new records in a similar manner to > > > > GenerateTableFetch for an sql DB? > > > > Thanks > > > > > > > > On Thu, Aug 17, 2023 at 7:21 AM Richard Beare < > richard.be...@gmail.com> > > > > wrote: > > > > > > > > > Sounds perfect. Thanks > > > > > > > > > > On Thu, Aug 17, 2023 at 5:11 AM Chris Sampson <chr...@apache.org> > > > wrote: > > > > > > > > > >> What you describe sounds like the processor is working as > designed & > > > > >> documented, i.e. it will restart the same query once it has > reached > > > the end > > > > >> of the paginated scroll (or search_after, or point-in-time) query. > > > > >> > > > > >> Instead, it sounds like you want to try using the > > > > >> PaginatedJsonQueryElasticsearch [1] processor instead. 
> > > > > > This will execute the query given to it, either as the query
> > > > > > property or the body of an incoming FlowFile, output the results,
> > > > > > and then stop.
> > > > > >
> > > > > > [1] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html
> > > > > >
> > > > > > On 2023/08/16 07:57:43 Richard Beare wrote:
> > > > > > > Hi,
> > > > > > > I am using the SearchElasticsearch (1.20.0) processor to retrieve
> > > > > > > all documents (~20M) from an index, process them and eventually
> > > > > > > return results to a new index, although for this test I'm
> > > > > > > retrieving and processing, then discarding. I'm using OpenSearch.
> > > > > > >
> > > > > > > My problem is that the process restarts after completion. I
> > > > > > > discovered this, and the docs confirm it, after seeing warnings
> > > > > > > from my processing code (which reformats JSON ready for other
> > > > > > > work) being repeated for the same document ID.
> > > > > > >
> > > > > > > How do I configure the processor to stop after completing the
> > > > > > > first query?
> > > > > > >
> > > > > > > I've tried the following:
> > > > > > >
> > > > > > > Query: {"query" : {"match_all" : {}}}
> > > > > > >
> > > > > > > with pagination_type SCROLL.
> > > > > > >
> > > > > > > I haven't found a combination of the properties that doesn't
> > > > > > > lead to repeated cycles through the index.
> > > > > > >
> > > > > > > I've also tried {"query" : {"match_all" : {}}, "sort" :
> > > > > > > [{"Visit_DateTime" : "asc"}]}
> > > > > > >
> > > > > > > and SEARCH_AFTER pagination type, with the same problem.
> > > > > > >
> > > > > > > What am I missing?
> > > > > > > Thanks
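The audit-log approach described above (read the latest processed updated_date, run a paginated query for newer data, determine the new latest value, store it for the next run) can be sketched in Python. This is a minimal, self-contained illustration under stated assumptions, not NiFi or Elasticsearch code: both indices are plain in-memory Python structures, the field name `updated_date` follows the thread, and `build_incremental_query`, `run_query` and `incremental_fetch` are hypothetical helpers. Only the shape of the query body (a `range` filter with `gt`, plus a `sort`) mirrors real Elasticsearch syntax.

```python
from typing import Dict, List, Optional

# Hypothetical in-memory stand-ins for two indices. In a NiFi flow these would be
# real Elasticsearch reads/writes made by processors; Python only shows the logic.
DATA_INDEX: List[Dict[str, str]] = [
    {"id": "a", "updated_date": "2023-08-16T10:00:00Z"},
    {"id": "b", "updated_date": "2023-08-16T12:00:00Z"},
    {"id": "c", "updated_date": "2023-08-17T09:00:00Z"},
]
AUDIT_INDEX: Dict[str, str] = {}  # holds the last processed "updated_date"


def build_incremental_query(last_seen: Optional[str]) -> dict:
    """Step 2: query body for documents newer than the stored watermark."""
    if last_seen is None:
        query: dict = {"match_all": {}}  # first run: nothing processed yet
    else:
        query = {"range": {"updated_date": {"gt": last_seen}}}
    # Sorting on the watermark field gives a deterministic order, which
    # search_after pagination requires.
    return {"query": query, "sort": [{"updated_date": "asc"}]}


def run_query(body: dict) -> List[Dict[str, str]]:
    """Simulate the paginated query against DATA_INDEX.

    ISO-8601 UTC timestamps in one fixed format compare correctly as strings.
    """
    bounds = body["query"].get("range", {}).get("updated_date")
    hits = [d for d in DATA_INDEX if bounds is None or d["updated_date"] > bounds["gt"]]
    return sorted(hits, key=lambda d: d["updated_date"])


def incremental_fetch() -> List[Dict[str, str]]:
    last_seen = AUDIT_INDEX.get("last_updated_date")      # step 1: read watermark
    hits = run_query(build_incremental_query(last_seen))  # step 2: fetch new data
    if hits:
        # Step 3: determine the new latest updated_date.
        new_watermark = max(d["updated_date"] for d in hits)
        # Step 4: store it, ready for the next run.
        AUDIT_INDEX["last_updated_date"] = new_watermark
    return hits
```

Calling `incremental_fetch()` twice returns documents a, b and c, then an empty list; a document indexed afterwards with a newer `updated_date` is picked up on the following call. Filtering with `gt` on the watermark is a trade-off: a document that arrives late with an `updated_date` equal to or older than the stored value is skipped, which is the ordering caveat raised in the thread.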