[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-15 Thread Michael Gibney (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17266244#comment-17266244
 ] 

Michael Gibney commented on SOLR-15036:
---

{quote}So it is certainly worth investigating.
{quote}
Great, this makes sense! And I think leveraging JSON Facet impl really would be 
a viable way to go (to avoid duplication, etc.)

Re: JSON Facet "streaming" method: yes, absolutely! Non-stream JSON Facet 
iterates over docs, incrementing an accumulation of counts based on docValues 
term ords. IIUC this is analogous to the way that export/drill/rollup works 
(with the fundamental difference that the latter works against BytesRefs). This 
similarity is why I focused on thinking about the non-stream JSON-Facet methods 
... to compare apples to apples.

JSON Facet {{stream}} method works by iterating over indexed terms (in index 
order) and calculating intersection with the docSet domain -- so yeah, 
completely different from either export/drill/rollup or JSON Facet non-stream. 
I'm not as familiar with JSON Facet {{stream}} method, but that should 
definitely be part of the new issue that feels like it's coalescing here. I 
also have unresolved questions about the performance tradeoffs involved; it's 
tempting to believe "streaming == better!", but there may be cases (even 
high-cardinality) where non-streaming count accumulation via docValues could 
perform better than a streaming implementation? To complicate things further, 
it occurs to me that it should be possible for the {{docValues}} accumulation 
case to stream the _serialization_, even if the _accumulation_ itself is not 
"streaming" ...

I would be willing to open an issue for this ... but I probably won't be able 
to actually work on it for the foreseeable future, and in any case such an 
issue might be better opened by someone more familiar with the higher-level 
context (streaming expressions). I'd be happy to weigh in on aspects related to 
leveraging JSON Facet under the hood, if that would be helpful.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Fix For: 8.8, master (9.0)
>
> Attachments: relay-approach.patch
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-15 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17266196#comment-17266196
 ] 

Joel Bernstein commented on SOLR-15036:
---

It would be good to understand how JSON facets Streaming is implemented as 
well. There are no free lunches here, in order to achieve the map/reduce effect 
you need to have ordered buckets. JSON facet streaming achieves this in a way 
that has different performance characteristics then the JSON facet in-memory 
approach.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Fix For: 8.8, master (9.0)
>
> Attachments: relay-approach.patch
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-15 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17266191#comment-17266191
 ] 

Joel Bernstein commented on SOLR-15036:
---

The drill approach does some caching of BytesRef based on the sort order. So if 
the ord is the same as last ord it just uses the last BytesRef. So if there are 
long runs of the same ord it will help quite a bit. That being said you're 
right that we could do a better job probably of minimizing BytesRef lookups by 
aggregating on the ordinals. So it is certainly worth investigating.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Fix For: 8.8, master (9.0)
>
> Attachments: relay-approach.patch
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-15 Thread Michael Gibney (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17266173#comment-17266173
 ] 

Michael Gibney commented on SOLR-15036:
---

{quote}The cardinality in this case is the combination of all the fields. But 
having one huge field would also cause performance problems.
{quote}
Ah ok ... I'd be curious what the comparable threshold (and performance 
differential below-, at-, and above-threshold) would be for the 
single-dimensional case, if you have a moment to check that!

{quote}When you get into the high cardinality use case almost all the time is 
going be spent looking up the BytesRefs. You'll have to do that with either 
JSON facet or export. JSON facet still needs to lookup the ByteRef's after the 
aggregation to send them out. So it's going to be slow in either case.
{quote}
Hmm ... this is the crux of my question. I really don't think this is an 
apples-to-apples comparison. Sure, single-dimensional JSON-Facet ultimately 
converts term ords to BytesRefs to serialize the response -- but that's 
per-term, _after_ all of the counts have been accumulated, with all of the 
per-term-per-doc comparisons relying strictly on term ords. The 
{{drill}}/{{export}}/{{rollup}} case, iiuc, converts to BytesRef immediately 
per-term-per-doc, so in addition to the overhead of looking up the BytesRef for 
every term _for every doc_, all comparisons and accumulation data structures 
are also BytesRef-based. That's not the case for JSON-Facet.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Fix For: 8.8, master (9.0)
>
> Attachments: relay-approach.patch
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-14 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17265616#comment-17265616
 ] 

Joel Bernstein commented on SOLR-15036:
---

The cardinality in this case is the combination of all the fields. But having 
one huge field would also cause performance problems. 

When you get into the high cardinality use case almost all the time is going be 
spent looking up the BytesRefs. You'll have to do that with either JSON facet 
or export. JSON facet still needs to lookup the ByteRef's after the aggregation 
to send them out. So it's going to be slow in either case. But it's worth 
trying JSON facet in streaming mode to see how it compares to drill. 

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Fix For: 8.8, master (9.0)
>
> Attachments: relay-approach.patch
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-14 Thread Michael Gibney (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17265202#comment-17265202
 ] 

Michael Gibney commented on SOLR-15036:
---

Got it, thanks for the extra explanation! In the multi-dimensional case it's 
still ambiguous to me what's meant my "cardinality" -- is this the cardinality 
of _each_ field/dimension? or of the top-level dimension (in which case I'd 
wonder about the cardinality of the nested dimensions)? or of the number of 
leaf buckets? or the number of nodes in the hierarchical/dimensional tree?

Would you be able to take another look at my [comment above|#comment-17246612] 
and weigh in on my question about "individual high-cardinality fields" vs. 
"multi-dimensional fanout over high-cardinality fields"? I'm asking because I'm 
increasingly convinced that if you're really chasing performance for 
multi-dimensional high-cardinality cases like this, a solution that leverages 
single-dimensional "JSON Facet" at each level (via term ords), but streams the 
result, could be a _huge_ win.

To re-phrase the last paragraph ("Taking this a step further ... _huge_ 
performance gain") from the earlier comment linked to above:

I think the reason why JSON Facet falters in the multi-dimensional 
high-cardinality case is _strictly_ because the "multi-dimensional" aspect of a 
multi-dimensional facet streaming expression is delegated wholesale to 
multi-dimensional "JSON Facet". With many buckets requested to be returned, 
JSON Facet will buffer the entire hierarchical response in the response builder 
-- it's the _fanout_ that kills, not the high cardinality _per se_.

IIUC {{drill}} addresses this by streaming each node of the hierarchical 
response as it's populated. So far so good; but as implemented over export 
writer, it incurs significant performance hit(s) by dealing with BytesRefs (as 
opposed to term ords, as JSON Facet does). What I'm suggesting is that it 
should be possible to stream the hierarchical response nodes -- handling the 
"multi-dimensional" aspect of the multi-dimensional request in "streaming 
expression"-level code -- but each individual node would delegate "child" facet 
count calculation to a single-dimensional JSON Facet request.

Am I missing something? To be sure, this would warrant a new issue if anyone's 
interested in pursuing it. But it springs organically from my desire to clarify 
the meaning of "cardinality" here, and to confirm (or contradict) my conviction 
that the underlying JSON Facet implementation should not be incompatible with 
(and should in fact nicely complement) high-cardinality streaming use cases, if 
employed over a single dimension at a time.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Fix For: 8.8, master (9.0)
>
> Attachments: relay-approach.patch
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-14 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17265151#comment-17265151
 ] 

Joel Bernstein commented on SOLR-15036:
---

The facet expression actually did not complete. Not only was the rollup able to 
complete but I was able to run 3 concurrently and they all completed. So, at 
this level of cardinality (3 plus million) facet just is not an option. But for 
lower cardinality facet is 10x faster then the rollup. 

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Fix For: 8.8, master (9.0)
>
> Attachments: relay-approach.patch
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-14 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17265147#comment-17265147
 ] 

Joel Bernstein commented on SOLR-15036:
---

It was a multi-dimensional. Here are the two expressions:

The facet expressions:
{code}
facet(testex, buckets="stock_s,day_i,num_s,test_s", bucketSorts="index desc", 
rows="-1")
{code}

The rollup expression. I used I rollup -> search rather drill because I was 
actually testing export performance.
{code}
rollup(search(testex, q="*:*", fl="stock_s,day_i,num_s,test_s", sort="stock_s 
desc,day_i desc,num_s desc,test_s desc", qt="/export", queueSize="15"), 
over="stock_s,day_i,num_s,test_s", count(*))
{code}




> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Fix For: 8.8, master (9.0)
>
> Attachments: relay-approach.patch
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-14 Thread Michael Gibney (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17265111#comment-17265111
 ] 

Michael Gibney commented on SOLR-15036:
---

Thanks for following up on this, [~jbernste]. Is this test case 
one-dimensional, or multi-dimensional? Would you be able to share examples of 
the streaming expressions your comparing to arrive at this rough threshold?

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Fix For: 8.8, master (9.0)
>
> Attachments: relay-approach.patch
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping 
> the facet expression with a rollup / sort / 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-14 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17265047#comment-17265047
 ] 

Joel Bernstein commented on SOLR-15036:
---

In some of the earlier comments on this ticket there was discussion about facet 
VS drill. I mentioned that drill is a high cardinality tool. The question came 
up about what high cardinality actually means. I've been testing SOLR-14608 and 
as part of that testing I've been testing drill VS facet performance. What I've 
found is that at around 3 million unique buckets drill starts to perform 
better. Facet still worked at 3 million but eventually it will break.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Fix For: 8.8, master (9.0)
>
> Attachments: relay-approach.patch
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-11 Thread ASF subversion and git services (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17262848#comment-17262848
 ] 

ASF subversion and git services commented on SOLR-15036:


Commit 7933c9004e79acb9a05c9ad92cd316ea7d868454 in lucene-solr's branch 
refs/heads/branch_8x from Timothy Potter
[ https://gitbox.apache.org/repos/asf?p=lucene-solr.git;h=7933c90 ]

SOLR-15036: Automatically wrap a facet expression with a select / rollup / sort 
/ plist when using a collection alias with multiple collections and 
count|sum|min|max|avg metrics. (backport of #2132) (#2195)



> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-11 Thread ASF subversion and git services (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17262823#comment-17262823
 ] 

ASF subversion and git services commented on SOLR-15036:


Commit 6711eb7571727552aad3ace53c52c9a8fe07dc40 in lucene-solr's branch 
refs/heads/master from Timothy Potter
[ https://gitbox.apache.org/repos/asf?p=lucene-solr.git;h=6711eb7 ]

SOLR-15036: auto- select / rollup / sort / plist over facet expression when 
using a collection alias with multiple collections (#2132)



> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-11 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17262822#comment-17262822
 ] 

Joel Bernstein commented on SOLR-15036:
---

It is indeed missing from the docs. I will add it.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping 
> the facet expression with a rollup / sort / plist when the collection 
> argument is an alias with multiple collections; other stream sources will be 
> considered after facet is proven out.
> Lastly, I also considered an alternative approach of doing a 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-11 Thread Timothy Potter (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17262786#comment-17262786
 ] 

Timothy Potter commented on SOLR-15036:
---

Thanks. Updated the impl. to use {{HashRollupStream}} (doesn't seem to be 
documented btw) ... definitely much cleaner ;-)

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping 
> the facet expression with a rollup / sort / plist when the collection 
> argument is an alias with multiple collections; other stream sources will be 
> considered after facet is 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-11 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17262773#comment-17262773
 ] 

Joel Bernstein commented on SOLR-15036:
---

One thing I noticed in the implementation that could be improved is to switch 
to using the HashRollupStream which avoids the need to sort and should be 
significantly faster in this scenario. 

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping 
> the facet expression with a rollup / sort / plist when the collection 
> argument is an alias with multiple 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-08 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17261727#comment-17261727
 ] 

Joel Bernstein commented on SOLR-15036:
---

A little more the SolrStream. The SolrStream is meant to be the only external 
Streaming Expression at this point. You use it to send a string expression to 
the export handler and iterate the results. All other Streaming Expressions are 
meant to be executed by the /stream handler.

One of the reasons for this is that SolrStream is the only stream source that 
uses an HttpSolrClient rather then a CloudSolrClient. So SolrStream is the only 
truly stateless Streaming Expression as it holds no Zk state. It is also 
loosely coupled with Solr you don't have to worry about zk disconnects etc... 
And you don't have expose zk to the client.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-08 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17261708#comment-17261708
 ] 

Joel Bernstein commented on SOLR-15036:
---

So, basic auth happens automatically under the covers. This is due to how 
CloudSolrClient behaves. Streaming Expressions got basic auth for free. I was 
skeptical about this at first but there are test cases now that support this 
and after code reviewing and lot's of manual testing I feel confident that this 
is correct. 

SolrStream has explicit support for basic auth params because it's used as an 
external Solrj client for sending streaming expressions to the stream handler. 
So it needs to initiate the basic auth request. But once the params make it to 
the stream handler they are propagated automatically.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-08 Thread Timothy Potter (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17261575#comment-17261575
 ] 

Timothy Potter commented on SOLR-15036:
---

Ok thank you for the guidance. Tiered makes sense now (y). So I'll clean that 
up in the PR and add the explicit opt-in in the backport for 8.x.

Btw ~ while in the {{FacetStream}} code, I noticed it doesn't support basic 
auth like {{SolrStream}} does, i.e. there's no way to pass basic auth 
credentials to the {{QueryRequest}} constructed in the open method that I can 
see ... is this by design or just oversight?

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-08 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17261568#comment-17261568
 ] 

Joel Bernstein commented on SOLR-15036:
---

Ok, let's go with an explicit opt-in for 8.8 if you want to get it in quickly. 
Then let's make this the default behavior in 9.0. 

The reason I like "tiered" is that the plist sets up a middle tier of 
aggregator nodes, one per aliased collection. This happens because of how the 
facet expression works, which is to setup a CloudSolrClient and push the facet 
call to the collection. Calling it "parallel" makes it sound like it's just 
threaded on the same node, which would not scale as nearly as well. But, since 
the parameter is going away anyway I'm not sure it matters that much.  

About adding more intelligence to the functions... One approach to do this 
would be to build a top-level optimizer that rewrites expressions. That way we 
could separate the optimization logic from the functions.


> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-08 Thread Timothy Potter (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17261555#comment-17261555
 ] 

Timothy Potter commented on SOLR-15036:
---

Also, regarding composability ... I think that approach has served us well 
historically as the streaming expression module matured. However, I believe 
going forward that we can start to build more intelligence into the expressions 
themselves vs. putting the onus on users to figure out the optimal way to 
compose an expression, especially for common analytics use cases. Either that 
or we should start to re-invest in the Solr SQL to streaming expression 
translator to apply more intelligence / optimizations automatically.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-08 Thread Timothy Potter (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17261550#comment-17261550
 ] 

Timothy Potter commented on SOLR-15036:
---

Thanks for looking [~jbernste]

I don't see any real risk to existing applications with this functionality (I 
know ~ famous last words right ;-). The auto-wrapping only happens if the facet 
expression only requests metrics that are safe to rollup, namely count, sum, 
min, max, or avg. If while attempting to auto-wrap, we encounter a metric that 
isn't safe to rollup, the impl. falls back to non-parallel.

In terms of being simpler, I'd counter that the current impl. is not simple for 
users with collection alias and that the additions here are not very 
complicated. The diff of {{FacetStream}} looks big because of code reformatting 
to match our standard as there were many improperly formatted sections in this 
file. I'm happy to undo that and just add my changes if that makes a review 
easier, I think you'll see the changes are quite simple.

I'm ok with this being an explicit opt-in (vs. opt-out as I have it) your 
{{tiered="true"}} idea. However, I prefer the parameter be named {{plist}} or 
{{parallel} instead of {{tiered}} if you're ok with that? The reason it is 
opt-out now is because I didn't want users to have to explicitly specify this 
when using a collection alias, i.e. opt-in requires changes to existing apps to 
make use of this. However, I appreciate your concern about existing apps and 
feel it's OK to require a change to enable an optimization, esp. if we keep the 
System property to allow enabling this globally vs. in each expression 
specifically.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-08 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17261539#comment-17261539
 ] 

Joel Bernstein commented on SOLR-15036:
---

One thing we could do to remove any risks and move faster with this would be 
make the default behavior by the standard, non-plist, approach. Then we could 
turn it on with named param or a system param.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping 
> the facet expression with a rollup / sort / plist when the collection 
> argument is an alias with 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-08 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17261532#comment-17261532
 ] 

Joel Bernstein commented on SOLR-15036:
---

A couple of thoughts based mainly on the description of the PR. 

It sounds like a change was made directly to the facet expression so that it 
behaves differently. In general with streaming expressions I like to create 
wrapper classes, or new implementations, where possible, rather than changing 
the behavior of existing functions. I like to do this for several reasons:

1) Lower risk. There are applications in the field that use the facet 
expression heavily. Changing the behavior of the facet expression adds more 
risk that a bug or behavioral changes impacts existing apps.

2) Using composition keeps the class implementations simpler.

3) The design of streaming expressions is really all about composable 
functions. So in keeping with that design if possible I usually choose to add a 
new function rather than changing behavior of existing functions.
 
4) Speed of development. When adding new functions you can move fast and break 
things, without breaking any existing applications. The amount of review and 
testing I would do for a change to the facet expression is 10x the amount of 
review and testing I would do for a new function.

That all being said, if there is some important reason to change the facet 
expression directly then of course it can be done.


> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2021-01-08 Thread Timothy Potter (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17261434#comment-17261434
 ] 

Timothy Potter commented on SOLR-15036:
---

[~jbernste] I think this is ready to go and would appreciate your eyes on it: 
https://github.com/apache/lucene-solr/pull/2132

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping 
> the facet expression with a rollup / sort / plist when the collection 
> argument is an alias with multiple collections; other stream sources will be 
> considered after facet 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-15 Thread Timothy Potter (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249765#comment-17249765
 ] 

Timothy Potter commented on SOLR-15036:
---

Confirmed, it works nicely now! Thanks for your help Joel

{code}
{count(*)=6, a_i=0, max(max(a_d))=2.2515625018914305, 
min(min(a_d))=-0.5859583807765252, sum(sum(a_d))=5.894460990302006, 
wsum(avg(a_d), count(*))=0.9824101650503342}
{count(*)=4, a_i=1, max(max(a_d))=3.338305310115201, 
min(min(a_d))=0.03050220236482515, sum(sum(a_d))=12.517492417715335, 
wsum(avg(a_d), count(*))=2.086248736285889}
{count(*)=4, a_i=2, max(max(a_d))=4.832815828279073, 
min(min(a_d))=3.16905458918893, sum(sum(a_d))=24.076139429000165, 
wsum(avg(a_d), count(*))=4.012689904833361}
{count(*)=4, a_i=3, max(max(a_d))=5.66831997419713, 
min(min(a_d))=2.902262184046103, sum(sum(a_d))=22.58303980377591, 
wsum(avg(a_d), count(*))=3.763839967295984}
{count(*)=4, a_i=4, max(max(a_d))=6.531585917691583, 
min(min(a_d))=2.6395698661907963, sum(sum(a_d))=28.243748570490624, 
wsum(avg(a_d), count(*))=4.707291428415103}
{count(*)=5, a_i=5, max(max(a_d))=7.555382540979672, 
min(min(a_d))=4.808772939476107, sum(sum(a_d))=37.88196903407075, 
wsum(avg(a_d), count(*))=6.313661505678459}
{count(*)=5, a_i=6, max(max(a_d))=8.416136012729918, 
min(min(a_d))=5.422492404700898, sum(sum(a_d))=39.25679972070782, 
wsum(avg(a_d), count(*))=6.542799953451303}
{count(*)=5, a_i=7, max(max(a_d))=8.667999236934058, 
min(min(a_d))=6.934577412906803, sum(sum(a_d))=46.7622185952807, wsum(avg(a_d), 
count(*))=7.793703099213451}
{count(*)=5, a_i=8, max(max(a_d))=9.566181963643201, 
min(min(a_d))=7.4397380388592556, sum(sum(a_d))=53.296172957938325, 
wsum(avg(a_d), count(*))=8.88269549298972}
{count(*)=4, a_i=9, max(max(a_d))=12.251349466753346, 
min(min(a_d))=9.232427215193514, sum(sum(a_d))=63.46244550204135, 
wsum(avg(a_d), count(*))=10.577074250340223}
{code}

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-15 Thread Timothy Potter (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249747#comment-17249747
 ] 

Timothy Potter commented on SOLR-15036:
---

Oh darn, I should have spotted that! Sorry for the noise there [~jbernste] ... 
seems like we could improve the error handling in the drill code to barf if the 
user is trying to compute metrics for fields not in the fl? That could help 
with silly mistakes like this ...

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping 
> the facet expression with a rollup 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-15 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249692#comment-17249692
 ] 

Joel Bernstein commented on SOLR-15036:
---

The fl for drill needs to include the *a_d* field. Basically you're rolling up 
and aggregating from the exported field.s

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping 
> the facet expression with a rollup / sort / plist when the collection 
> argument is an alias with multiple collections; other stream sources will be 
> considered after facet is proven 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-14 Thread Timothy Potter (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249387#comment-17249387
 ] 

Timothy Potter commented on SOLR-15036:
---

[~jbernste] Does drill only support counting per dimension? 

For my test case, I'm expecting these tuples:
{code}
{sum(a_d)=5.894460990302005, a_i=0, count(*)=6, max(a_d)=2.2515625018914305, 
avg(a_d)=0.9824101650503341, min(a_d)=-0.5859583807765252}
{sum(a_d)=12.517492417715335, a_i=1, count(*)=6, max(a_d)=3.338305310115201, 
avg(a_d)=2.086248736285889, min(a_d)=0.03050220236482515}
{sum(a_d)=24.076139429000165, a_i=2, count(*)=6, max(a_d)=4.832815828279073, 
avg(a_d)=4.01268990483336, min(a_d)=3.16905458918893}
{sum(a_d)=22.583039803775907, a_i=3, count(*)=6, max(a_d)=5.66831997419713, 
avg(a_d)=3.7638399672959846, min(a_d)=2.902262184046103}
{sum(a_d)=28.243748570490624, a_i=4, count(*)=6, max(a_d)=6.531585917691583, 
avg(a_d)=4.707291428415104, min(a_d)=2.6395698661907963}
{sum(a_d)=37.88196903407075, a_i=5, count(*)=6, max(a_d)=7.555382540979672, 
avg(a_d)=6.313661505678458, min(a_d)=4.808772939476107}
{sum(a_d)=39.25679972070782, a_i=6, count(*)=6, max(a_d)=8.416136012729918, 
avg(a_d)=6.542799953451302, min(a_d)=5.422492404700898}
{sum(a_d)=46.7622185952807, a_i=7, count(*)=6, max(a_d)=8.667999236934058, 
avg(a_d)=7.793703099213451, min(a_d)=6.934577412906803}
{sum(a_d)=53.296172957938325, a_i=8, count(*)=6, max(a_d)=9.566181963643201, 
avg(a_d)=8.88269549298972, min(a_d)=7.4397380388592556}
{sum(a_d)=63.46244550204135, a_i=9, count(*)=6, max(a_d)=12.251349466753346, 
avg(a_d)=10.577074250340225, min(a_d)=9.232427215193514}
{code}
Which is what is produced by the facet expression in my description for this 
JIRA.

But {{drill}} doesn't seem like it's aggregating by the {{a_i}} dimension as 
expected. The following drill expression (w/o rollup for now) produces the 
following tuples for the same setup ^
{code}
drill(
  SOME_ALIAS_WITH_MANY_COLLS,
  q="*:*",
  fl="a_i",
  sort="a_i asc",
  rollup(
input(),
over="a_i",
sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
  )
)
{code}

Produces Tuple stream:
{code}
{sum(a_d)=0.0, count(*)=1, a_i=0, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=3, a_i=0, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=0, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=0, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=2, a_i=1, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=1, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=2, a_i=1, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=1, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=2, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=2, a_i=2, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=2, a_i=2, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=2, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=3, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=3, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=3, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=2, a_i=3, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=3, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=4, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=4, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=4, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=4, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=4, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=4, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=2, a_i=5, avg(a_d)=0.0, 
max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-14 Thread Timothy Potter (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249142#comment-17249142
 ] 

Timothy Potter commented on SOLR-15036:
---

Thank you! I read that in the docs but it didn't quite sink in I guess. 

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping 
> the facet expression with a rollup / sort / plist when the collection 
> argument is an alias with multiple collections; other stream sources will be 
> considered after facet is proven out.
> Lastly, I also considered an alternative 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-14 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249110#comment-17249110
 ] 

Joel Bernstein commented on SOLR-15036:
---

>From the test cases:

{code}
rollup(
select(
 drill(collection1, 
   q="*:*", 
   fl="a_s, a_f", 
   sort="a_s desc", 
   rollup(input(), over="a_s", count(*), sum(a_f))),
  a_s, count(*) as cnt, sum(a_f) as saf),
 over="a_s",
 sum(cnt), sum(saf))
{code}

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-14 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249107#comment-17249107
 ] 

Joel Bernstein commented on SOLR-15036:
---

>From the docs: 

"The drill function reads and emits the aggregated tuples from each shard 
maintaining the sort order, but does not merge the aggregations. Streaming 
Expression functions can be wrapped around the drill function to merge the 
aggregates."


> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping 
> the facet expression with a rollup / sort / 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-14 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249104#comment-17249104
 ] 

Joel Bernstein commented on SOLR-15036:
---

That's the expected behavior. You need to wrap drill in a rollup.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping 
> the facet expression with a rollup / sort / plist when the collection 
> argument is an alias with multiple collections; other stream sources will be 
> considered after facet is proven out.
> Lastly, I also considered an alternative 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-14 Thread Timothy Potter (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249101#comment-17249101
 ] 

Timothy Potter commented on SOLR-15036:
---

Ok, I have a fix for that issue but the bigger issue is whether the expression 
is correct and if so, why is it not rolling up over the shards on the client? 
i.e. i'm getting multiple results per bucket instead of one final result per 
bucket?

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping 
> the facet expression with a rollup / sort / plist when the 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-14 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249098#comment-17249098
 ] 

Joel Bernstein commented on SOLR-15036:
---

It looks like you've got nulls in the *a_d* column based on where this stack 
trace is coming from. The SumMetric is not handling this gracefully. 

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping 
> the facet expression with a rollup / sort / plist when the collection 
> argument is an alias with multiple collections; other stream sources will be 
> 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-14 Thread Timothy Potter (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249078#comment-17249078
 ] 

Timothy Potter commented on SOLR-15036:
---

[~jbernste] does this {{drill}} expr look correct? I'm not seeing the results 
from each shard get rolled up into final results? Do I have to do that part 
myself? If so, that's kind of weird

{code}
drill(
  SOME_ALIAS_WITH_MANY_COLLS,
  q="*:*",
  fl="a_i",
  sort="a_i asc",
  rollup(
input(),
over="a_i",
sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
  )
)
{code}

Also, I was getting a number of NullPointerExceptions in the metrics 
computation in the ExportWriter, such as:
{code}
java.util.concurrent.ExecutionException: java.io.IOException: --> 
http://127.0.0.1:62300/solr/coll1_shard2_replica_n2/:
java.lang.NullPointerException
at 
org.apache.solr.client.solrj.io.stream.metrics.SumMetric.update(SumMetric.java:75)
at 
org.apache.solr.client.solrj.io.stream.RollupStream.read(RollupStream.java:252)
at 
org.apache.solr.handler.export.ExportWriter.lambda$writeDocs$7(ExportWriter.java:378)
at 
org.apache.solr.handler.export.ExportBuffers.run(ExportBuffers.java:217)
at 
org.apache.solr.handler.export.ExportWriter.writeDocs(ExportWriter.java:371)
at 
org.apache.solr.handler.export.ExportWriter.lambda$write$1(ExportWriter.java:288)
{code}




> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-09 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17246672#comment-17246672
 ] 

Joel Bernstein commented on SOLR-15036:
---

[~thelabdude] the drill source is implemented inside of the ExportWriter.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping 
> the facet expression with a rollup / sort / plist when the collection 
> argument is an alias with multiple collections; other stream sources will be 
> considered after facet is proven out.
> Lastly, I also considered an alternative 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-09 Thread Michael Gibney (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17246630#comment-17246630
 ] 

Michael Gibney commented on SOLR-15036:
---

{quote}I wouldn't get too hung up on my example only having a single dimension, 
as it's just an example.
{quote}
Indeed! I was mainly mentioning this in hopes of clarifying use cases that 
would and would not benefit from {{drill}} vs "JSON facet".

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping 
> the facet expression with a rollup / sort / plist when 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-09 Thread Timothy Potter (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17246624#comment-17246624
 ] 

Timothy Potter commented on SOLR-15036:
---

I plan to spend some time today learning the inner-workings of the {{drill}} 
source. Also, I wouldn't get too hung up on my example only having a single 
dimension, as it's just an example. When I test this in a large cluster, I'll 
test with several dimensions as well, some with high cardinality.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping 
> the 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-09 Thread Michael Gibney (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17246612#comment-17246612
 ] 

Michael Gibney commented on SOLR-15036:
---

A, that makes sense! So if you'll permit me to paraphrase/change emphasis: 
high cardinality is wrt unique field values, but it's not so much about high 
cardinality _per se_ as about _multi-dimensional fanout_ in high-cardinality 
_requests_ (as opposed to limited requests that happen to be over high 
cardinality fields -- i.e. with {{limit != -1}}).

I was hung up on the fact that at each dimensional level, aggregation is 
necessarily "in memory", no matter what implementation you use. The key 
distinction is not so much about _processing_ as about _response building_ -- 
with "JSON facet" buffering the entire response in memory before sending, and 
{{drill}} streaming the response as it's calculated.

Is this an accurate characterization?

Accordingly, since the example expression in the description of this issue is 
not multi-dimensional (if I'm reading correctly?), I don't think there'd be any 
benefit to using {{drill}} in this particular case, no matter how 
high-cardinality the field in question.

Taking this a step further (and warranting a separate issue if there's interest 
in pursuing it): I understand the general-purpose utility of {{drill}} being 
built over exportHandler. But for the terms faceting case, the associated 
performance hit of working with BytesRefs is significant. It should be possible 
to have a shard-level streaming expression that uses term-ord based "JSON 
facet" processing under the hood at the shard level, and stream the result to 
the coordinator node. That would be the best of both worlds (term ords _and_ 
streaming), and would likely result in a _huge_ performance gain.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-09 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17246569#comment-17246569
 ] 

Joel Bernstein commented on SOLR-15036:
---

The other important thing about drill is accuracy. With high cardinality, 
multi-dimension use cases you can ensure 100% accuracy with facets by using the 
limit of -1, which means track everything in memory. Otherwise you're relying 
on over-fetching and facet refinements to try and ensure accuracy. 

Drill is always 100% accurate no matter the cardinality.

There are data warehousing use cases, where accuracy matters (billing etc...). 

So in cases where accuracy matters more the speed, and limits of -1 for facets 
don't work, then drill can solve the problem.



> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-09 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17246561#comment-17246561
 ] 

Joel Bernstein commented on SOLR-15036:
---

I haven't put an exact number on what high cardinality means, but it can ugly 
pretty fast with multiple dimensions. Let's say you have 3 dimensions each with 
100,000 unique values. The number of possible combinations that needed to be 
tracked is so high it cannot be done in memory.

Drill solves this problem by first sorting on the dimensions and rolling up one 
combination at a time. So it will never run out of memory no-matter how many 
combinations there are. 

Drill is fundamentally a high cardinality, multi-dimension specific tool. It 
just so happens the it's a very important use case because data warehousing 
application often have these high cardinality, multi-dimension rollups to 
perform.



> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-08 Thread Michael Gibney (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17246228#comment-17246228
 ] 

Michael Gibney commented on SOLR-15036:
---

Thanks for the clarification, [~jbernste]. Would you be able to give a rough 
sense of how high you consider to be high cardinality, and whether you're 
talking about high cardinality _domain_ (DocSet size) or _field_ (number of 
unique values)?

Apologies (and I hope/trust this isn't off-topic for this issue), but "faceting 
will eventually run into performance and memory problems ... because it's an 
in-memory aggregation" -- in a sense all aggregation is an in-memory 
aggregation, it's just a question of how aggressively the accumulation data 
structure is pruned (unless {{drill}} is writing to disk?). I'm honestly having 
a hard time wrapping my head around cases in which {{drill}} would perform 
better than "JSON facet", esp. considering the fundamental distinction that an 
exportWriter-based impl would work with BytesRefs (right?), whereas "JSON 
facet" generally works against term ords (at the shard level). Hence my 
questions about "how high is high" wrt cardinality, etc. ... hoping that will 
help me better understand the performance characteristics you're describing.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-08 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17246198#comment-17246198
 ] 

Joel Bernstein commented on SOLR-15036:
---

In the high cardinality use case, faceting will eventually run into performance 
and memory problems, so I don't really consider it a great high cardinality 
solution. Not because it sends all tuples to aggregator nodes, but because it's 
an in-memory aggregation.

I was comparing Streaming Expressions, prior to drill, when I mentioned sending 
all tuples to the aggregator nodes.

Streaming Expressions, prior to drill, could use the export handler to send all 
sorted tuples to the aggregator node and accomplish high cardinality 
aggregation.

So, drill improves on previous implementations of Streaming Expressions by 
first aggregating inside the export handler.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-08 Thread Michael Gibney (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17246172#comment-17246172
 ] 

Michael Gibney commented on SOLR-15036:
---

[~thelabdude], you mention "JSON facet implementation", which I gather 
(according to the refguide) is under the hood of the [facet streaming 
expression|https://lucene.apache.org/solr/guide/8_7/stream-source-reference.html#facet].
 [~jbernste], you imply that "facet" sends "all tuples to the aggregator node". 
I'm confused here, because that implication contradicts my understanding of 
what the "JSON facet" implementation does (i.e., shard-level aggregation first, 
merging on coordinator node, optional shard-level refinement). Perhaps I'm 
missing something about the specific way in which the {{facet}} streaming 
expression wraps "JSON facet" functionality?

Also, when you say "high cardinality use case", roughly how high is "high", and 
are you referring to high cardinality wrt DocSet domain size, or number of 
unique values in a field?

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-08 Thread Joel Bernstein (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17246157#comment-17246157
 ] 

Joel Bernstein commented on SOLR-15036:
---

I can comment on the drill vs facet. Facet will always be faster than drill 
except in the high cardinality use case. Drill really shines in the high 
cardinality use case though. Rather than sending all tuples to the aggregator 
node, drill can first aggregate inside of the export handler and compress the 
result significantly before hitting the network. And drill never runs out of 
memory.

More work is coming that improves the export handler performance by about 300%. 
But even this improvement doesn't allow drill to match the speed of facet on 
low cardinality aggregations.

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-08 Thread Timothy Potter (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17246031#comment-17246031
 ] 

Timothy Potter commented on SOLR-15036:
---

[~atri] the patch isn't the solution I'm going for, as I explained in the 
description ...

 

Regarding drill, I'll have to investigate its performance compared to {{plist}} 
and {{facet}}. However, since it's based on {{/export}} seems like it would be 
a lot of I/O out each Solr node instead of just relying on the efficient JSON 
facet implementation? I certainly don't want to {{/export}} 1B rows to count 
them when I can just facet instead. What would a {{drill}} expression look like 
that does the same as my example in the description?

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection 

[jira] [Commented] (SOLR-15036) Use plist automatically for executing a facet expression against a collection alias backed by multiple collections

2020-12-08 Thread Atri Sharma (Jira)


[ 
https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17246017#comment-17246017
 ] 

Atri Sharma commented on SOLR-15036:


I havent looked at the patch yet – but why not do a drill expression and wrap 
it with the aggregate to be computed? Would that not achieve the objective to 
push down aggregation to shards?

> Use plist automatically for executing a facet expression against a collection 
> alias backed by multiple collections
> --
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
>  Issue Type: Improvement
>  Security Level: Public(Default Security Level. Issues are Public) 
>  Components: streaming expressions
>Reporter: Timothy Potter
>Assignee: Timothy Potter
>Priority: Major
> Attachments: relay-approach.patch
>
>
> For analytics use cases, streaming expressions make it possible to compute 
> basic aggregations (count, min, max, sum, and avg) over massive data sets. 
> Moreover, with massive data sets, it is common to use collection aliases over 
> many underlying collections, for instance time-partitioned aliases backed by 
> a set of collections, each covering a specific time range. In some cases, we 
> can end up with many collections (think 50-60) each with 100's of shards. 
> Aliases help insulate client applications from complex collection topologies 
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation 
> metrics:
> {code:java}
> facet(
>   some_alias, 
>   q="*:*", 
>   fl="a_i", 
>   sort="a_i asc", 
>   buckets="a_i", 
>   bucketSorts="count(*) asc", 
>   bucketSizeLimit=1, 
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr 
> which then expands the alias to a list of collections. For each collection, 
> the top-level distributed query controller gathers a candidate set of 
> replicas to query and then scatters {{distrib=false}} queries to each replica 
> in the list. For instance, if we have 60 collections with 200 shards each, 
> then this results in 12,000 shard requests from the query controller node to 
> the other nodes in the cluster. The requests are sent in an async manner (see 
> {{SearchHandler}} and {{HttpShardHandler}}) In my testing, we’ve seen cases 
> where we hit 18,000 replicas and these queries don’t always come back in a 
> timely manner. Put simply, this also puts a lot of load on the top-level 
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection 
> in the alias in parallel, which reduces the overhead of each top-level 
> distributed query from 12,000 to 200 in my example above. With this approach, 
> you’ll then need to sort the tuples back from each collection and do a 
> rollup, something like:
> {code:java}
> select(
>   rollup(
> sort(
>   plist(
> select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
> select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", 
> bucketSorts="count(*) asc", bucketSizeLimit=1, sum(a_d), avg(a_d), 
> min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, 
> min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>   ),
>   by="a_i asc"
> ),
> over="a_i",
> sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as 
> the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just avg. the averages back from 
> each collection in the rollup. It needs to be a *weighted avg.* when rolling 
> up the avg. from each facet expression in the plist. However, we have the 
> count per collection, so this is doable but will require some changes to the 
> rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create 
> the rollup / sort over plist expression for collection aliases. After all, 
> aliases are supposed to hide these types of complexities from client 
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping 
> the facet expression with a rollup / sort / plist when the collection 
> argument is an alias with multiple collections; other stream sources will be 
> considered after facet