Hello,
I have a use case where I need to first compute an aggregation for each
key, then filter the keys by some criterion, and finally feed the
matching keys into a PCollection using an ElasticsearchIO read. However,
ElasticsearchIO does not seem to support queries that contain
aggregations:
Error message from worker: org.elasticsearch.client.ResponseException: method [GET], host [https://...], URI [...], status line [HTTP/1.1 400 Bad Request]
{"error":{"root_cause":[{"type":"parsing_exception","reason":"request does not support [aggregations]","line":1,"col":135}],"type":"parsing_exception","reason":"request does not support [aggregations]","line":1,"col":135},"status":400}
org.elasticsearch.client.RestClient.convertResponse(RestClient.java:331)
org.elasticsearch.client.RestClient.performRequest(RestClient.java:301)
org.elasticsearch.client.RestClient.performRequest(RestClient.java:276)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.queryCount(ElasticsearchIO.java:780)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.getEstimatedSizeBytes(ElasticsearchIO.java:762)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.split(ElasticsearchIO.java:710)
Here is an example of the Elasticsearch query I am trying to run:
{
  "aggs": {
    "user_id": {
      "composite": {
        "sources": [
          { "user_id": { "terms": { "field": "user_id" } } }
        ]
      },
      "aggs": {
        "min": {
          "min": {
            "field": "play_time"
          }
        },
        "max": {
          "max": {
            "field": "play_time"
          }
        },
        "diff": {
          "bucket_selector": {
            "buckets_path": {
              "min": "min",
              "max": "max"
            },
            "script": "params.max - params.min > 5000"
          }
        }
      }
    }
  }
}
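To make the intended result concrete, the per-key logic that the query above expresses can be sketched in plain Java. This is only an illustration under assumptions: it does not use Beam or Elasticsearch, the class and method names (PlayTimeSpreadFilter, usersWithSpreadAbove) are hypothetical, and user_id is assumed to be a string and play_time a long. It groups records by user_id, tracks the min and max of play_time, and keeps only the keys where max - min exceeds the threshold.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of Beam or Elasticsearch.
final class PlayTimeSpreadFilter {

    // Each entry is an assumed (user_id, play_time) pair.
    // Returns, in sorted order, the user ids whose max(play_time) - min(play_time)
    // is strictly greater than the threshold (5000 in the query above).
    static List<String> usersWithSpreadAbove(List<Map.Entry<String, Long>> plays,
                                             long threshold) {
        // Per-key running min/max, like the "min" and "max" sub-aggregations.
        Map<String, LongSummaryStatistics> statsByUser = new TreeMap<>();
        for (Map.Entry<String, Long> play : plays) {
            statsByUser
                .computeIfAbsent(play.getKey(), k -> new LongSummaryStatistics())
                .accept(play.getValue().longValue());
        }

        // Keep only the keys that pass the bucket_selector-style condition.
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, LongSummaryStatistics> e : statsByUser.entrySet()) {
            LongSummaryStatistics s = e.getValue();
            if (s.getMax() - s.getMin() > threshold) {
                matched.add(e.getKey());
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> plays = List.of(
            Map.entry("a", 1_000L), Map.entry("a", 7_500L),   // spread 6500, kept
            Map.entry("b", 2_000L), Map.entry("b", 6_000L));  // spread 4000, dropped
        System.out.println(usersWithSpreadAbove(plays, 5_000L));
    }
}
```

The list of matched user ids returned here is what I would like to use as the input for the ElasticsearchIO read.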
Are Elasticsearch queries that contain aggregations unsupported in
ElasticsearchIO? If so, is there a way to work around this?
Thanks,
Nick