[jira] [Updated] (FLINK-34926) Adaptive auto parallelism doesn't work for a query
[ https://issues.apache.org/jira/browse/FLINK-34926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xingcan Cui updated FLINK-34926:
    Attachment: image.png

> Adaptive auto parallelism doesn't work for a query
> --------------------------------------------------
>
>                 Key: FLINK-34926
>                 URL: https://issues.apache.org/jira/browse/FLINK-34926
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.1
>            Reporter: Xingcan Cui
>            Priority: Major
>         Attachments: image.png
>
> We have the following query running in batch mode.
> {code:java}
> WITH FEATURE_INCLUSION AS (
>     SELECT
>         insertion_id, -- Not unique
>         features -- Array>
>     FROM
>         features_table
> ),
> TOTAL AS (
>     SELECT
>         COUNT(DISTINCT insertion_id) total_id
>     FROM
>         FEATURE_INCLUSION
> ),
> FEATURE_INCLUSION_COUNTS AS (
>     SELECT
>         `key`,
>         COUNT(DISTINCT insertion_id) AS id_count
>     FROM
>         FEATURE_INCLUSION,
>         UNNEST(features) as t (`key`, `value`)
>     WHERE
>         TRUE
>     GROUP BY
>         `key`
> ),
> RESULTS AS (
>     SELECT
>         `key`
>     FROM
>         FEATURE_INCLUSION_COUNTS,
>         TOTAL
>     WHERE
>         (1.0 * id_count)/total_id > 0.1
> )
> SELECT
>     JSON_ARRAYAGG(`key`) AS feature_ids
> FROM
>     RESULTS{code}
> The parallelism adaptively set by Flink for the following operator was always 1.
> {code:java}
> [37]:HashAggregate(isMerge=[true], groupBy=[key, insertion_id], select=[key, insertion_id])
> +- [38]:LocalHashAggregate(groupBy=[key], select=[key, Partial_COUNT(insertion_id) AS count$0]){code}
> If we turn off `execution.batch.adaptive.auto-parallelism.enabled` and
> manually set `parallelism.default` to a value greater than one, it worked.
> The screenshot of the full job graph is attached. !image_720.png!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
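The workaround mentioned in the description can be sketched as Flink SQL client statements. This is only an illustration under stated assumptions: the two option keys are the ones named in the report, `execution.runtime-mode` is added because the query is said to run in batch mode, and the parallelism value 4 is an arbitrary example that does not come from the report.

```sql
-- Run in the Flink SQL client before submitting the query.
-- The report states that the query runs in batch mode.
SET 'execution.runtime-mode' = 'batch';

-- Turn off adaptive auto parallelism, as described in the report.
SET 'execution.batch.adaptive.auto-parallelism.enabled' = 'false';

-- Use a fixed default parallelism greater than one.
-- The value 4 is an assumption chosen for illustration only.
SET 'parallelism.default' = '4';
```

With these settings every operator that has no explicit parallelism uses the fixed default, so the aggregate operators listed above are no longer sized by the adaptive scheduler.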
[jira] [Updated] (FLINK-34926) Adaptive auto parallelism doesn't work for a query
[ https://issues.apache.org/jira/browse/FLINK-34926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xingcan Cui updated FLINK-34926:
    Description: the query and operator plan are unchanged from the description quoted in the first message; only the screenshot reference in the last line changed:

The screenshot of the full job graph is attached. !image.png!

  was:

The screenshot of the full job graph is attached. !image_720.png!
[jira] [Updated] (FLINK-34926) Adaptive auto parallelism doesn't work for a query
[ https://issues.apache.org/jira/browse/FLINK-34926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xingcan Cui updated FLINK-34926:
    Attachment:     (was: image_720.png)
[jira] [Updated] (FLINK-34926) Adaptive auto parallelism doesn't work for a query
[ https://issues.apache.org/jira/browse/FLINK-34926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xingcan Cui updated FLINK-34926:
    Description: the visible text, including the query and operator plan, is identical before and after this update; the last line reads:

The screenshot of the full job graph is attached. !image_720.png!