Using SearchElasticsearch for just an aggregation feels like it might not be the right choice (maybe look at JsonQueryElasticsearch instead). Or are the dates constantly changing, i.e. new data is always appearing, so you want to keep triggering the flow and use this as the starting processor of your flow? You could, of course, set this processor to run only on a slower schedule, e.g. once per hour/day, etc. N.B. JsonQueryElasticsearch allows, but doesn't require, an incoming connection, so you can use it as the initial processor within a flow.
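The min/max query Richard quotes below could be built, and its result read back, roughly as follows. This is a minimal Python sketch, not NiFi code: `date_range_query` and `extract_date_range` are hypothetical helper names. The exact layout of the content NiFi writes to its aggregations relationship isn't confirmed here, so the extractor accepts either a full search response or a bare aggregations object. With `"size": 0` Elasticsearch returns no documents, only the aggregations, and for a date field the min/max `value` is reported in epoch milliseconds.

```python
import json


def date_range_query(field="Visit_DateTime"):
    """Search body that returns only the newest/oldest values of a date field."""
    return {
        "size": 0,  # no hits wanted, just the aggregations
        "aggs": {
            "newest": {"max": {"field": field}},
            "oldest": {"min": {"field": field}},
        },
    }


def extract_date_range(payload):
    """Return (oldest, newest) from a search response or a bare aggregations object.

    For a date field, min/max aggregations report epoch milliseconds in "value"
    (a formatted copy may also appear in "value_as_string").
    """
    aggs = payload.get("aggregations", payload)
    return aggs["oldest"]["value"], aggs["newest"]["value"]


if __name__ == "__main__":
    print(json.dumps(date_range_query(), indent=2))
```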
Do you have "Output No Hits" set to true? That would explain the empty flowfile behaviour. If not, I know that there have been a couple of changes/fixes in that area in recent versions (you mention you're on 1.20.0; the latest is now 1.23.1), so it could be something that has since been fixed, or a bug in the processor. If you have "Output No Hits" set to false, please raise a Jira ticket with as much detail about your processor settings as you can provide, and it could be something for the community to look at fixing in a new version (or to check whether it's still an issue in the latest versions, if you're not in a position to try that yourself).

On 2023/08/21 06:59:58 Richard Beare wrote:
> I'm repeatedly selecting the min and max date stamp using a SearchElasticsearch processor to begin creating the query generator.
>
> The query looks like:
> {
>   "size" : 0,
>   "aggs" : {
>     "newest" : { "max" : { "field" : "Visit_DateTime" }},
>     "oldest" : { "min" : { "field" : "Visit_DateTime" }}
>   }
> }
>
> This seems to work, but I always end up with a document in the "hit" relationship, rather than just the aggregation. I can terminate that relationship, but it seems strange.
>
> On Sun, Aug 20, 2023 at 3:33 PM Chris Sampson <chr...@apache.org> wrote:
> > To retrieve large quantities of data from Elasticsearch into NiFi, yes, it's probably the best way we have.
> >
> > The processors don't currently use slicing (parallelism) internally for the Elasticsearch queries, but as you're writing a query for every month, you could increase the processor's Concurrency, and therefore run multiple queries in parallel at that level. Bear in mind the impact this will have on your system resources and bandwidth, so test it and increase the Concurrency incrementally.
> >
> > If/when you get to the point of having less data to pull, e.g. just the most recent data, you could switch to one of the other Elasticsearch processors if you wanted, but sticking with the Paginated processors would give some safety for occasionally having large amounts of data to pull; the point of pagination is primarily to reduce the impact on your Elasticsearch instance/cluster and network.
> >
> > In terms of flowfile size in NiFi, there's nothing wrong with having multiple GB or even TB of content in a single flowfile, but ideally you'd want to stick to Record-based processors if you need to make any changes to the content once it's in NiFi.
> >
> > The flowfile size might be important for your data destination, but again, you can always split flowfiles up in NiFi, e.g. using SplitRecord or PartitionRecord, etc.
> >
> > On 2023/08/19 23:06:50 Richard Beare wrote:
> > > Good points - I've done some testing.
> > >
> > > About 1-2 minutes for 1 month's data with 1k page sizes, and about half that for 10k. About 8-10 minutes for 1 year's worth of data at 10k pages.
> > >
> > > Per month looks like the sweet spot in terms of size - that's about 500-750MB.
> > >
> > > In terms of building the upstream tools to generate the queries, is PaginatedJsonQueryElasticsearch the way to go to retrieve the oldest and most recent date from an index?
> > >
> > > On Sun, Aug 20, 2023 at 1:53 AM Chris Sampson <chr...@apache.org> wrote:
> > > > I'd guess it depends on what you want to achieve downstream, e.g. would setting the query processor to output per_query and return everything in 1 flowfile be useful? Internally, the processor is still fetching everything in pages from Elasticsearch. Setting the size higher will reduce the number of network round-trips, but note that NiFi will hold the entire response from Elasticsearch in memory until it is written to a flowfile. That happens before the next loop within the processor, even if the process session isn't committed and you don't see the output for a while.
> > > >
> > > > You've a choice to make between the number of network calls (page fetches), the number of queries (which kind of amounts to the same thing really), page size in memory (which will impact both NiFi and Elasticsearch, as well as network performance), and the number of flowfiles you want to deal with downstream. Having all your data in a single flowfile might be useful if you can use Record-based processors for everything you want to do later; the fewer flowfiles you have, the more performant your flow is likely to be (a general oversimplification).
> > > >
> > > > How long did it take for you to fetch a day of data using 1k page sizes? Did it work if you up the page size to 10k? How about 10k pages for a month or a whole year?
> > > >
> > > > If you decide to break up the query by time range, e.g. years or months, then a Python or Groovy script is certainly an option to generate the parameters (e.g. attributes on a flowfile) to feed into the query.
> > > >
> > > > On 2023/08/19 05:05:39 Richard Beare wrote:
> > > > > A bit of progress.
> > > > > First up, firing a match_all at my index with 20M documents doesn't work, as you probably expected. Or, more precisely, it is unlikely to be useful - I left it overnight and nothing appeared to have happened, so I guess it was madly fetching pages and filling up available storage.
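The flowfile counts Richard reports below fit Elasticsearch's default page size of 10 hits when a search body sets no `size`: 38,998 documents give 3,900 pages, and `"size": 1000` gives 39. The same arithmetic suggests why a bare match_all over roughly 20M documents looked stalled: with "per response" output that is on the order of two million page requests, and (as Chris explains further down) PaginatedJsonQueryElasticsearch only releases its output once the whole query has finished. A small Python sketch of the calculation; `pages_needed` is a hypothetical helper, not part of NiFi.

```python
DEFAULT_PAGE_SIZE = 10  # Elasticsearch's default "size" when a search body omits it


def pages_needed(total_docs: int, page_size: int = DEFAULT_PAGE_SIZE) -> int:
    """Page requests (one flowfile each with "per response" output) for total_docs hits."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    return (total_docs + page_size - 1) // page_size  # integer ceiling division


for size in (DEFAULT_PAGE_SIZE, 1_000, 10_000):
    print(f"size={size}: one day -> {pages_needed(38_998, size)} pages, "
          f"~20M docs -> {pages_needed(20_000_000, size)} pages")
```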
> > > > >
> > > > > So I tested with a query of the form
> > > > > {
> > > > >   "query": {
> > > > >     "range" : {
> > > > >       "Visit_DateTime": {
> > > > >         "gte" : "01/07/2020",
> > > > >         "lte" : "02/07/2020",
> > > > >         "format" : "dd/MM/yyyy"
> > > > >       }
> > > > >     }
> > > > >   }
> > > > > }
> > > > >
> > > > > i.e. a single day's worth of documents (38998 according to a curl _count version of the query). This did indeed produce 3900 flowfiles in the hits queue and consume the input.
> > > > >
> > > > > Including a size parameter as follows:
> > > > >
> > > > > {
> > > > >   "size" : 1000,
> > > > >   "query": {
> > > > >     "range" : {
> > > > >       "Visit_DateTime": {
> > > > >         "gte" : "01/07/2020",
> > > > >         "lte" : "02/07/2020",
> > > > >         "format" : "dd/MM/yyyy"
> > > > >       }
> > > > >     }
> > > > >   }
> > > > > }
> > > > >
> > > > > leads to 39 flowfiles in the hits queue.
> > > > >
> > > > > So it looks like my best way forward for processing many years' worth of data is to generate a set of day-based queries. Is a Python script the best option?
> > > > >
> > > > > On Fri, Aug 18, 2023 at 4:03 PM Chris Sampson <chr...@apache.org> wrote:
> > > > > > Ah, so these processors have all been written for Elasticsearch, and use the Elasticsearch low-level REST API library to form connections. They've not been tested against OpenSearch. They should hopefully work for any interactions where the API is the same, but the two products continue to diverge, so there's an increasing chance that some things won't work.
> > > > > >
> > > > > > Any details of things that aren't working would be good to know about (e.g. raised as Jira tickets containing as much detail as possible, like the query used and any log details of errors), so that the community could look into providing OpenSearch compatibility in the future.
> > > > > >
> > > > > > I've known a few people try with OpenSearch, and things either work, or we don't hear about the errors that are received, so we don't know what needs looking at from a NiFi perspective.
> > > > > >
> > > > > > On 2023/08/18 04:37:10 Richard Beare wrote:
> > > > > > > I did use the example and got errors. I'll revisit that (perhaps it is an OpenSearch idiosyncrasy). The per response option is probably my issue. I'll check that out and get back to you.
> > > > > > > Thanks again
> > > > > > >
> > > > > > > On Fri, Aug 18, 2023 at 2:30 PM Chris Sampson <chr...@apache.org> wrote:
> > > > > > > > Check the example in the processor's additional details docs [1] for how you could set size and sort fields for the query: size is used to determine the number of documents returned per page, and sort is required if using a "search after" or "point in time" query type.
> > > > > > > >
> > > > > > > > If the Query property is set, the incoming FlowFile content should be ignored, i.e. it doesn't need to be empty.
> > > > > > > >
> > > > > > > > Use the "Search Results Split" property to determine how the results are output. This defaults to "per response", which outputs a flowfile for every page of results. As PaginatedJsonQueryElasticsearch takes an input flowfile, its internal "process session" remains active until the processor completes and commits its session. This happens when there are no more results to retrieve from Elasticsearch, at which point the input flowfile disappears from the input queue and all output flowfiles appear in the output queues. This is how the NiFi framework handles sessions, but it can be confusing if you're not aware of that beforehand.
> > > > > > > >
> > > > > > > > SearchElasticsearch is different in this regard because its session ends after every iteration (determined by the "Search Results Split", e.g. this could be per page or per entire query), and it then uses NiFi state to set up the next iteration. This means you could start to see output flowfiles sooner.
> > > > > > > >
> > > > > > > > [1] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
> > > > > > > >
> > > > > > > > On 2023/08/17 22:13:22 Richard Beare wrote:
> > > > > > > > > Thanks, that makes sense. I've had trouble getting a size parameter accepted, but will work on that later.
> > > > > > > > >
> > > > > > > > > However, I'm unsure what I should expect to see in the following test scenario.
> > > > > > > > >
> > > > > > > > > A fixed query in the Query parameter - a match all, i.e. nothing dynamic set by upstream processing.
> > > > > > > > >
> > > > > > > > > An empty input flowfile to trigger activity.
> > > > > > > > >
> > > > > > > > > The test index is large (20M docs).
> > > > > > > > >
> > > > > > > > > Do I expect the processor to begin filling the output queue as fast as it can, with one flowfile per received page, pausing as the queue fills? That was what I was anticipating, but at the moment I'm getting no output and the input flowfile isn't being consumed. I suspect one flag is wrong, but can't see it.
> > > > > > > > >
> > > > > > > > > On Fri, Aug 18, 2023 at 12:06 AM Chris Sampson <chr...@apache.org> wrote:
> > > > > > > > > > Again, it sounds like it's working as documented [1]: an input is required to trigger the PaginatedJsonQueryElasticsearch processor, so something like GenerateFlowFile is a way to achieve that if you want to periodically execute a paginated query, e.g. by setting the Generate processor's schedule to run every hour, or using cron syntax, etc. The advantage with this processor is that you can use the output of another processor (e.g. build a query using the results of another processor, such as an initial query of Elasticsearch) to trigger the paginated query of Elasticsearch, but once the query is finished, the processor won't keep firing.
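Using the output of one query to build the next, as described above, might look like this for the per-month split discussed in this thread: take the oldest and newest Visit_DateTime values from a min/max aggregation (which reports date values as epoch milliseconds) and emit one range query per calendar month, each of which could become the content of a flowfile sent to PaginatedJsonQueryElasticsearch. A minimal Python sketch, assuming UTC month boundaries and a 10k page size; the helper names are hypothetical. Half-open windows (`gte` the month start, `lt` the next month start) keep consecutive months from overlapping, and the `epoch_millis` format sidesteps any ambiguity in parsing date strings.

```python
from datetime import datetime, timezone


def month_windows(oldest_ms: float, newest_ms: float):
    """Yield (start_ms, end_ms) half-open UTC calendar-month windows covering both values."""
    oldest = datetime.fromtimestamp(oldest_ms / 1000, tz=timezone.utc)
    newest = datetime.fromtimestamp(newest_ms / 1000, tz=timezone.utc)
    start = datetime(oldest.year, oldest.month, 1, tzinfo=timezone.utc)
    while start <= newest:
        # First day of the following month; December rolls over into the next year.
        end = datetime(start.year + start.month // 12, start.month % 12 + 1, 1,
                       tzinfo=timezone.utc)
        yield int(start.timestamp() * 1000), int(end.timestamp() * 1000)
        start = end


def monthly_queries(oldest_ms, newest_ms, field="Visit_DateTime", page_size=10_000):
    """One paginated-query body per month; "size" sets the page size."""
    for start_ms, end_ms in month_windows(oldest_ms, newest_ms):
        yield {
            "size": page_size,
            "query": {
                "range": {field: {"gte": start_ms, "lt": end_ms, "format": "epoch_millis"}}
            },
        }
```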
> > > > > > > > > >
> > > > > > > > > > Conversely, SearchElasticsearch does not allow incoming connections, but only triggers the same query on the defined schedule. If the query needs to use parameters (or some sort of variable), you need to figure out how to apply that in the Query property of the processor. It could be by Elasticsearch notation (e.g. "now/d" for the start of the current day in a date range filter), or something that can be achieved using NiFi Expression Language [2], but without the flexibility of providing inputs in FlowFile content, which could be the output of a previous query, or GenerateFlowFile, etc.
> > > > > > > > > >
> > > > > > > > > > You need to figure out what query you want to run, what input(s) are appropriate, and the schedule on which you want to execute it.
> > > > > > > > > >
> > > > > > > > > > The Search processor is aimed more at a use case of "I want to continually retrieve the contents of an Elasticsearch index/query as it is populated from an external source", whereas PaginatedQuery is more for "I want to retrieve data from Elasticsearch that match a query"; both processors are meant to "allow for the possibility of many documents to be retrieved".
> > > > > > > > > >
> > > > > > > > > > For various reasons, neither processor was designed to hold state between initiations of paginated queries, e.g. they don't follow the pattern of a "Consume" or "List" processor that attempts to retain knowledge of the "last timestamp" within NiFi itself. That's something that could be considered, but it would need a code change (feel free to raise a Jira ticket for the future [3] if you think that would be helpful). One of the reasons for this is that, unlike an S3 bucket (for example), documents are not guaranteed to always be indexed within Elasticsearch in order/with such an "updated at" field, although one could design their system that way, of course.
> > > > > > > > > >
> > > > > > > > > > [1] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html
> > > > > > > > > >
> > > > > > > > > > [2] https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html
> > > > > > > > > >
> > > > > > > > > > [3] https://issues.apache.org/jira/browse/NIFI
> > > > > > > > > >
> > > > > > > > > > On 2023/08/17 12:43:31 Richard Beare wrote:
> > > > > > > > > > > I must be missing something simple. I've copied the parameters and query from the SearchElasticsearch processor and I'm not getting errors, but no flowfiles are produced.
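Running the same query outside NiFi, as Richard does with curl, is a quick way to tell a query problem from a processor-configuration problem. A minimal Python sketch using only the standard library to build (not send) the equivalent `_count` request; the base URL and index name are hypothetical placeholders, and authentication/TLS settings are left out. `_count` takes only the query part of a search body, so keys such as `size`, `sort` and `aggs` are dropped.

```python
import json
import urllib.request


def build_count_request(base_url: str, index: str, search_body: dict) -> urllib.request.Request:
    """Build a POST to <index>/_count carrying only the "query" part of a search body."""
    body = {"query": search_body.get("query", {"match_all": {}})}
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/{index}/_count",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Sending it requires a reachable cluster, e.g.:
#   req = build_count_request("http://localhost:9200", "my-index", {"query": {"match_all": {}}})
#   with urllib.request.urlopen(req) as resp:
#       print(json.load(resp)["count"])
```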
> > > > > > > > > > >
> > > > > > > > > > > I'm forced to add an input connection, despite coding the query in the Query property. I have a GenerateFlowFile processor connected. I'm using a basic match all as a starting point:
> > > > > > > > > > > {
> > > > > > > > > > >   "query" : {
> > > > > > > > > > >     "match_all" : {}
> > > > > > > > > > >   }
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > Sending the query via curl appears to work OK - I get a page of stuff back. I'm using NiFi 1.20.
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Aug 17, 2023 at 2:24 PM Chris Sampson <chr...@apache.org> wrote:
> > > > > > > > > > > > Elasticsearch doesn't have a CDC-like capability (it doesn't maintain a transaction log or such), so that approach isn't possible.
> > > > > > > > > > > >
> > > > > > > > > > > > What I've done previously is to maintain an audit log in a separate index within Elasticsearch to track what data I've previously posted, e.g. this might be the last "updated_date" value read from the data index in a previous run of the NiFi processor. So your NiFi flow would be something like:
> > > > > > > > > > > >
> > > > > > > > > > > > Query for latest processed updated_date -> paginated query for all new data -> determine new latest updated_date (e.g. using QueryRecord) -> put new latest updated_date into Elasticsearch, ready for the next run
> > > > > > > > > > > >
> > > > > > > > > > > > On 2023/08/16 23:15:19 Richard Beare wrote:
> > > > > > > > > > > > > One further question - what is the recommended way of checking for updates in an index and fetching new records in a similar manner to GenerateTableFetch for an SQL DB?
> > > > > > > > > > > > > Thanks
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Aug 17, 2023 at 7:21 AM Richard Beare <richard.be...@gmail.com> wrote:
> > > > > > > > > > > > > > Sounds perfect. Thanks
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Thu, Aug 17, 2023 at 5:11 AM Chris Sampson <chr...@apache.org> wrote:
> > > > > > > > > > > > > > > What you describe sounds like the processor is working as designed & documented, i.e. it will restart the same query once it has reached the end of the paginated scroll (or search_after, or point-in-time) query.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Instead, it sounds like you want to try using the PaginatedJsonQueryElasticsearch [1] processor. This will execute the query given to it, either as the Query property or the body of an incoming FlowFile, output the results, and then stop.
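The audit-log flow Chris outlines above (look up the last processed updated_date, run a paginated query for anything newer, then store the new maximum) reduces to two small pieces of logic, sketched here in Python. The field name `updated_date` comes from that suggestion; the helper names and the 10k page size are hypothetical, and the values are assumed to be ISO-8601 strings in a single format so that string comparison matches time order. As Chris points out, this only works if documents really are indexed with a reliable, increasing "updated at" value: a document that arrives late with an older timestamp, or later with exactly the watermark's timestamp, would be skipped by the strict `gt`.

```python
def incremental_query(last_seen, field="updated_date", page_size=10_000):
    """Paginated-query body for documents newer than the stored watermark, oldest first."""
    if last_seen is None:
        query = {"match_all": {}}  # first run: nothing recorded yet
    else:
        query = {"range": {field: {"gt": last_seen}}}
    return {"size": page_size, "query": query, "sort": [{field: "asc"}]}


def next_watermark(hits, last_seen, field="updated_date"):
    """Largest field value among the retrieved hits, or the old watermark if there were none."""
    values = [hit["_source"][field] for hit in hits if field in hit.get("_source", {})]
    return max(values, default=last_seen)
```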
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > [1] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On 2023/08/16 07:57:43 Richard Beare wrote:
> > > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > > > I am using the SearchElasticsearch (1.20.0) processor to retrieve all documents (~20M) from an index, process them and eventually return results to a new index, although for this test I'm retrieving, processing and then discarding them. I'm using OpenSearch.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > My problem is that the process restarts after completion. I discovered this, and the docs confirm it, after seeing warnings from my processing code (which reformats JSON ready for other work) being repeated for the same document ID.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > How do I configure the processor to stop after completing the first query?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I've tried the following:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Query: {"query" : {"match_all" : {}}}
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > with pagination_type SCROLL
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I haven't found a combination of the properties that doesn't lead to repeated cycles through the index.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I've also tried {"query" : {"match_all" : {}}, "sort" : [{"Visit_DateTime" : "asc"}]}
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > and the SEARCH_AFTER pagination type, with the same problem.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > What am I missing?
> > > > > > > > > > > > > > > > Thanks
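Richard's SEARCH_AFTER attempt depends on the sort: each page request passes the sort values of the previous page's last hit as `search_after`, and an empty page marks the end of the results. A one-shot processor such as PaginatedJsonQueryElasticsearch stops there, whereas SearchElasticsearch, as described in the replies above, starts the same query again on its next scheduled run. The loop below is a minimal Python sketch of that mechanism, not NiFi's implementation; `search` is a hypothetical callable standing in for a POST to `_search`. Elasticsearch's documentation also recommends a tiebreaker sort field (or a point in time) so that ties in Visit_DateTime don't cause documents to be skipped or repeated.

```python
from typing import Callable, Iterator


def search_after_pages(search: Callable[[dict], dict], body: dict,
                       page_size: int = 1000) -> Iterator[list]:
    """Yield successive pages of hits until the search returns an empty page."""
    if not body.get("sort"):
        raise ValueError("search_after pagination needs a sort")
    after = None
    while True:
        request = dict(body, size=page_size)
        if after is not None:
            request["search_after"] = after  # resume after the last hit seen
        hits = search(request)["hits"]["hits"]
        if not hits:
            return  # no more results: a one-shot query ends here
        yield hits
        after = hits[-1]["sort"]  # each hit carries its sort values
```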