A bit of progress.
First up, firing a match_all at my index with 20M documents doesn't work,
as you probably expected. Or more precisely, is unlikely to be useful - I
left it overnight and nothing appeared to have happened, so I guess it was
madly fetching pages and filling up available storage.

So I tested with a query of the form
{
query": {
  "range" : {
    "Visit_DateTime": {
      "gte" : "01/07/2020",
      "lte" : "02/07/2020",
      "format" : "dd/MM/yyyy"
    }
  }
}
}

i.e a single days worth of documents (38998 according to a curl _count
version of the query). This did indeed produce 3900 flowfiles in the hits
queue and consume the input.

Including a size parameter as follows:

{
"size" : 1000,
query": {
  "range" : {
    "Visit_DateTime": {
      "gte" : "01/07/2020",
      "lte" : "02/07/2020",
      "format" : "dd/MM/yyyy"
    }
  }
}
}

Leads to 39 flowfiles in the hits queue.

So it looks like my best way forward processing many years worth of data is
to generate a set of day-based queries. Is a python script the best option?




On Fri, Aug 18, 2023 at 4:03 PM Chris Sampson <chr...@apache.org> wrote:

> Ah, so these processors have all been written for Elasticsearch, and use
> the Elasticsearch low-level REST API library to form connections. They've
> not been tested against OpenSearch, although hopefully should work for any
> interactions where the API is the same, but the two products continue to
> diverge, so there's increasing chance that some things won't work.
>
> Any details of things that aren't working would be good to know about
> (e.g. raised as Jira tickets, containing a much detail as possible, like
> the query used and any log details of errors), so that the community could
> look into providing OpenSearch compatibility in the future.
>
> I've known a few people try with OpenSearch and things either work, or we
> don't hear about the errors that are received, so we don't know what needs
> looking at from a NiFi perspective.
>
> On 2023/08/18 04:37:10 Richard Beare wrote:
> > I did use the example and got errors. I'll revisit that (perhaps it is an
> > opensearch idiosyncrasy).  The per response option is probably my issue.
> > I'll check that out and get back to you.
> > Thanks again
> >
> > On Fri, Aug 18, 2023 at 2:30 PM Chris Sampson <chr...@apache.org> wrote:
> >
> > > Check the example in the processor's additional details docs [1] for
> how
> > > you could set size and sort fields for the query - size is used to
> > > determine the number of documents returned per page, sorry is required
> if
> > > using a "search after" or "point in time" query type.
> > >
> > > If the Query property is set, the incoming FlowFile content should be
> > > ignored, i.e. it doesn't need to be empty.
> > >
> > > Use the "Search Results Split" property to determine how the results
> are
> > > output. This defaults to "per response", which outputs a flowfile for
> every
> > > page of results. As PaginatedJsonQueryElasticsearch takes an input
> > > flowfile, its internal "process session" remains active until the
> processor
> > > completes and commits is session - this happens when there are no more
> > > results to retrieve from Elasticsearch, at which point the input
> flowfile
> > > disappears from the input queue and all output flowfiles appear in the
> > > output queues. This is how the nifi framework handles sessions, but
> can be
> > > confusing if you're not aware of that beforehand.
> > >
> > > SearchElasticsearch is different in this regard because its session
> ends
> > > after every iteration (determined by the "Search Results Split", e.g.
> this
> > > could be per page or per entire query), then uses nifi state to setup
> the
> > > next iteration. This means you could start to see output flowfiles
> sooner.
> > >
> > >
> > > [1]
> > >
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
> > >
> > > On 2023/08/17 22:13:22 Richard Beare wrote:
> > > > Thanks, that makes sense. I've had trouble getting a size parameter
> > > > accepted, but will work on that later.
> > > >
> > > > However, I'm unsure what I should expect to see in the following test
> > > > scenario.
> > > >
> > > > A fixed query in the Query parameter - a match all. i.e. nothing
> dynamic
> > > > set by upstream processing
> > > >
> > > > An empty input flowfile to trigger activity.
> > > >
> > > > The test index is large. (20M docs)
> > > >
> > > > Do I expect the processor to begin filling the output queue as fast
> as it
> > > > can, with one flowfile per received page, pausing as the queue fills?
> > > > That was what I was anticipating, but at the moment I'm getting no
> output
> > > > and the input flowfile isn't being consumed. I suspect one flag is
> wrong,
> > > > but can't see it.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Aug 18, 2023 at 12:06 AM Chris Sampson <chr...@apache.org>
> > > wrote:
> > > >
> > > > > Again, sounds like it's working as documented [1] - an input is
> > > required
> > > > > to trigger the PaginatedJsonQueryElasticsearch processor, so
> something
> > > like
> > > > > GenerateFlowFile is a way to achieve that if you want to
> periodically
> > > > > execute a paginated query, e.g. by setting the Generate processor's
> > > > > schedule to run every hour, or use cron syntax, etc. The advantage
> with
> > > > > this processor is that you can use the output of another processor
> > > (e.g.
> > > > > build a query using the results of another processor, such as an
> > > initial
> > > > > query of Elasticsearch) to trigger the paginated query of
> > > Elasticsearch,
> > > > > but once the query is finished, the processor won't keep firing.
> > > > >
> > > > > Conversely, SearchElasticsearch does not allow incoming
> connections,
> > > but
> > > > > only triggers the same query on the defined schedule. If the query
> > > needs to
> > > > > use parameters (or some sort of variable), you need to figure out
> how
> > > to
> > > > > apply that in the Query parameter of the processor - it could be by
> > > > > Elasticsearch notation (e.g. "now/d" for the start of the current
> day
> > > in a
> > > > > date range filter), or something that can be achieved using NiFi
> > > Expression
> > > > > Language [2], but without the flexibility of providing inputs in
> > > FlowFile
> > > > > content, which could be the output of a previous query, or
> > > > > GenerateFlowFile, etc.
> > > > >
> > > > > You need to figure out what query you want to run, what input(s)
> are
> > > > > appropriate, and the schedule to which you want to execute.
> > > > >
> > > > > The Search processor is aimed more at a use case of "I want to
> > > continually
> > > > > retrieve the contents of an Elasticsearch index/query as it is
> > > populated
> > > > > from an extremal source", PaginatedQuery is more for "I want to
> > > retrieve
> > > > > data from Elasticsearch that match a query"; both processors are
> meant
> > > to
> > > > > "allow for the possibility of many documents to be retrieved".
> > > > >
> > > > > For various reasons, neither processor was designed to hold state
> > > between
> > > > > initiation of paginated queries, e.g. they don't follow the
> pattern of
> > > a
> > > > > "Consume" or "List" processor that attempts to retain the
> knowledge of
> > > the
> > > > > "last timestamp" within NiFi itself. That's something that could be
> > > > > considered, but would need a code change (feel free to raise a jira
> > > ticket
> > > > > for the future [3] if you think that would be helpful). One of the
> > > reasons
> > > > > for this is that, unlike an S3 Bucket (for example), documents are
> not
> > > > > guaranteed to always be indexed within Elasticsearch in order/with
> > > such an
> > > > > "updated at" field, although one could design their system that
> way, of
> > > > > course.
> > > > >
> > > > > [1]
> > > > >
> > >
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html
> > > > >
> > > > > [2]
> > > > >
> > >
> https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html
> > > > >
> > > > > [3] https://issues.apache.org/jira/browse/NIFI
> > > > >
> > > > > On 2023/08/17 12:43:31 Richard Beare wrote:
> > > > > > I must be missing something simple. I've copied the parameters
> and
> > > query
> > > > > > from the SearchElasticSearch processor and I'm not getting
> errors,
> > > but no
> > > > > > flowfiles are produced.
> > > > > >
> > > > > > I'm forced to add an input connection, despite coding the query
> in
> > > the
> > > > > > Query property. I have a GenerateFlowFile processor connected.
> I'm
> > > > > using.a
> > > > > > basic match all as a starting point
> > > > > > {
> > > > > > "query" :
> > > > > >     {
> > > > > >     "match_all" : {}
> > > > > >     }
> > > > > > }
> > > > > >
> > > > > > Sending the query via curl appears to work OK - I get a page of
> stuff
> > > > > back.
> > > > > > I'm using nifi 1.20.
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, Aug 17, 2023 at 2:24 PM Chris Sampson <chr...@apache.org
> >
> > > wrote:
> > > > > >
> > > > > > > Elasticsearch doesn't have a CDC-like capability (it doesn't
> > > maintain a
> > > > > > > transaction log or such), so that approach isn't possible.
> > > > > > >
> > > > > > > What I've done previously is to maintain an audit log in a
> separate
> > > > > index
> > > > > > > within elasticsearch to track what data I've previously posted,
> > > e.g.
> > > > > this
> > > > > > > might be the last "updated_date" value read from the data
> index in
> > > a
> > > > > > > previous run of the nifi processor. So your nifi Flow would be
> > > > > something
> > > > > > > like:
> > > > > > >
> > > > > > > Query for latest processed updated_date > paginated query for
> all
> > > new
> > > > > data
> > > > > > > > determine new latest updated_date (e.g. using QueryRecord) >
> put
> > > new
> > > > > > > latest updated_date into elasticsearch, ready for the next run
> > > > > > >
> > > > > > > On 2023/08/16 23:15:19 Richard Beare wrote:
> > > > > > > > One further question - what is the recommended way of
> checking
> > > for
> > > > > > > updates
> > > > > > > > in an index and fetching new records in a similar manner to
> > > > > > > > GenerateTableFetch for an sql DB?
> > > > > > > > Thanks
> > > > > > > >
> > > > > > > > On Thu, Aug 17, 2023 at 7:21 AM Richard Beare <
> > > > > richard.be...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Sounds perfect. Thanks
> > > > > > > > >
> > > > > > > > > On Thu, Aug 17, 2023 at 5:11 AM Chris Sampson <
> > > chr...@apache.org>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> What you describe sounds like the processor is working as
> > > > > designed &
> > > > > > > > >> documented, i.e. it will restart the same query once it
> has
> > > > > reached
> > > > > > > the end
> > > > > > > > >> of the paginated scroll (or search_after, or
> point-in-time)
> > > query.
> > > > > > > > >>
> > > > > > > > >> Instead, it sounds like you want to try using the
> > > > > > > > >> PaginatedJsonQueryElasticsearch [1] processor instead.
> This
> > > will
> > > > > > > execute
> > > > > > > > >> the query given to it, either as the query property or the
> > > body
> > > > > of an
> > > > > > > > >> incoming FlowFile, output the results, and then stop.
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> [1]
> > > > > > > > >>
> > > > > > >
> > > > >
> > >
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html
> > > > > > > > >>
> > > > > > > > >> On 2023/08/16 07:57:43 Richard Beare wrote:
> > > > > > > > >> > Hi,
> > > > > > > > >> > I am using the SearchElasticSearch (1.20.0) processor to
> > > > > retrieve
> > > > > > > all
> > > > > > > > >> > documents (~20M) from an index, process and eventually
> > > return
> > > > > > > results
> > > > > > > > >> to a
> > > > > > > > >> > new index, although for this test I'm retrieving and
> > > processing
> > > > > then
> > > > > > > > >> > discarding. I'm using opensearch.
> > > > > > > > >> >
> > > > > > > > >> > My problem is that the process restarts after
> completion - I
> > > > > > > discovered
> > > > > > > > >> > this, and docs confirm, after seeing warnings from my
> > > processing
> > > > > > > code
> > > > > > > > >> > (which reformats json ready for other work) being
> repeated
> > > for
> > > > > the
> > > > > > > same
> > > > > > > > >> > document ID.
> > > > > > > > >> >
> > > > > > > > >> > How do I configure the processor to stop after the
> > > completing
> > > > > the
> > > > > > > first
> > > > > > > > >> > query.
> > > > > > > > >> >
> > > > > > > > >> > I've tried the following:
> > > > > > > > >> >
> > > > > > > > >> > Query: {"query" : {"match_all" :{}}}
> > > > > > > > >> >
> > > > > > > > >> > with pagination_type SCROLL
> > > > > > > > >> >
> > > > > > > > >> > I haven't found a combination of the properties that
> doesn't
> > > > > lead to
> > > > > > > > >> > repeated cycles through the index.
> > > > > > > > >> >
> > > > > > > > >> > I've also tried {"query" : {"match_all" :{}}, "sort" :
> > > > > > > > >> [{"Visit_DateTime" :
> > > > > > > > >> > "asc"]}}
> > > > > > > > >> >
> > > > > > > > >> > and SEARCH_AFTER pagination type, with the same problem.
> > > > > > > > >> >
> > > > > > > > >> > What am I missing?
> > > > > > > > >> > Thanks
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to