Again, sounds like it's working as documented [1] - an input is required to 
trigger the PaginatedJsonQueryElasticsearch processor, so something like 
GenerateFlowFile is a way to achieve that if you want to periodically execute a 
paginated query, e.g. by setting the Generate processor's schedule to run every 
hour, or use cron syntax, etc. The advantage with this processor is that you 
can use the output of another processor (e.g. build a query using the results 
of another processor, such as an initial query of Elasticsearch) to trigger the 
paginated query of Elasticsearch, but once the query is finished, the processor 
won't keep firing.

Conversely, SearchElasticsearch does not allow incoming connections, but only 
triggers the same query on the defined schedule. If the query needs to use 
parameters (or some sort of variable), you need to figure out how to apply that 
in the Query parameter of the processor - it could be by Elasticsearch notation 
(e.g. "now/d" for the start of the current day in a date range filter), or 
something that can be achieved using NiFi Expression Language [2], but without 
the flexibility of providing inputs in FlowFile content, which could be the 
output of a previous query, or GenerateFlowFile, etc.

You need to figure out what query you want to run, what input(s) are 
appropriate, and the schedule to which you want to execute.

The Search processor is aimed more at a use case of "I want to continually 
retrieve the contents of an Elasticsearch index/query as it is populated from 
an extremal source", PaginatedQuery is more for "I want to retrieve data from 
Elasticsearch that match a query"; both processors are meant to "allow for the 
possibility of many documents to be retrieved".

For various reasons, neither processor was designed to hold state between 
initiation of paginated queries, e.g. they don't follow the pattern of a 
"Consume" or "List" processor that attempts to retain the knowledge of the 
"last timestamp" within NiFi itself. That's something that could be considered, 
but would need a code change (feel free to raise a jira ticket for the future 
[3] if you think that would be helpful). One of the reasons for this is that, 
unlike an S3 Bucket (for example), documents are not guaranteed to always be 
indexed within Elasticsearch in order/with such an "updated at" field, although 
one could design their system that way, of course.

[1] 
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html

[2] https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html

[3] https://issues.apache.org/jira/browse/NIFI

On 2023/08/17 12:43:31 Richard Beare wrote:
> I must be missing something simple. I've copied the parameters and query
> from the SearchElasticSearch processor and I'm not getting errors, but no
> flowfiles are produced.
> 
> I'm forced to add an input connection, despite coding the query in the
> Query property. I have a GenerateFlowFile processor connected. I'm using.a
> basic match all as a starting point
> {
> "query" :
>     {
>     "match_all" : {}
>     }
> }
> 
> Sending the query via curl appears to work OK - I get a page of stuff back.
> I'm using nifi 1.20.
> 
> 
> 
> On Thu, Aug 17, 2023 at 2:24 PM Chris Sampson <chr...@apache.org> wrote:
> 
> > Elasticsearch doesn't have a CDC-like capability (it doesn't maintain a
> > transaction log or such), so that approach isn't possible.
> >
> > What I've done previously is to maintain an audit log in a separate index
> > within elasticsearch to track what data I've previously posted, e.g. this
> > might be the last "updated_date" value read from the data index in a
> > previous run of the nifi processor. So your nifi Flow would be something
> > like:
> >
> > Query for latest processed updated_date > paginated query for all new data
> > > determine new latest updated_date (e.g. using QueryRecord) > put new
> > latest updated_date into elasticsearch, ready for the next run
> >
> > On 2023/08/16 23:15:19 Richard Beare wrote:
> > > One further question - what is the recommended way of checking for
> > updates
> > > in an index and fetching new records in a similar manner to
> > > GenerateTableFetch for an sql DB?
> > > Thanks
> > >
> > > On Thu, Aug 17, 2023 at 7:21 AM Richard Beare <richard.be...@gmail.com>
> > > wrote:
> > >
> > > > Sounds perfect. Thanks
> > > >
> > > > On Thu, Aug 17, 2023 at 5:11 AM Chris Sampson <chr...@apache.org>
> > wrote:
> > > >
> > > >> What you describe sounds like the processor is working as designed &
> > > >> documented, i.e. it will restart the same query once it has reached
> > the end
> > > >> of the paginated scroll (or search_after, or point-in-time) query.
> > > >>
> > > >> Instead, it sounds like you want to try using the
> > > >> PaginatedJsonQueryElasticsearch [1] processor instead. This will
> > execute
> > > >> the query given to it, either as the query property or the body of an
> > > >> incoming FlowFile, output the results, and then stop.
> > > >>
> > > >>
> > > >> [1]
> > > >>
> > https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html
> > > >>
> > > >> On 2023/08/16 07:57:43 Richard Beare wrote:
> > > >> > Hi,
> > > >> > I am using the SearchElasticSearch (1.20.0) processor to retrieve
> > all
> > > >> > documents (~20M) from an index, process and eventually return
> > results
> > > >> to a
> > > >> > new index, although for this test I'm retrieving and processing then
> > > >> > discarding. I'm using opensearch.
> > > >> >
> > > >> > My problem is that the process restarts after completion - I
> > discovered
> > > >> > this, and docs confirm, after seeing warnings from my processing
> > code
> > > >> > (which reformats json ready for other work) being repeated for the
> > same
> > > >> > document ID.
> > > >> >
> > > >> > How do I configure the processor to stop after the completing the
> > first
> > > >> > query.
> > > >> >
> > > >> > I've tried the following:
> > > >> >
> > > >> > Query: {"query" : {"match_all" :{}}}
> > > >> >
> > > >> > with pagination_type SCROLL
> > > >> >
> > > >> > I haven't found a combination of the properties that doesn't lead to
> > > >> > repeated cycles through the index.
> > > >> >
> > > >> > I've also tried {"query" : {"match_all" :{}}, "sort" :
> > > >> [{"Visit_DateTime" :
> > > >> > "asc"]}}
> > > >> >
> > > >> > and SEARCH_AFTER pagination type, with the same problem.
> > > >> >
> > > >> > What am I missing?
> > > >> > Thanks
> > > >> >
> > > >>
> > > >
> > >
> >
> 

Reply via email to