[jira] [Updated] (FLINK-34926) Adaptive auto parallelism doesn't work for a query

2024-03-24 Thread Xingcan Cui (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-34926:

Attachment: image.png

> Adaptive auto parallelism doesn't work for a query
> --
>
> Key: FLINK-34926
> URL: https://issues.apache.org/jira/browse/FLINK-34926
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Priority: Major
> Attachments: image.png
>
>
> We have the following query running in batch mode.
> {code:java}
> WITH FEATURE_INCLUSION AS (
>     SELECT
>         insertion_id, -- Not unique
>         features -- Array>
>     FROM
>         features_table
> ),
> TOTAL AS (
>     SELECT
>         COUNT(DISTINCT insertion_id) total_id
>     FROM
>         FEATURE_INCLUSION
> ),
> FEATURE_INCLUSION_COUNTS AS (
>     SELECT
>         `key`,
>         COUNT(DISTINCT insertion_id) AS id_count
>     FROM
>         FEATURE_INCLUSION,
>         UNNEST(features) as t (`key`, `value`)
>     WHERE
>         TRUE
>     GROUP BY
>         `key`
> ),
> RESULTS AS (
>     SELECT
>         `key`
>     FROM
>         FEATURE_INCLUSION_COUNTS,
>         TOTAL
>     WHERE
>        (1.0 * id_count)/total_id > 0.1
> )
> SELECT
>     JSON_ARRAYAGG(`key`) AS feature_ids
> FROM
>     RESULTS{code}
> The parallelism adaptively set by Flink for the following operator was always 
> 1.
> {code:java}
> [37]:HashAggregate(isMerge=[true], groupBy=[key, insertion_id], select=[key, 
> insertion_id])
> +- [38]:LocalHashAggregate(groupBy=[key], select=[key, 
> Partial_COUNT(insertion_id) AS count$0]){code}
> If we turn off `execution.batch.adaptive.auto-parallelism.enabled` and 
> manually set `parallelism.default` to a value greater than one, the operator 
> runs with the expected parallelism.
> The screenshot of the full job graph is attached.  !image_720.png!
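
The workaround described above can be expressed as session settings. This is a minimal sketch, assuming the query is submitted through the Flink SQL client; the parallelism value 4 is an arbitrary illustrative choice and is not taken from the report.

{code:sql}
-- Run in batch mode, as in the report.
SET 'execution.runtime-mode' = 'batch';
-- Turn off adaptive parallelism inference (the workaround from the report).
SET 'execution.batch.adaptive.auto-parallelism.enabled' = 'false';
-- Assumed value: any default parallelism greater than one.
SET 'parallelism.default' = '4';
{code}

With these settings, operators that do not set their own parallelism fall back to `parallelism.default` instead of a value inferred at runtime.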



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34926) Adaptive auto parallelism doesn't work for a query

2024-03-24 Thread Xingcan Cui (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-34926:

Description: 
We have the following query running in batch mode.
{code:java}
WITH FEATURE_INCLUSION AS (
    SELECT
        insertion_id, -- Not unique
        features -- Array>
    FROM
        features_table
),
TOTAL AS (
    SELECT
        COUNT(DISTINCT insertion_id) total_id
    FROM
        FEATURE_INCLUSION
),
FEATURE_INCLUSION_COUNTS AS (
    SELECT
        `key`,
        COUNT(DISTINCT insertion_id) AS id_count
    FROM
        FEATURE_INCLUSION,
        UNNEST(features) as t (`key`, `value`)
    WHERE
        TRUE
    GROUP BY
        `key`
),
RESULTS AS (
    SELECT
        `key`
    FROM
        FEATURE_INCLUSION_COUNTS,
        TOTAL
    WHERE
       (1.0 * id_count)/total_id > 0.1
)
SELECT
    JSON_ARRAYAGG(`key`) AS feature_ids
FROM
    RESULTS{code}
The parallelism adaptively set by Flink for the following operator was always 1.
{code:java}
[37]:HashAggregate(isMerge=[true], groupBy=[key, insertion_id], select=[key, 
insertion_id])
+- [38]:LocalHashAggregate(groupBy=[key], select=[key, 
Partial_COUNT(insertion_id) AS count$0]){code}
If we turn off `execution.batch.adaptive.auto-parallelism.enabled` and manually 
set `parallelism.default` to a value greater than one, the operator runs with 
the expected parallelism.

The screenshot of the full job graph is attached.  !image.png!

  was:
We have the following query running in batch mode.
{code:java}
WITH FEATURE_INCLUSION AS (
    SELECT
        insertion_id, -- Not unique
        features -- Array>
    FROM
        features_table
),
TOTAL AS (
    SELECT
        COUNT(DISTINCT insertion_id) total_id
    FROM
        FEATURE_INCLUSION
),
FEATURE_INCLUSION_COUNTS AS (
    SELECT
        `key`,
        COUNT(DISTINCT insertion_id) AS id_count
    FROM
        FEATURE_INCLUSION,
        UNNEST(features) as t (`key`, `value`)
    WHERE
        TRUE
    GROUP BY
        `key`
),
RESULTS AS (
    SELECT
        `key`
    FROM
        FEATURE_INCLUSION_COUNTS,
        TOTAL
    WHERE
       (1.0 * id_count)/total_id > 0.1
)
SELECT
    JSON_ARRAYAGG(`key`) AS feature_ids
FROM
    RESULTS{code}
The parallelism adaptively set by Flink for the following operator was always 1.
{code:java}
[37]:HashAggregate(isMerge=[true], groupBy=[key, insertion_id], select=[key, 
insertion_id])
+- [38]:LocalHashAggregate(groupBy=[key], select=[key, 
Partial_COUNT(insertion_id) AS count$0]){code}
If we turn off `execution.batch.adaptive.auto-parallelism.enabled` and manually 
set `parallelism.default` to a value greater than one, the operator runs with 
the expected parallelism.

The screenshot of the full job graph is attached.  !image_720.png!







[jira] [Updated] (FLINK-34926) Adaptive auto parallelism doesn't work for a query

2024-03-24 Thread Xingcan Cui (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-34926:

Attachment: (was: image_720.png)






[jira] [Updated] (FLINK-34926) Adaptive auto parallelism doesn't work for a query

2024-03-24 Thread Xingcan Cui (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-34926:

Description: 
We have the following query running in batch mode.
{code:java}
WITH FEATURE_INCLUSION AS (
    SELECT
        insertion_id, -- Not unique
        features -- Array>
    FROM
        features_table
),
TOTAL AS (
    SELECT
        COUNT(DISTINCT insertion_id) total_id
    FROM
        FEATURE_INCLUSION
),
FEATURE_INCLUSION_COUNTS AS (
    SELECT
        `key`,
        COUNT(DISTINCT insertion_id) AS id_count
    FROM
        FEATURE_INCLUSION,
        UNNEST(features) as t (`key`, `value`)
    WHERE
        TRUE
    GROUP BY
        `key`
),
RESULTS AS (
    SELECT
        `key`
    FROM
        FEATURE_INCLUSION_COUNTS,
        TOTAL
    WHERE
       (1.0 * id_count)/total_id > 0.1
)
SELECT
    JSON_ARRAYAGG(`key`) AS feature_ids
FROM
    RESULTS{code}
The parallelism adaptively set by Flink for the following operator was always 1.
{code:java}
[37]:HashAggregate(isMerge=[true], groupBy=[key, insertion_id], select=[key, 
insertion_id])
+- [38]:LocalHashAggregate(groupBy=[key], select=[key, 
Partial_COUNT(insertion_id) AS count$0]){code}
If we turn off `execution.batch.adaptive.auto-parallelism.enabled` and manually 
set `parallelism.default` to a value greater than one, the operator runs with 
the expected parallelism.

The screenshot of the full job graph is attached.  !image_720.png!

  was:
We have the following query running in batch mode.

 
{code:java}
WITH FEATURE_INCLUSION AS (
    SELECT
        insertion_id, -- Not unique
        features -- Array>
    FROM
        features_table
),
TOTAL AS (
    SELECT
        COUNT(DISTINCT insertion_id) total_id
    FROM
        FEATURE_INCLUSION
),
FEATURE_INCLUSION_COUNTS AS (
    SELECT
        `key`,
        COUNT(DISTINCT insertion_id) AS id_count
    FROM
        FEATURE_INCLUSION,
        UNNEST(features) as t (`key`, `value`)
    WHERE
        TRUE
    GROUP BY
        `key`
),
RESULTS AS (
    SELECT
        `key`
    FROM
        FEATURE_INCLUSION_COUNTS,
        TOTAL
    WHERE
       (1.0 * id_count)/total_id > 0.1
)
SELECT
    JSON_ARRAYAGG(`key`) AS feature_ids
FROM
    RESULTS{code}
The parallelism adaptively set by Flink for the following operator was always 1.
{code:java}
[37]:HashAggregate(isMerge=[true], groupBy=[key, insertion_id], select=[key, 
insertion_id])
+- [38]:LocalHashAggregate(groupBy=[key], select=[key, 
Partial_COUNT(insertion_id) AS count$0]){code}
If we turn off `execution.batch.adaptive.auto-parallelism.enabled` and manually 
set `parallelism.default` to a value greater than one, the operator runs with 
the expected parallelism.

The screenshot of the full job graph is attached. !image_720.png!




