Thanks, that makes sense. I've had trouble getting a size parameter
accepted, but will work on that later.

However, I'm unsure what I should expect to see in the following test
scenario.

A fixed query in the Query parameter: a match all, i.e. nothing dynamic
set by upstream processing.

An empty input flowfile to trigger activity.

The test index is large (20M docs).

Do I expect the processor to begin filling the output queue as fast as it
can, with one flowfile per received page, pausing as the queue fills?
That was what I was anticipating, but at the moment I'm getting no output
and the input flowfile isn't being consumed. I suspect one flag is wrong,
but can't see it.
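
The behaviour anticipated above (one output per received page until the query is exhausted) can be sketched in Python. This is only an illustration of search_after-style paging over an in-memory list, not the processor's implementation. The names DOCS, fetch_page and paginate are hypothetical, the sample documents are made up, and backpressure from a full output queue is not modelled.

```python
from typing import Iterator, List, Optional

# Hypothetical stand-in for an index: ten documents with a unique sort key.
DOCS = [{"id": i, "Visit_DateTime": f"2023-08-{i:02d}"} for i in range(1, 11)]


def fetch_page(size: int, search_after: Optional[str]) -> List[dict]:
    """Return up to `size` docs whose sort key is strictly after `search_after`."""
    ordered = sorted(DOCS, key=lambda d: d["Visit_DateTime"])
    if search_after is not None:
        ordered = [d for d in ordered if d["Visit_DateTime"] > search_after]
    return ordered[:size]


def paginate(size: int) -> Iterator[List[dict]]:
    """Yield one page at a time; an empty page marks the end of the query."""
    cursor = None
    while True:
        page = fetch_page(size, cursor)
        if not page:
            return
        yield page
        # The sort value of the last hit becomes the cursor for the next page.
        cursor = page[-1]["Visit_DateTime"]


# Each yielded page corresponds to one output flowfile in the scenario above.
pages = list(paginate(size=4))
```

In this sketch the loop ends on the first empty page, which matches the "run the query once, then stop" behaviour described for PaginatedJsonQueryElasticsearch further down the thread.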

On Fri, Aug 18, 2023 at 12:06 AM Chris Sampson <chr...@apache.org> wrote:

> Again, sounds like it's working as documented [1] - an input is required
> to trigger the PaginatedJsonQueryElasticsearch processor, so something like
> GenerateFlowFile is a way to achieve that if you want to periodically
> execute a paginated query, e.g. by setting the Generate processor's
> schedule to run every hour, or use cron syntax, etc. The advantage with
> this processor is that you can use the output of another processor (e.g.
> build a query using the results of another processor, such as an initial
> query of Elasticsearch) to trigger the paginated query of Elasticsearch,
> but once the query is finished, the processor won't keep firing.
>
> Conversely, SearchElasticsearch does not allow incoming connections, but
> only triggers the same query on the defined schedule. If the query needs to
> use parameters (or some sort of variable), you need to figure out how to
> apply that in the Query parameter of the processor - it could be by
> Elasticsearch notation (e.g. "now/d" for the start of the current day in a
> date range filter), or something that can be achieved using NiFi Expression
> Language [2], but without the flexibility of providing inputs in FlowFile
> content, which could be the output of a previous query, or
> GenerateFlowFile, etc.
>
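As a rough sketch of the date-math option mentioned above, a fixed query body can use "now/d" in a range filter so that no upstream input is needed. The field name updated_date is borrowed from elsewhere in this thread and is an assumption about the index mapping; the Python here only builds the JSON text that would go into the Query property.

```python
import json

# Assumed field name; substitute whatever date field the index actually has.
DATE_FIELD = "updated_date"

# Range filter from the start of the current day ("now/d") onwards.
query = {
    "query": {
        "range": {
            DATE_FIELD: {"gte": "now/d"}
        }
    }
}

# JSON text suitable for pasting into a query property.
body = json.dumps(query, indent=2)
print(body)
```

Because Elasticsearch resolves "now/d" at query time, the same static text returns a different window each day.
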
> You need to figure out what query you want to run, what input(s) are
> appropriate, and the schedule on which you want it to execute.
>
> The Search processor is aimed more at a use case of "I want to continually
> retrieve the contents of an Elasticsearch index/query as it is populated
> from an external source"; PaginatedQuery is more for "I want to retrieve
> data from Elasticsearch that match a query". Both processors are meant to
> "allow for the possibility of many documents to be retrieved".
>
> For various reasons, neither processor was designed to hold state between
> initiation of paginated queries, e.g. they don't follow the pattern of a
> "Consume" or "List" processor that attempts to retain the knowledge of the
> "last timestamp" within NiFi itself. That's something that could be
> considered, but would need a code change (feel free to raise a jira ticket
> for the future [3] if you think that would be helpful). One of the reasons
> for this is that, unlike an S3 Bucket (for example), documents are not
> guaranteed to always be indexed within Elasticsearch in order/with such an
> "updated at" field, although one could design their system that way, of
> course.
>
> [1]
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html
>
> [2]
> https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html
>
> [3] https://issues.apache.org/jira/browse/NIFI
>
> On 2023/08/17 12:43:31 Richard Beare wrote:
> > I must be missing something simple. I've copied the parameters and query
> > from the SearchElasticSearch processor and I'm not getting errors, but no
> > flowfiles are produced.
> >
> > I'm forced to add an input connection, despite coding the query in the
> > Query property. I have a GenerateFlowFile processor connected. I'm using a
> > basic match all as a starting point:
> > {
> > "query" :
> >     {
> >     "match_all" : {}
> >     }
> > }
> >
> > Sending the query via curl appears to work OK - I get a page of stuff back.
> > I'm using nifi 1.20.
> >
> >
> >
> > On Thu, Aug 17, 2023 at 2:24 PM Chris Sampson <chr...@apache.org> wrote:
> >
> > > Elasticsearch doesn't have a CDC-like capability (it doesn't maintain a
> > > transaction log or such), so that approach isn't possible.
> > >
> > > What I've done previously is to maintain an audit log in a separate index
> > > within elasticsearch to track what data I've previously posted, e.g. this
> > > might be the last "updated_date" value read from the data index in a
> > > previous run of the nifi processor. So your nifi Flow would be something
> > > like:
> > >
> > > Query for latest processed updated_date -> paginated query for all new
> > > data -> determine new latest updated_date (e.g. using QueryRecord) -> put
> > > new latest updated_date into elasticsearch, ready for the next run
> > >
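The audit-log flow described above can be sketched in Python with in-memory lists standing in for the two Elasticsearch indices. This is an illustration of the bookkeeping only, under the assumption that every document carries a comparable updated_date value. The function names and sample data are hypothetical, and nothing here calls NiFi or Elasticsearch.

```python
from typing import List, Optional


def latest_processed(audit_log: List[str]) -> Optional[str]:
    """Step 1: look up the last updated_date recorded by a previous run."""
    return max(audit_log) if audit_log else None


def new_documents(data_index: List[dict], since: Optional[str]) -> List[dict]:
    """Step 2: stand-in for the paginated query for all new data."""
    return [d for d in data_index if since is None or d["updated_date"] > since]


def run_once(data_index: List[dict], audit_log: List[str]) -> List[dict]:
    """Run the whole flow once and return the documents it picked up."""
    since = latest_processed(audit_log)
    batch = new_documents(data_index, since)
    if batch:
        # Steps 3 and 4: determine the new latest updated_date and record it.
        audit_log.append(max(d["updated_date"] for d in batch))
    return batch


data_index = [
    {"id": 1, "updated_date": "2023-08-15"},
    {"id": 2, "updated_date": "2023-08-16"},
]
audit_log: List[str] = []

first = run_once(data_index, audit_log)    # picks up both documents
data_index.append({"id": 3, "updated_date": "2023-08-17"})
second = run_once(data_index, audit_log)   # picks up only the new document
third = run_once(data_index, audit_log)    # nothing new, so nothing returned
```

The sketch inherits the caveat raised earlier in the thread: a document indexed late with an older updated_date would be missed, since only values after the recorded one are fetched.
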
> > > On 2023/08/16 23:15:19 Richard Beare wrote:
> > > > One further question - what is the recommended way of checking for
> > > > updates in an index and fetching new records in a similar manner to
> > > > GenerateTableFetch for an sql DB?
> > > > Thanks
> > > >
> > > > On Thu, Aug 17, 2023 at 7:21 AM Richard Beare <richard.be...@gmail.com>
> > > > wrote:
> > > >
> > > > > Sounds perfect. Thanks
> > > > >
> > > > > On Thu, Aug 17, 2023 at 5:11 AM Chris Sampson <chr...@apache.org>
> > > > > wrote:
> > > > >
> > > > >> What you describe sounds like the processor is working as designed &
> > > > >> documented, i.e. it will restart the same query once it has reached
> > > > >> the end of the paginated scroll (or search_after, or point-in-time)
> > > > >> query.
> > > > >>
> > > > >> It sounds like you want to try using the
> > > > >> PaginatedJsonQueryElasticsearch [1] processor instead. This will
> > > > >> execute the query given to it, either as the query property or the
> > > > >> body of an incoming FlowFile, output the results, and then stop.
> > > > >>
> > > > >>
> > > > >> [1]
> > > > >> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html
> > > > >>
> > > > >> On 2023/08/16 07:57:43 Richard Beare wrote:
> > > > >> > Hi,
> > > > >> > I am using the SearchElasticSearch (1.20.0) processor to retrieve all
> > > > >> > documents (~20M) from an index, process and eventually return results
> > > > >> > to a new index, although for this test I'm retrieving and processing
> > > > >> > then discarding. I'm using opensearch.
> > > > >> >
> > > > >> > My problem is that the process restarts after completion - I
> > > > >> > discovered this, and docs confirm, after seeing warnings from my
> > > > >> > processing code (which reformats json ready for other work) being
> > > > >> > repeated for the same document ID.
> > > > >> >
> > > > >> > How do I configure the processor to stop after completing the first
> > > > >> > query?
> > > > >> >
> > > > >> > I've tried the following:
> > > > >> >
> > > > >> > Query: {"query" : {"match_all" :{}}}
> > > > >> >
> > > > >> > with pagination_type SCROLL
> > > > >> >
> > > > >> > I haven't found a combination of the properties that doesn't lead to
> > > > >> > repeated cycles through the index.
> > > > >> >
> > > > >> > I've also tried {"query" : {"match_all" :{}}, "sort" :
> > > > >> > [{"Visit_DateTime" : "asc"}]}
> > > > >> >
> > > > >> > and SEARCH_AFTER pagination type, with the same problem.
> > > > >> >
> > > > >> > What am I missing?
> > > > >> > Thanks
> > > > >> >
> > > > >>
> > > > >
> > > >
> > >
> >
>
