I'm repeatedly selecting the min and max date stamp using a
SearchElasticSearch processor to begin creating the query generator.

The query looks like:
{
  "size" : 0,
  "aggs" : {
    "newest" : { "max" : { "field" : "Visit_DateTime" } },
    "oldest" : { "min" : { "field" : "Visit_DateTime" } }
  }
}

This seems to work, but I always end up with a document in the "hits"
relationship, rather than just the aggregation. I can terminate that
relationship, but it seems strange.
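
For the generator itself, here is a minimal Python sketch of what I have in
mind. It assumes the aggregation response has the usual Elasticsearch shape
(an "aggregations" object whose "oldest" and "newest" entries carry an
epoch-millisecond "value"). The function names, the 10k page size and the
half-open gte/lt range are my own choices for illustration, not anything the
processors require, and how the queries get into the flow (flowfile content
or attributes) is left open.

```python
import json
from datetime import date, datetime, timezone


def bounds_from_aggs(response):
    """Pull (oldest, newest) dates out of a min/max aggregation response.

    Assumes each aggregation carries an epoch-millisecond "value".
    """
    aggs = response["aggregations"]

    def to_date(millis):
        return datetime.fromtimestamp(millis / 1000, tz=timezone.utc).date()

    return to_date(aggs["oldest"]["value"]), to_date(aggs["newest"]["value"])


def next_month(day):
    """First day of the month after the one containing `day`."""
    if day.month == 12:
        return date(day.year + 1, 1, 1)
    return date(day.year, day.month + 1, 1)


def monthly_queries(oldest, newest, field="Visit_DateTime", size=10000):
    """Yield one range query per calendar month covering oldest..newest."""
    start = date(oldest.year, oldest.month, 1)
    while start <= newest:
        end = next_month(start)
        yield {
            "size": size,
            "query": {
                "range": {
                    field: {
                        "gte": start.strftime("%d/%m/%Y"),
                        "lt": end.strftime("%d/%m/%Y"),  # half-open, so months don't overlap
                        "format": "dd/MM/yyyy",
                    }
                }
            },
        }
        start = end


if __name__ == "__main__":
    # Hypothetical aggregation response; the values are made up for illustration.
    sample = {"aggregations": {"oldest": {"value": 1593561600000.0},
                               "newest": {"value": 1601510400000.0}}}
    for query in monthly_queries(*bounds_from_aggs(sample)):
        print(json.dumps(query))
```

Each printed line is one month's query. A "sort" clause would still need
adding if the pagination type is search after or point in time.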

On Sun, Aug 20, 2023 at 3:33 PM Chris Sampson <chr...@apache.org> wrote:

> To retrieve large quantities of data from Elasticsearch into nifi, yes,
> it's probably the best way we have.
>
> The processors don't currently use slicing (parallelism) internally for
> the Elasticsearch queries, but as you're writing a query for every month,
> you could increase the processor's Concurrency, and therefore run multiple
> queries in parallel at that level - bear in mind the impact this will have
> on your system resources and bandwidth, so test it and increase the
> Concurrency incrementally.
>
> If/when you get to the point of having less data to pull, e.g. just the
> most recent data, you could switch to one of the other Elasticsearch
> processors if you wanted, but sticking with the Paginated processors would
> give some safety for occasionally having large amounts of data to pull -
> the point of pagination primarily being to reduce the impact on your
> Elasticsearch instance/cluster and network.
>
> In terms of flowfile size in nifi, there's nothing wrong with having multiple
> GB or even TB of content in a single file, but ideally you'd want to stick
> to Record-based processors if you need to make any changes to the content
> once it's in nifi.
>
> The flowfile size might be important for your data destination, but again,
> you can always split flowfiles up in nifi, e.g. using SplitRecord or
> PartitionRecord, etc.
>
> On 2023/08/19 23:06:50 Richard Beare wrote:
> > Good points - I've done some testing.
> >
> > About 1-2 minutes for 1 month's data with 1k page sizes and about half that
> > for 10k. About 8-10 minutes for 1 year's worth of data at 10k pages.
> >
> > Per month looks like the sweet spot in terms of size - that's about
> > 500-750MB.
> >
> > In terms of building the upstream tools to generate the queries, is the
> > paginatedjsonquery the way to go to retrieve the oldest and most recent
> > date from an index?
> >
> >
> > ~
> >
> > On Sun, Aug 20, 2023 at 1:53 AM Chris Sampson <chr...@apache.org> wrote:
> >
> > > I'd guess it depends on what you want to achieve downstream, e.g. would
> > > setting the query processor to output per_query and return everything in 1
> > > be useful? Internally, the processor is still fetching everything in pages
> > > from Elasticsearch; setting the size higher will reduce the number of
> > > network round-trips, but note that nifi will hold the entire response from
> > > Elasticsearch in memory until it is written to a flowfile - this is done
> > > before the next loop within the processor, even if the process session
> > > isn't committed and you don't see the output for a while.
> > >
> > > You've a choice to make between number of network calls (page fetches),
> > > number of queries (which kind of amounts to the same thing really), page
> > > size in memory (will impact both nifi and elasticsearch, as well as network
> > > performance), and number of flowfiles you want to deal with downstream -
> > > having all your data in a single flowfile might be useful, if you can use
> > > Record-based processors for everything you want to do later - the fewer
> > > flowfiles you have, the more performant your flow is likely to be (general
> > > oversimplification).
> > >
> > > How long did it take for you to fetch a day of data using 1k page sizes?
> > > Did it work if you up page size to 10k? How about 10k page for a month or a
> > > whole year?
> > >
> > > If you decide to break up the query by time range, e.g. years or months,
> > > then a python or groovy script is certainly an option in order to generate
> > > the parameters (e.g. attributes on a flowfile) to feed into the query.
> > >
> > > On 2023/08/19 05:05:39 Richard Beare wrote:
> > > > A bit of progress.
> > > > First up, firing a match_all at my index with 20M documents doesn't work,
> > > > as you probably expected. Or more precisely, is unlikely to be useful - I
> > > > left it overnight and nothing appeared to have happened, so I guess it was
> > > > madly fetching pages and filling up available storage.
> > > >
> > > > So I tested with a query of the form
> > > > {
> > > > "query": {
> > > >   "range" : {
> > > >     "Visit_DateTime": {
> > > >       "gte" : "01/07/2020",
> > > >       "lte" : "02/07/2020",
> > > >       "format" : "dd/MM/yyyy"
> > > >     }
> > > >   }
> > > > }
> > > > }
> > > >
> > > > i.e. a single day's worth of documents (38998 according to a curl _count
> > > > version of the query). This did indeed produce 3900 flowfiles in the hits
> > > > queue and consume the input.
> > > >
> > > > Including a size parameter as follows:
> > > >
> > > > {
> > > > "size" : 1000,
> > > > "query": {
> > > >   "range" : {
> > > >     "Visit_DateTime": {
> > > >       "gte" : "01/07/2020",
> > > >       "lte" : "02/07/2020",
> > > >       "format" : "dd/MM/yyyy"
> > > >     }
> > > >   }
> > > > }
> > > > }
> > > >
> > > > Leads to 39 flowfiles in the hits queue.
> > > >
> > > > So it looks like my best way forward processing many years' worth of data
> > > > is to generate a set of day-based queries. Is a python script the best
> > > > option?
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Aug 18, 2023 at 4:03 PM Chris Sampson <chr...@apache.org> wrote:
> > > >
> > > > > Ah, so these processors have all been written for Elasticsearch, and
> > > > > use the Elasticsearch low-level REST API library to form connections.
> > > > > They've not been tested against OpenSearch, although hopefully should
> > > > > work for any interactions where the API is the same, but the two
> > > > > products continue to diverge, so there's increasing chance that some
> > > > > things won't work.
> > > > >
> > > > > Any details of things that aren't working would be good to know about
> > > > > (e.g. raised as Jira tickets, containing as much detail as possible,
> > > > > like the query used and any log details of errors), so that the
> > > > > community could look into providing OpenSearch compatibility in the
> > > > > future.
> > > > >
> > > > > I've known a few people try with OpenSearch and things either work, or
> > > > > we don't hear about the errors that are received, so we don't know what
> > > > > needs looking at from a NiFi perspective.
> > > > >
> > > > > On 2023/08/18 04:37:10 Richard Beare wrote:
> > > > > > I did use the example and got errors. I'll revisit that (perhaps it
> > > > > > is an opensearch idiosyncrasy). The per response option is probably
> > > > > > my issue. I'll check that out and get back to you.
> > > > > > Thanks again
> > > > > >
> > > > > > On Fri, Aug 18, 2023 at 2:30 PM Chris Sampson <chr...@apache.org> wrote:
> > > > > >
> > > > > > > Check the example in the processor's additional details docs [1]
> > > > > > > for how you could set size and sort fields for the query - size is
> > > > > > > used to determine the number of documents returned per page, sort
> > > > > > > is required if using a "search after" or "point in time" query
> > > > > > > type.
> > > > > > >
> > > > > > > If the Query property is set, the incoming FlowFile content should
> > > > > > > be ignored, i.e. it doesn't need to be empty.
> > > > > > >
> > > > > > > Use the "Search Results Split" property to determine how the
> > > > > > > results are output. This defaults to "per response", which outputs
> > > > > > > a flowfile for every page of results. As
> > > > > > > PaginatedJsonQueryElasticsearch takes an input flowfile, its
> > > > > > > internal "process session" remains active until the processor
> > > > > > > completes and commits its session - this happens when there are no
> > > > > > > more results to retrieve from Elasticsearch, at which point the
> > > > > > > input flowfile disappears from the input queue and all output
> > > > > > > flowfiles appear in the output queues. This is how the nifi
> > > > > > > framework handles sessions, but can be confusing if you're not
> > > > > > > aware of that beforehand.
> > > > > > >
> > > > > > > SearchElasticsearch is different in this regard because its session
> > > > > > > ends after every iteration (determined by the "Search Results
> > > > > > > Split", e.g. this could be per page or per entire query), then uses
> > > > > > > nifi state to set up the next iteration. This means you could start
> > > > > > > to see output flowfiles sooner.
> > > > > > >
> > > > > > >
> > > > > > > [1]
> > > > > > > https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
> > > > > > >
> > > > > > > On 2023/08/17 22:13:22 Richard Beare wrote:
> > > > > > > > Thanks, that makes sense. I've had trouble getting a size
> > > > > > > > parameter accepted, but will work on that later.
> > > > > > > >
> > > > > > > > However, I'm unsure what I should expect to see in the following
> > > > > > > > test scenario.
> > > > > > > >
> > > > > > > > A fixed query in the Query parameter - a match all, i.e. nothing
> > > > > > > > dynamic set by upstream processing.
> > > > > > > >
> > > > > > > > An empty input flowfile to trigger activity.
> > > > > > > >
> > > > > > > > The test index is large. (20M docs)
> > > > > > > >
> > > > > > > > Do I expect the processor to begin filling the output queue as
> > > > > > > > fast as it can, with one flowfile per received page, pausing as
> > > > > > > > the queue fills? That was what I was anticipating, but at the
> > > > > > > > moment I'm getting no output and the input flowfile isn't being
> > > > > > > > consumed. I suspect one flag is wrong, but can't see it.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Aug 18, 2023 at 12:06 AM Chris Sampson <chr...@apache.org> wrote:
> > > > > > > >
> > > > > > > > > Again, sounds like it's working as documented [1] - an input is
> > > > > > > > > required to trigger the PaginatedJsonQueryElasticsearch
> > > > > > > > > processor, so something like GenerateFlowFile is a way to
> > > > > > > > > achieve that if you want to periodically execute a paginated
> > > > > > > > > query, e.g. by setting the Generate processor's schedule to run
> > > > > > > > > every hour, or use cron syntax, etc. The advantage with this
> > > > > > > > > processor is that you can use the output of another processor
> > > > > > > > > (e.g. build a query using the results of another processor,
> > > > > > > > > such as an initial query of Elasticsearch) to trigger the
> > > > > > > > > paginated query of Elasticsearch, but once the query is
> > > > > > > > > finished, the processor won't keep firing.
> > > > > > > > >
> > > > > > > > > Conversely, SearchElasticsearch does not allow incoming
> > > > > > > > > connections, but only triggers the same query on the defined
> > > > > > > > > schedule. If the query needs to use parameters (or some sort of
> > > > > > > > > variable), you need to figure out how to apply that in the
> > > > > > > > > Query parameter of the processor - it could be by Elasticsearch
> > > > > > > > > notation (e.g. "now/d" for the start of the current day in a
> > > > > > > > > date range filter), or something that can be achieved using
> > > > > > > > > NiFi Expression Language [2], but without the flexibility of
> > > > > > > > > providing inputs in FlowFile content, which could be the output
> > > > > > > > > of a previous query, or GenerateFlowFile, etc.
> > > > > > > > >
> > > > > > > > > You need to figure out what query you want to run, what
> > > > > > > > > input(s) are appropriate, and the schedule to which you want to
> > > > > > > > > execute.
> > > > > > > > >
> > > > > > > > > The Search processor is aimed more at a use case of "I want to
> > > > > > > > > continually retrieve the contents of an Elasticsearch
> > > > > > > > > index/query as it is populated from an external source",
> > > > > > > > > PaginatedQuery is more for "I want to retrieve data from
> > > > > > > > > Elasticsearch that match a query"; both processors are meant to
> > > > > > > > > "allow for the possibility of many documents to be retrieved".
> > > > > > > > >
> > > > > > > > > For various reasons, neither processor was designed to hold
> > > > > > > > > state between initiation of paginated queries, e.g. they don't
> > > > > > > > > follow the pattern of a "Consume" or "List" processor that
> > > > > > > > > attempts to retain the knowledge of the "last timestamp" within
> > > > > > > > > NiFi itself. That's something that could be considered, but
> > > > > > > > > would need a code change (feel free to raise a jira ticket for
> > > > > > > > > the future [3] if you think that would be helpful). One of the
> > > > > > > > > reasons for this is that, unlike an S3 Bucket (for example),
> > > > > > > > > documents are not guaranteed to always be indexed within
> > > > > > > > > Elasticsearch in order/with such an "updated at" field,
> > > > > > > > > although one could design their system that way, of course.
> > > > > > > > >
> > > > > > > > > [1]
> > > > > > > > > https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html
> > > > > > > > >
> > > > > > > > > [2]
> > > > > > > > > https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html
> > > > > > > > >
> > > > > > > > > [3] https://issues.apache.org/jira/browse/NIFI
> > > > > > > > >
> > > > > > > > > On 2023/08/17 12:43:31 Richard Beare wrote:
> > > > > > > > > > I must be missing something simple. I've copied the
> > > > > > > > > > parameters and query from the SearchElasticSearch processor
> > > > > > > > > > and I'm not getting errors, but no flowfiles are produced.
> > > > > > > > > >
> > > > > > > > > > I'm forced to add an input connection, despite coding the
> > > > > > > > > > query in the Query property. I have a GenerateFlowFile
> > > > > > > > > > processor connected. I'm using a basic match all as a
> > > > > > > > > > starting point
> > > > > > > > > > {
> > > > > > > > > > "query" :
> > > > > > > > > >     {
> > > > > > > > > >     "match_all" : {}
> > > > > > > > > >     }
> > > > > > > > > > }
> > > > > > > > > >
> > > > > > > > > > Sending the query via curl appears to work OK - I get a page
> > > > > > > > > > of stuff back. I'm using nifi 1.20.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Thu, Aug 17, 2023 at 2:24 PM Chris Sampson <chr...@apache.org> wrote:
> > > > > > > > > >
> > > > > > > > > > > Elasticsearch doesn't have a CDC-like capability (it doesn't
> > > > > > > > > > > maintain a transaction log or such), so that approach isn't
> > > > > > > > > > > possible.
> > > > > > > > > > >
> > > > > > > > > > > What I've done previously is to maintain an audit log in a
> > > > > > > > > > > separate index within elasticsearch to track what data I've
> > > > > > > > > > > previously posted, e.g. this might be the last
> > > > > > > > > > > "updated_date" value read from the data index in a previous
> > > > > > > > > > > run of the nifi processor. So your nifi Flow would be
> > > > > > > > > > > something like:
> > > > > > > > > > >
> > > > > > > > > > > Query for latest processed updated_date -> paginated query
> > > > > > > > > > > for all new data -> determine new latest updated_date (e.g.
> > > > > > > > > > > using QueryRecord) -> put new latest updated_date into
> > > > > > > > > > > elasticsearch, ready for the next run
> > > > > > > > > > >
> > > > > > > > > > > On 2023/08/16 23:15:19 Richard Beare wrote:
> > > > > > > > > > > > One further question - what is the recommended way of
> > > > > > > > > > > > checking for updates in an index and fetching new records
> > > > > > > > > > > > in a similar manner to GenerateTableFetch for an sql DB?
> > > > > > > > > > > > Thanks
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Aug 17, 2023 at 7:21 AM Richard Beare <richard.be...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Sounds perfect. Thanks
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Aug 17, 2023 at 5:11 AM Chris Sampson <chr...@apache.org> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > >> What you describe sounds like the processor is
> > > working as
> > > > > > > > > designed &
> > > > > > > > > > > > >> documented, i.e. it will restart the same query
> once
> > > it
> > > > > has
> > > > > > > > > reached
> > > > > > > > > > > the end
> > > > > > > > > > > > >> of the paginated scroll (or search_after, or
> > > > > point-in-time)
> > > > > > > query.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Instead, it sounds like you want to try using the
> > > > > > > > > > > > >> PaginatedJsonQueryElasticsearch [1] processor
> instead.
> > > > > This
> > > > > > > will
> > > > > > > > > > > execute
> > > > > > > > > > > > >> the query given to it, either as the query
> property
> > > or the
> > > > > > > body
> > > > > > > > > of an
> > > > > > > > > > > > >> incoming FlowFile, output the results, and then
> stop.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> [1]
> > > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> On 2023/08/16 07:57:43 Richard Beare wrote:
> > > > > > > > > > > > >> > Hi,
> > > > > > > > > > > > >> > I am using the SearchElasticSearch (1.20.0)
> > > processor to
> > > > > > > > > retrieve
> > > > > > > > > > > all
> > > > > > > > > > > > >> > documents (~20M) from an index, process and
> > > eventually
> > > > > > > return
> > > > > > > > > > > results
> > > > > > > > > > > > >> to a
> > > > > > > > > > > > >> > new index, although for this test I'm
> retrieving and
> > > > > > > processing
> > > > > > > > > then
> > > > > > > > > > > > >> > discarding. I'm using opensearch.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > My problem is that the process restarts after
> > > > > completion - I
> > > > > > > > > > > discovered
> > > > > > > > > > > > >> > this, and docs confirm, after seeing warnings
> from
> > > my
> > > > > > > processing
> > > > > > > > > > > code
> > > > > > > > > > > > >> > (which reformats json ready for other work)
> being
> > > > > repeated
> > > > > > > for
> > > > > > > > > the
> > > > > > > > > > > same
> > > > > > > > > > > > >> > document ID.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > How do I configure the processor to stop after completing the
> > > > > > > > > > > > > >> > first query?
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I've tried the following:
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Query: {"query" : {"match_all" :{}}}
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > with pagination_type SCROLL
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I haven't found a combination of the properties
> that
> > > > > doesn't
> > > > > > > > > lead to
> > > > > > > > > > > > >> > repeated cycles through the index.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > I've also tried {"query" : {"match_all" :{}}, "sort" :
> > > > > > > > > > > > > >> > [{"Visit_DateTime" : "asc"}]}
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > and SEARCH_AFTER pagination type, with the same
> > > problem.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > What am I missing?
> > > > > > > > > > > > >> > Thanks
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >>
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
