[jira] [Updated] (FLINK-35232) Support for retry settings on GCS connector

2024-04-29 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-35232:

Description: 
https://issues.apache.org/jira/browse/FLINK-32877 tracks the ability to specify 
transport options in the GCS connector. While setting the parameters enabled 
there reduced read timeouts, we still see 503 errors leading to Flink job 
restarts.

Thus, in this ticket, we want to specify additional retry settings as noted in 
[https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]

We need 
[these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
 methods to be available to Flink users so that they can customize their deployment. 
In particular, the following settings seem to be the minimum required to align the 
GCS timeouts with the job's checkpoint configuration:
 * 
[maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
 * 
[initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
 * 
[rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
 * 
[maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
 * 
[totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]

 

Basically, the proposal is to be able to tune the timeouts via the multiplier, 
maxAttempts and totalTimeout mechanisms.

All of the config options should be optional, and the defaults should be used 
when some of the configs are not provided.
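
For illustration, below is a minimal sketch of how these five settings map onto 
the gax RetrySettings builder (values are placeholders, the exact wiring inside 
the Flink GCS filesystem plugin may differ, and depending on the gax version the 
Duration type is org.threeten.bp.Duration or java.time.Duration):

```
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.storage.StorageOptions;
import org.threeten.bp.Duration; // java.time.Duration in newer gax versions

public final class GcsRetrySettingsSketch {
    public static StorageOptions storageOptionsWithRetries() {
        RetrySettings retrySettings =
            RetrySettings.newBuilder()
                .setMaxAttempts(8)                            // maxAttempts
                .setInitialRpcTimeout(Duration.ofSeconds(30)) // initialRpcTimeout
                .setRpcTimeoutMultiplier(1.5)                 // rpcTimeoutMultiplier
                .setMaxRpcTimeout(Duration.ofMinutes(2))      // maxRpcTimeout
                .setTotalTimeout(Duration.ofMinutes(10))      // totalTimeout
                .build();
        // Retry settings are applied when building the Storage client.
        return StorageOptions.newBuilder().setRetrySettings(retrySettings).build();
    }
}
```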

  was:
https://issues.apache.org/jira/browse/FLINK-32877 tracks the ability to specify 
transport options in the GCS connector. While setting the parameters enabled 
there reduced read timeouts, we still see 503 errors leading to Flink job 
restarts.

Thus, in this ticket, we want to specify additional retry settings as noted in 
[https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]

We need 
[these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
 methods to be available to Flink users so that they can customize their deployment. 
In particular, the following settings seem to be the minimum required to align the 
GCS timeouts with the job's checkpoint configuration:
 * 
[maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
 * 
[initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
 * 
[rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
 * 
[maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
 * 
[totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]

 

Basically, the proposal is to be able to tune the timeout via various mechanisms.

All of the config options should be optional, and the defaults should be used 
when some of the configs are not provided.


> Support for retry settings on GCS connector
> ---
>
> Key: FLINK-35232
> URL: https://issues.apache.org/jira/browse/FLINK-35232
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.3, 1.16.2, 1.17.1, 1.19.0, 1.18.1
>Reporter: Vikas M
>Assignee: Ravi Singh
>Priority: Major
>
> https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
> specify transport options in GCS connector. While setting the params enabled 
> here reduced read timeouts, we still see 503 errors leading to Flink job 
> restarts.
> Thus, in this ticket, we want to specify additional retry settings as noted 
> in 
> 

[jira] [Updated] (FLINK-35232) Support for retry settings on GCS connector

2024-04-29 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-35232:

Description: 
https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
specify transport options in GCS connector. While setting the params enabled 
here reduced read timeouts, we still see 503 errors leading to Flink job 
restarts.

Thus, in this ticket, we want to specify additional retry settings as noted in 
[https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]

We need 
[these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
 methods available for Flink users so that they can customize their deployment. 
In particular next settings seems to be the minimum required to adjust GCS 
timeout with Job's checkpoint config:
 * 
[maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
 * 
[initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
 * 
[rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
 * 
[maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
 * 
[totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]

 

Basically the proposal is to be able to tune the timeout via various mechanism.

All of the config options should be optional and the default one should be used 
in case some of configs are not provided.

  was:
https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
specify transport options in GCS connector. While setting the params enabled 
here reduced read timeouts, we still see 503 errors leading to Flink job 
restarts.

Thus, in this ticket, we want to specify additional retry settings as noted in 
[https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]

We need 
[these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
 methods available for Flink users so that they can customize their deployment. 
In particular next settings seems to be the minimum required to adjust GCS 
timeout with Job's checkpoint config:
 * 
[maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
 * 
[initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
 * 
[rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
 * 
[maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
 * 
[totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]

 

All of the config options should be optional and the default one should be used 
in case some of configs are not provided.


> Support for retry settings on GCS connector
> ---
>
> Key: FLINK-35232
> URL: https://issues.apache.org/jira/browse/FLINK-35232
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.3, 1.16.2, 1.17.1, 1.19.0, 1.18.1
>Reporter: Vikas M
>Assignee: Ravi Singh
>Priority: Major
>
> https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
> specify transport options in GCS connector. While setting the params enabled 
> here reduced read timeouts, we still see 503 errors leading to Flink job 
> restarts.
> Thus, in this ticket, we want to specify additional retry settings as noted 
> in 
> [https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]
> We need 
> 

[jira] [Commented] (FLINK-35232) Support for retry settings on GCS connector

2024-04-26 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841267#comment-17841267
 ] 

Oleksandr Nitavskyi commented on FLINK-35232:
-

[~galenwarren] thanks. We have reduced the number of methods to the bare 
minimum. This is reflected in the description: 
 * 
[maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
 * 
[initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
 * 
[rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
 * 
[maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
 * 
[totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]

Thus Flink users will be able to align the total retry timeout with the checkpoint 
timeout, so the job does its best before it gives up on committing the data.
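
As an illustration of the intended user experience (the option keys below are 
hypothetical placeholders, not existing Flink options; the final key names and 
formats would be decided by the implementation), aligning the retry budget with 
a 10-minute checkpoint timeout could look roughly like this:

```
import org.apache.flink.configuration.Configuration;

final class GcsRetryConfigSketch {
    // Hypothetical option keys, shown only to illustrate matching the GCS retry
    // budget to execution.checkpointing.timeout (e.g. 10 min).
    static Configuration gcsRetryOptions() {
        Configuration conf = new Configuration();
        conf.setString("gs.retry.max-attempts", "8");
        conf.setString("gs.retry.init-rpc-timeout", "30 s");
        conf.setString("gs.retry.rpc-timeout-multiplier", "1.5");
        conf.setString("gs.retry.max-rpc-timeout", "2 min");
        conf.setString("gs.retry.total-timeout", "10 min");
        return conf;
    }
}
```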

> Support for retry settings on GCS connector
> ---
>
> Key: FLINK-35232
> URL: https://issues.apache.org/jira/browse/FLINK-35232
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.3, 1.16.2, 1.17.1, 1.19.0, 1.18.1
>Reporter: Vikas M
>Assignee: Ravi Singh
>Priority: Major
>
> https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
> specify transport options in GCS connector. While setting the params enabled 
> here reduced read timeouts, we still see 503 errors leading to Flink job 
> restarts.
> Thus, in this ticket, we want to specify additional retry settings as noted 
> in 
> [https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]
> We need 
> [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
>  methods available for Flink users so that they can customize their 
> deployment. In particular next settings seems to be the minimum required to 
> adjust GCS timeout with Job's checkpoint config:
>  * 
> [maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
>  * 
> [initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
>  * 
> [rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
>  * 
> [maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
>  * 
> [totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]
>  
> All of the config options should be optional and the default one should be 
> used in case some of configs are not provided.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35232) Support for retry settings on GCS connector

2024-04-26 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-35232:

Description: 
https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
specify transport options in GCS connector. While setting the params enabled 
here reduced read timeouts, we still see 503 errors leading to Flink job 
restarts.

Thus, in this ticket, we want to specify additional retry settings as noted in 
[https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]

We need 
[these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
 methods available for Flink users so that they can customize their deployment. 
In particular next settings seems to be the minimum required to adjust GCS 
timeout with Job's checkpoint config:
 * 
[maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
 * 
[initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
 * 
[rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
 * 
[maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
 * 
[totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]

 

All of the config options should be optional and the default one should be used 
in case some of configs are not provided.

  was:
https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
specify transport options in GCS connector. While setting the params enabled 
here reduced read timeouts, we still see 503 errors leading to Flink job 
restarts.

Thus, in this ticket, we want to specify additional retry settings as noted in 
[https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]

We need 
[these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
 methods available for Flink users so that they can customize their deployment. 
In particular next settings seems to be the minimum required to adjust GCS 
timeout with Job's checkpoint config:
 * maxAttempts
 * initialRpcTimeout
 * rpcTimeoutMultiplier
 * maxRpcTimeout
 * totalTimeout

 

All of the config options should be optional and the default one should be used 
in case some of configs are not provided.


> Support for retry settings on GCS connector
> ---
>
> Key: FLINK-35232
> URL: https://issues.apache.org/jira/browse/FLINK-35232
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.3, 1.16.2, 1.17.1, 1.19.0, 1.18.1
>Reporter: Vikas M
>Assignee: Ravi Singh
>Priority: Major
>
> https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
> specify transport options in GCS connector. While setting the params enabled 
> here reduced read timeouts, we still see 503 errors leading to Flink job 
> restarts.
> Thus, in this ticket, we want to specify additional retry settings as noted 
> in 
> [https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]
> We need 
> [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
>  methods available for Flink users so that they can customize their 
> deployment. In particular next settings seems to be the minimum required to 
> adjust GCS timeout with Job's checkpoint config:
>  * 
> [maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
>  * 
> [initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
>  * 
> [rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
>  * 
> 

[jira] [Updated] (FLINK-35232) Support for retry settings on GCS connector

2024-04-26 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-35232:

Description: 
https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
specify transport options in GCS connector. While setting the params enabled 
here reduced read timeouts, we still see 503 errors leading to Flink job 
restarts.

Thus, in this ticket, we want to specify additional retry settings as noted in 
[https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]

We need 
[these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
 methods available for Flink users so that they can customize their deployment. 
In particular next settings seems to be the minimum required to adjust GCS 
timeout with Job's checkpoint config:
 * maxAttempts
 * initialRpcTimeout
 * rpcTimeoutMultiplier
 * maxRpcTimeout
 * totalTimeout

 

All of the config options should be optional and the default one should be used 
in case some of configs are not provided.

  was:
https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
specify transport options in GCS connector. While setting the params enabled 
here reduced read timeouts, we still see 503 errors leading to Flink job 
restarts.

Thus, in this ticket, we want to specify additional retry settings as noted in 
[https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]

We need 
[these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
 methods available for Flink users so that they can customize their deployment. 
In particular next settings seems to be the minimum required to adjust GCS 
timeout with Job's checkpoint config:

```

maxAttempts

initialRpcTimeout

rpcTimeoutMultiplier

maxRpcTimeout

totalTimeout

```

All of the config options should be optional and the default one should be used 
in case some of configs are not provided.


> Support for retry settings on GCS connector
> ---
>
> Key: FLINK-35232
> URL: https://issues.apache.org/jira/browse/FLINK-35232
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.3, 1.16.2, 1.17.1, 1.19.0, 1.18.1
>Reporter: Vikas M
>Assignee: Ravi Singh
>Priority: Major
>
> https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
> specify transport options in GCS connector. While setting the params enabled 
> here reduced read timeouts, we still see 503 errors leading to Flink job 
> restarts.
> Thus, in this ticket, we want to specify additional retry settings as noted 
> in 
> [https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]
> We need 
> [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
>  methods available for Flink users so that they can customize their 
> deployment. In particular next settings seems to be the minimum required to 
> adjust GCS timeout with Job's checkpoint config:
>  * maxAttempts
>  * initialRpcTimeout
>  * rpcTimeoutMultiplier
>  * maxRpcTimeout
>  * totalTimeout
>  
> All of the config options should be optional and the default one should be 
> used in case some of configs are not provided.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35232) Support for retry settings on GCS connector

2024-04-26 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-35232:

Description: 
https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
specify transport options in GCS connector. While setting the params enabled 
here reduced read timeouts, we still see 503 errors leading to Flink job 
restarts.

Thus, in this ticket, we want to specify additional retry settings as noted in 
[https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]

We need 
[these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
 methods available for Flink users so that they can customize their deployment. 
In particular next settings seems to be the minimum required to adjust GCS 
timeout with Job's checkpoint config:

```

maxAttempts

initialRpcTimeout

rpcTimeoutMultiplier

maxRpcTimeout

totalTimeout

```

All of the config options should be optional and the default one should be used 
in case some of configs are not provided.

  was:
https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
specify transport options in GCS connector. While setting the params enabled 
here reduced read timeouts, we still see 503 errors leading to Flink job 
restarts.

Thus, in this ticket, we want to specify additional retry settings as noted in 
[https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]

We want 
[these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
 methods available for Flink users so that they can customize their deployment.


> Support for retry settings on GCS connector
> ---
>
> Key: FLINK-35232
> URL: https://issues.apache.org/jira/browse/FLINK-35232
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.3, 1.16.2, 1.17.1, 1.19.0, 1.18.1
>Reporter: Vikas M
>Assignee: Ravi Singh
>Priority: Major
>
> https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
> specify transport options in GCS connector. While setting the params enabled 
> here reduced read timeouts, we still see 503 errors leading to Flink job 
> restarts.
> Thus, in this ticket, we want to specify additional retry settings as noted 
> in 
> [https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]
> We need 
> [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
>  methods available for Flink users so that they can customize their 
> deployment. In particular next settings seems to be the minimum required to 
> adjust GCS timeout with Job's checkpoint config:
> ```
> maxAttempts
> initialRpcTimeout
> rpcTimeoutMultiplier
> maxRpcTimeout
> totalTimeout
> ```
> All of the config options should be optional and the default one should be 
> used in case some of configs are not provided.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35210) Give the option to set automatically the parallelism of the KafkaSource to the number of kafka partitions

2024-04-22 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839739#comment-17839739
 ] 

Oleksandr Nitavskyi commented on FLINK-35210:
-

Thanks [~npfp] for the suggestion. I believe what you propose is often resolved 
with some wrapper around KafkaSource, which can serve as a layer of indirection 
for many things, e.g. parallelism configuration.

Meanwhile, could you please elaborate on how bad parallelism could lead to idle 
tasks? Do you mean the case where the Source parallelism is higher than the 
number of partitions, so that some Source subtasks consume nothing and there is 
no watermark advancement unless 
[Idleness|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources]
 is configured?
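
For reference, a small sketch of the kind of wrapper mentioned above (the topic, 
bootstrap servers and the surrounding job are placeholders; this is not an 
existing Flink API): the partition count is read via the Kafka consumer API and 
then applied as the parallelism of the source stream.

```
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

final class KafkaPartitionCount {
    // Number of partitions of a topic; a KafkaSource wrapper could use this
    // value as the parallelism of the source stream.
    static int of(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            return consumer.partitionsFor(topic).size();
        }
    }
}

// Usage sketch inside the job:
// env.fromSource(kafkaSource, watermarkStrategy, "kafka")
//    .setParallelism(KafkaPartitionCount.of(bootstrapServers, topic));
```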

> Give the option to set automatically the parallelism of the KafkaSource to 
> the number of kafka partitions
> -
>
> Key: FLINK-35210
> URL: https://issues.apache.org/jira/browse/FLINK-35210
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Nicolas Perrin
>Priority: Minor
>
> Currently the setting of the `KafkaSource` Flink's operator parallelism needs 
> to be manually chosen which can leads to highly skewed tasks if the developer 
> doesn't do this job.
> To avoid this issue, I propose to:
> -  retrieve dynamically the number of partitions of the topic using 
> `KafkaConsumer.
> partitionsFor(topic).size()`,
> - set the parallelism of the stream built from the source based on this value.
>  This way there won't be any idle tasks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33376) Extend Curator config option for Zookeeper configuration

2024-03-25 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-33376:

Component/s: Runtime / Coordination

> Extend Curator config option for Zookeeper configuration
> 
>
> Key: FLINK-33376
> URL: https://issues.apache.org/jira/browse/FLINK-33376
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Oleksandr Nitavskyi
>Assignee: Oleksandr Nitavskyi
>Priority: Major
>
> In certain cases ZooKeeper requires additional Authentication information. 
> For example list of valid [names for 
> ensemble|https://zookeeper.apache.org/doc/r3.8.0/zookeeperAdmin.html#:~:text=for%20secure%20authentication.-,zookeeper.ensembleAuthName,-%3A%20(Java%20system%20property]
>  in order to prevent the accidental connecting to a wrong ensemble.
> Curator allows to add additional AuthInfo object for such configuration. Thus 
> it would be useful to add one more additional Map property which would allow 
> to pass AuthInfo objects during Curator client creation.
> *Acceptance Criteria:* For Flink users it is possible to configure auth info 
> list for Curator framework client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33376) Extend Curator config option for Zookeeper configuration

2023-12-12 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-33376:

Summary: Extend Curator config option for Zookeeper configuration  (was: 
Add AuthInfo config option for Zookeeper configuration)

> Extend Curator config option for Zookeeper configuration
> 
>
> Key: FLINK-33376
> URL: https://issues.apache.org/jira/browse/FLINK-33376
> Project: Flink
>  Issue Type: Improvement
>Reporter: Oleksandr Nitavskyi
>Assignee: Oleksandr Nitavskyi
>Priority: Major
>
> In certain cases ZooKeeper requires additional Authentication information. 
> For example list of valid [names for 
> ensemble|https://zookeeper.apache.org/doc/r3.8.0/zookeeperAdmin.html#:~:text=for%20secure%20authentication.-,zookeeper.ensembleAuthName,-%3A%20(Java%20system%20property]
>  in order to prevent the accidental connecting to a wrong ensemble.
> Curator allows to add additional AuthInfo object for such configuration. Thus 
> it would be useful to add one more additional Map property which would allow 
> to pass AuthInfo objects during Curator client creation.
> *Acceptance Criteria:* For Flink users it is possible to configure auth info 
> list for Curator framework client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33376) Add AuthInfo config option for Zookeeper configuration

2023-11-06 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17783220#comment-17783220
 ] 

Oleksandr Nitavskyi commented on FLINK-33376:
-

Thanks for the detailed listing. It sounds reasonable to add configuration to the 
public Flink interface for only these options:
* 
[authorization|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#authorization(java.lang.String,byte%5B%5D)]
* 
[maxCloseWaitMs|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#maxCloseWaitMs(int)]
* 
[simulatedSessionExpirationPercent|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#simulatedSessionExpirationPercent(int)]

It also makes sense not to touch the 
[compressionProvider|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#compressionProvider(org.apache.curator.framework.api.CompressionProvider)]
 since Flink doesn't store any BLOB data in ZooKeeper.

In general it makes sense to expose only connection-related configuration 
(auth/timeouts), since it depends on the Flink user's environment.

Once we have aligned on the set of options, and on the fact that we would 
basically add the 3 missing options, we can start the documentation process. Am I right?
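
For illustration, a rough sketch of how the three builder options above are 
applied on the Curator side (connect string, retry policy and concrete values 
are placeholders; how Flink would name and expose the corresponding options is 
out of scope here):

```
import java.nio.charset.StandardCharsets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

final class CuratorOptionsSketch {
    static CuratorFramework buildClient() {
        return CuratorFrameworkFactory.builder()
            .connectString("zk-1:2181,zk-2:2181")        // placeholder ensemble
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            // The three options discussed above:
            .authorization("digest", "user:password".getBytes(StandardCharsets.UTF_8))
            .maxCloseWaitMs(2000)
            .simulatedSessionExpirationPercent(100)
            .build();
    }
}
```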

> Add AuthInfo config option for Zookeeper configuration
> --
>
> Key: FLINK-33376
> URL: https://issues.apache.org/jira/browse/FLINK-33376
> Project: Flink
>  Issue Type: Improvement
>Reporter: Oleksandr Nitavskyi
>Assignee: Oleksandr Nitavskyi
>Priority: Major
>
> In certain cases ZooKeeper requires additional Authentication information. 
> For example list of valid [names for 
> ensemble|https://zookeeper.apache.org/doc/r3.8.0/zookeeperAdmin.html#:~:text=for%20secure%20authentication.-,zookeeper.ensembleAuthName,-%3A%20(Java%20system%20property]
>  in order to prevent the accidental connecting to a wrong ensemble.
> Curator allows to add additional AuthInfo object for such configuration. Thus 
> it would be useful to add one more additional Map property which would allow 
> to pass AuthInfo objects during Curator client creation.
> *Acceptance Criteria:* For Flink users it is possible to configure auth info 
> list for Curator framework client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33376) Add AuthInfo config option for Zookeeper configuration

2023-10-27 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780489#comment-17780489
 ] 

Oleksandr Nitavskyi commented on FLINK-33376:
-

It would be really good to be able to support something generic enough to 
translate Flink configuration into Curator config, e.g. like in [hadoop 
config|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#flink-hadoop-%3Ckey%3E].
 

But since Curator uses the Builder pattern, I do not see how we can make it 
generic enough. Probably, as a compromise, it would be sane to consider adding 
support for all missing Curator configurations. 
If we go this way, here is the list of configurations which Flink doesn't 
configure at all for now:
* 
[authorization|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#authorization(java.lang.String,byte%5B%5D)]
* 
[canBeReadOnly|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#canBeReadOnly(boolean)]
* 
[compressionProvider|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#compressionProvider(org.apache.curator.framework.api.CompressionProvider)]
* 
[defaultData|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#defaultData(byte%5B%5D)]
* 
[dontUseContainerParents|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#dontUseContainerParents()]/[useContainerParentsIfAvailable|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#useContainerParentsIfAvailable()]
* 
[maxCloseWaitMs|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#maxCloseWaitMs(int)]
* 
[namespace|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#namespace(java.lang.String)]
* 
[runSafeService|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#runSafeService(java.util.concurrent.Executor)]
* 
[schemaSet|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#schemaSet(org.apache.curator.framework.schema.SchemaSet)]
* 
[simulatedSessionExpirationPercent|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#simulatedSessionExpirationPercent(int)]
* 
[waitForShutdownTimeoutMs|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#waitForShutdownTimeoutMs(int)]



> Add AuthInfo config option for Zookeeper configuration
> --
>
> Key: FLINK-33376
> URL: https://issues.apache.org/jira/browse/FLINK-33376
> Project: Flink
>  Issue Type: Improvement
>Reporter: Oleksandr Nitavskyi
>Assignee: Oleksandr Nitavskyi
>Priority: Major
>
> In certain cases ZooKeeper requires additional Authentication information. 
> For example list of valid [names for 
> ensemble|https://zookeeper.apache.org/doc/r3.8.0/zookeeperAdmin.html#:~:text=for%20secure%20authentication.-,zookeeper.ensembleAuthName,-%3A%20(Java%20system%20property]
>  in order to prevent the accidental connecting to a wrong ensemble.
> Curator allows to add additional AuthInfo object for such configuration. Thus 
> it would be useful to add one more additional Map property which would allow 
> to pass AuthInfo objects during Curator client creation.
> *Acceptance Criteria:* For Flink users it is possible to configure auth info 
> list for Curator framework client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33376) Add AuthInfo config option for Zookeeper configuration

2023-10-27 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780387#comment-17780387
 ] 

Oleksandr Nitavskyi commented on FLINK-33376:
-

Thanks for the link. If we really require a FLIP, I think it would be nice to 
anticipate a few more parameters for the Curator framework.

> Add AuthInfo config option for Zookeeper configuration
> --
>
> Key: FLINK-33376
> URL: https://issues.apache.org/jira/browse/FLINK-33376
> Project: Flink
>  Issue Type: Improvement
>Reporter: Oleksandr Nitavskyi
>Assignee: Oleksandr Nitavskyi
>Priority: Major
>
> In certain cases ZooKeeper requires additional Authentication information. 
> For example list of valid [names for 
> ensemble|https://zookeeper.apache.org/doc/r3.8.0/zookeeperAdmin.html#:~:text=for%20secure%20authentication.-,zookeeper.ensembleAuthName,-%3A%20(Java%20system%20property]
>  in order to prevent the accidental connecting to a wrong ensemble.
> Curator allows to add additional AuthInfo object for such configuration. Thus 
> it would be useful to add one more additional Map property which would allow 
> to pass AuthInfo objects during Curator client creation.
> *Acceptance Criteria:* For Flink users it is possible to configure auth info 
> list for Curator framework client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33376) Add AuthInfo config option for Zookeeper configuration

2023-10-27 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780387#comment-17780387
 ] 

Oleksandr Nitavskyi edited comment on FLINK-33376 at 10/27/23 1:56 PM:
---

Thanks for the link. If we really require a FLIP, I think it would be nice to 
anticipate a somewhat more generic way to configure the Curator framework.


was (Author: oleksandr nitavskyi):
Thanks for the link. If we really require a Flip, I think it would be nice to 
anticipate a bit more parameters for Curator framework.

> Add AuthInfo config option for Zookeeper configuration
> --
>
> Key: FLINK-33376
> URL: https://issues.apache.org/jira/browse/FLINK-33376
> Project: Flink
>  Issue Type: Improvement
>Reporter: Oleksandr Nitavskyi
>Assignee: Oleksandr Nitavskyi
>Priority: Major
>
> In certain cases ZooKeeper requires additional Authentication information. 
> For example list of valid [names for 
> ensemble|https://zookeeper.apache.org/doc/r3.8.0/zookeeperAdmin.html#:~:text=for%20secure%20authentication.-,zookeeper.ensembleAuthName,-%3A%20(Java%20system%20property]
>  in order to prevent the accidental connecting to a wrong ensemble.
> Curator allows to add additional AuthInfo object for such configuration. Thus 
> it would be useful to add one more additional Map property which would allow 
> to pass AuthInfo objects during Curator client creation.
> *Acceptance Criteria:* For Flink users it is possible to configure auth info 
> list for Curator framework client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33376) Add AuthInfo config option for Zookeeper configuration

2023-10-27 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780373#comment-17780373
 ] 

Oleksandr Nitavskyi commented on FLINK-33376:
-

[~mapohl] sure, would be glad to drive this! 

Sorry, I didn't know that we needed to start a dev list discussion (we didn't do 
it for several config options in the past). I will drop an email to ensure 
visibility with the community.

> Add AuthInfo config option for Zookeeper configuration
> --
>
> Key: FLINK-33376
> URL: https://issues.apache.org/jira/browse/FLINK-33376
> Project: Flink
>  Issue Type: Improvement
>Reporter: Oleksandr Nitavskyi
>Priority: Major
>
> In certain cases ZooKeeper requires additional Authentication information. 
> For example list of valid [names for 
> ensemble|https://zookeeper.apache.org/doc/r3.8.0/zookeeperAdmin.html#:~:text=for%20secure%20authentication.-,zookeeper.ensembleAuthName,-%3A%20(Java%20system%20property]
>  in order to prevent the accidental connecting to a wrong ensemble.
> Curator allows to add additional AuthInfo object for such configuration. Thus 
> it would be useful to add one more additional Map property which would allow 
> to pass AuthInfo objects during Curator client creation.
> *Acceptance Criteria:* For Flink users it is possible to configure auth info 
> list for Curator framework client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33376) Add AuthInfo config option for Zookeeper configuration

2023-10-26 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779947#comment-17779947
 ] 

Oleksandr Nitavskyi commented on FLINK-33376:
-

For the implementation we could add an additional Map config option, and Flink 
users will be able to pass AuthInfo.
There is some misalignment: AuthInfo expects a byte[] auth payload, while the 
Map values are Strings.
As the simplest workaround we can accept Strings on the Flink config interface 
and use the _getBytes()_ method in order to adapt the interfaces.
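
A minimal sketch of that adaptation, under the assumption that the Flink option 
is exposed as a Map whose keys are auth schemes (e.g. "digest") and whose values 
are the credential strings:

```
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.curator.framework.AuthInfo;

final class AuthInfoAdapter {
    // Converts e.g. {"digest" -> "user:password"} into Curator AuthInfo entries,
    // turning the String credential into the byte[] payload AuthInfo expects.
    static List<AuthInfo> fromConfig(Map<String, String> authMap) {
        return authMap.entrySet().stream()
            .map(e -> new AuthInfo(e.getKey(), e.getValue().getBytes(StandardCharsets.UTF_8)))
            .collect(Collectors.toList());
    }
}

// Usage sketch:
// CuratorFrameworkFactory.builder().authorization(AuthInfoAdapter.fromConfig(map))...
```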

> Add AuthInfo config option for Zookeeper configuration
> --
>
> Key: FLINK-33376
> URL: https://issues.apache.org/jira/browse/FLINK-33376
> Project: Flink
>  Issue Type: Improvement
>Reporter: Oleksandr Nitavskyi
>Priority: Major
>
> In certain cases ZooKeeper requires additional Authentication information. 
> For example list of valid [names for 
> ensemble|https://zookeeper.apache.org/doc/r3.8.0/zookeeperAdmin.html#:~:text=for%20secure%20authentication.-,zookeeper.ensembleAuthName,-%3A%20(Java%20system%20property]
>  in order to prevent the accidental connecting to a wrong ensemble.
> Curator allows to add additional AuthInfo object for such configuration. Thus 
> it would be useful to add one more additional Map property which would allow 
> to pass AuthInfo objects during Curator client creation.
> *Acceptance Criteria:* For Flink users it is possible to configure auth info 
> list for Curator framework client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33376) Add AuthInfo config option for Zookeeper configuration

2023-10-26 Thread Oleksandr Nitavskyi (Jira)
Oleksandr Nitavskyi created FLINK-33376:
---

 Summary: Add AuthInfo config option for Zookeeper configuration
 Key: FLINK-33376
 URL: https://issues.apache.org/jira/browse/FLINK-33376
 Project: Flink
  Issue Type: Improvement
Reporter: Oleksandr Nitavskyi


In certain cases ZooKeeper requires additional authentication information. For 
example, a list of valid [names for 
ensemble|https://zookeeper.apache.org/doc/r3.8.0/zookeeperAdmin.html#:~:text=for%20secure%20authentication.-,zookeeper.ensembleAuthName,-%3A%20(Java%20system%20property]
 in order to prevent accidentally connecting to a wrong ensemble.

Curator allows adding additional AuthInfo objects for such configuration. Thus 
it would be useful to add one more Map property which would allow passing 
AuthInfo objects during Curator client creation.

*Acceptance Criteria:* For Flink users it is possible to configure auth info 
list for Curator framework client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration

2023-05-26 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-32203:

Priority: Minor  (was: Major)

> Potential ClassLoader memory leak due to log4j configuration
> 
>
> Key: FLINK-32203
> URL: https://issues.apache.org/jira/browse/FLINK-32203
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Minor
>  Labels: pull-request-available
> Attachments: classloader_leak.png, 
> stack_trace_example_with_log4j_creation_on_job_reload.log
>
>
> *Context*
> We have encountered a memory leak related to ClassLoaders in Apache Flink. 
> ChildFirstClassLoader is not properly garbage collected, when job is being 
> restarted.
> Heap Dump has shown that Log4j starts a configuration watch thread, which 
> then has Strong reference to ChildFirstClassLoader via AccessControlContext. 
> Since thread is never stopped, ChildFirstClassLoader is never cleaned. 
> Removal monitorInterval introduced in FLINK-20510 helps to mitigate the 
> issue, I believe it could be applied to log4j config by default.
> *How to reproduce*
> Deploy Flink job, which uses Hadoop File System (e.g. s3a). Redeploy the job 
> -> in Task Manager dump you should see multiple Log4jThreads
> *AC*
> We have a configuration which doesn't lead easy to memory leak with default 
> configuration for Flink users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration

2023-05-26 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726626#comment-17726626
 ] 

Oleksandr Nitavskyi commented on FLINK-32203:
-

[~chesnay] thanks for looking into the PR 
(https://github.com/apache/flink/pull/22664). You can see in the attachment an 
example of the stack trace which we get when the Log4jThread is being created.

We ran the job and killed one of the JobManagers to exercise HA and trigger the 
job restart.
While debugging the Log4jThread creation we saw that the stack trace contains the 
Presto (for checkpoints) or Hadoop S3A (to write output to S3) FileSystems, which 
are loaded from the plugin classloader (an example stack trace is attached).

Do you know if a plugin classloader instance is created per job, when a job is 
being created? If yes, this instance is probably being passed to 
Log4jContextFactory and thus a new Log4j subsystem is being created.

> Potential ClassLoader memory leak due to log4j configuration
> 
>
> Key: FLINK-32203
> URL: https://issues.apache.org/jira/browse/FLINK-32203
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
>  Labels: pull-request-available
> Attachments: classloader_leak.png, 
> stack_trace_example_with_log4j_creation_on_job_reload.log
>
>
> *Context*
> We have encountered a memory leak related to ClassLoaders in Apache Flink. 
> ChildFirstClassLoader is not properly garbage collected, when job is being 
> restarted.
> Heap Dump has shown that Log4j starts a configuration watch thread, which 
> then has Strong reference to ChildFirstClassLoader via AccessControlContext. 
> Since thread is never stopped, ChildFirstClassLoader is never cleaned. 
> Removal monitorInterval introduced in FLINK-20510 helps to mitigate the 
> issue, I believe it could be applied to log4j config by default.
> *How to reproduce*
> Deploy Flink job, which uses Hadoop File System (e.g. s3a). Redeploy the job 
> -> in Task Manager dump you should see multiple Log4jThreads
> *AC*
> We have a configuration which doesn't lead easy to memory leak with default 
> configuration for Flink users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration

2023-05-26 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-32203:

Description: 
*Context*

We have encountered a memory leak related to ClassLoaders in Apache Flink. 
ChildFirstClassLoader is not properly garbage collected when a job is restarted.
A heap dump has shown that Log4j starts a configuration watch thread, which then 
holds a strong reference to ChildFirstClassLoader via AccessControlContext. Since 
the thread is never stopped, ChildFirstClassLoader is never cleaned up.

Removing the monitorInterval (introduced in FLINK-20510) helps to mitigate the 
issue; I believe it could be applied to the log4j config by default.

*How to reproduce*
Deploy Flink job, which uses Hadoop File System (e.g. s3a). Redeploy the job -> 
in Task Manager dump you should see multiple 

*AC*
We have a configuration which doesn't easily lead to a memory leak with the 
default configuration for Flink users.

  was:
*Context*

We have encountered a memory leak related to ClassLoaders in Apache Flink. 
ChildFirstClassLoader is not properly garbage collected, when job is being 
restarted.
Heap Dump has shown that Log4j starts a configuration watch thread, which then 
has Strong reference to ChildFirstClassLoader via AccessControlContext. Since 
thread is never stopped, ChildFirstClassLoader is never cleaned. 

Removal monitorInterval introduced in FLINK-20510 helps to mitigate the issue, 
I believe it could be applied to log4j config by default.

*AC*
We have a configuration which doesn't lead easy to memory leak with default 
configuration for Flink users.


> Potential ClassLoader memory leak due to log4j configuration
> 
>
> Key: FLINK-32203
> URL: https://issues.apache.org/jira/browse/FLINK-32203
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
> Attachments: classloader_leak.png, 
> stack_trace_example_with_log4j_creation_on_job_reload.log
>
>
> *Context*
> We have encountered a memory leak related to ClassLoaders in Apache Flink. 
> ChildFirstClassLoader is not properly garbage collected, when job is being 
> restarted.
> Heap Dump has shown that Log4j starts a configuration watch thread, which 
> then has Strong reference to ChildFirstClassLoader via AccessControlContext. 
> Since thread is never stopped, ChildFirstClassLoader is never cleaned. 
> Removal monitorInterval introduced in FLINK-20510 helps to mitigate the 
> issue, I believe it could be applied to log4j config by default.
> *How to reproduce*
> Deploy Flink job, which uses Hadoop File System (e.g. s3a). Redeploy the job 
> -> in Task Manager dump you should see multiple 
> *AC*
> We have a configuration which doesn't lead easy to memory leak with default 
> configuration for Flink users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration

2023-05-26 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-32203:

Description: 
*Context*

We have encountered a memory leak related to ClassLoaders in Apache Flink. 
ChildFirstClassLoader is not properly garbage collected, when job is being 
restarted.
Heap Dump has shown that Log4j starts a configuration watch thread, which then 
has Strong reference to ChildFirstClassLoader via AccessControlContext. Since 
thread is never stopped, ChildFirstClassLoader is never cleaned. 

Removal monitorInterval introduced in FLINK-20510 helps to mitigate the issue, 
I believe it could be applied to log4j config by default.

*How to reproduce*
Deploy Flink job, which uses Hadoop File System (e.g. s3a). Redeploy the job -> 
in Task Manager dump you should see multiple Log4jThreads

*AC*
We have a configuration which doesn't lead easy to memory leak with default 
configuration for Flink users.

  was:
*Context*

We have encountered a memory leak related to ClassLoaders in Apache Flink. 
ChildFirstClassLoader is not properly garbage collected, when job is being 
restarted.
Heap Dump has shown that Log4j starts a configuration watch thread, which then 
has Strong reference to ChildFirstClassLoader via AccessControlContext. Since 
thread is never stopped, ChildFirstClassLoader is never cleaned. 

Removal monitorInterval introduced in FLINK-20510 helps to mitigate the issue, 
I believe it could be applied to log4j config by default.

*How to reproduce*
Deploy Flink job, which uses Hadoop File System (e.g. s3a). Redeploy the job -> 
in Task Manager dump you should see multiple 

*AC*
We have a configuration which doesn't lead easy to memory leak with default 
configuration for Flink users.


> Potential ClassLoader memory leak due to log4j configuration
> 
>
> Key: FLINK-32203
> URL: https://issues.apache.org/jira/browse/FLINK-32203
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
> Attachments: classloader_leak.png, 
> stack_trace_example_with_log4j_creation_on_job_reload.log
>
>
> *Context*
> We have encountered a memory leak related to ClassLoaders in Apache Flink. 
> ChildFirstClassLoader is not properly garbage collected, when job is being 
> restarted.
> Heap Dump has shown that Log4j starts a configuration watch thread, which 
> then has Strong reference to ChildFirstClassLoader via AccessControlContext. 
> Since thread is never stopped, ChildFirstClassLoader is never cleaned. 
> Removal monitorInterval introduced in FLINK-20510 helps to mitigate the 
> issue, I believe it could be applied to log4j config by default.
> *How to reproduce*
> Deploy Flink job, which uses Hadoop File System (e.g. s3a). Redeploy the job 
> -> in Task Manager dump you should see multiple Log4jThreads
> *AC*
> We have a configuration which doesn't lead easy to memory leak with default 
> configuration for Flink users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration

2023-05-26 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-32203:

Attachment: stack_trace_example_with_log4j_creation_on_job_reload.log

> Potential ClassLoader memory leak due to log4j configuration
> 
>
> Key: FLINK-32203
> URL: https://issues.apache.org/jira/browse/FLINK-32203
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
> Attachments: classloader_leak.png, 
> stack_trace_example_with_log4j_creation_on_job_reload.log
>
>
> *Context*
> We have encountered a memory leak related to ClassLoaders in Apache Flink. 
> ChildFirstClassLoader is not properly garbage collected, when job is being 
> restarted.
> Heap Dump has shown that Log4j starts a configuration watch thread, which 
> then has Strong reference to ChildFirstClassLoader via AccessControlContext. 
> Since thread is never stopped, ChildFirstClassLoader is never cleaned. 
> Removal monitorInterval introduced in FLINK-20510 helps to mitigate the 
> issue, I believe it could be applied to log4j config by default.
> *AC*
> We have a configuration which doesn't lead easy to memory leak with default 
> configuration for Flink users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration

2023-05-26 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-32203:

Attachment: classloader_leak.png

> Potential ClassLoader memory leak due to log4j configuration
> 
>
> Key: FLINK-32203
> URL: https://issues.apache.org/jira/browse/FLINK-32203
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
> Attachments: classloader_leak.png
>
>
> *Context*
> We have encountered a memory leak related to ClassLoaders in Apache Flink. 
> ChildFirstClassLoader is not properly garbage collected when a job is 
> restarted.
> A heap dump has shown that Log4j starts a configuration watch thread, which 
> then holds a strong reference to ChildFirstClassLoader via 
> AccessControlContext. Since the thread is never stopped, 
> ChildFirstClassLoader is never cleaned up.
> Removing monitorInterval, introduced in FLINK-20510, helps to mitigate the 
> issue; I believe it could be applied to the log4j config by default.
> *AC*
> With the default configuration, Flink users should not easily run into this 
> memory leak.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration

2023-05-26 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-32203:

Description: 
*Context*

We have encountered a memory leak related to ClassLoaders in Apache Flink. 
ChildFirstClassLoader is not properly garbage collected when a job is 
restarted.
A heap dump has shown that Log4j starts a configuration watch thread, which 
then holds a strong reference to ChildFirstClassLoader via 
AccessControlContext. Since the thread is never stopped, ChildFirstClassLoader 
is never cleaned up.

Removing monitorInterval, introduced in FLINK-20510, helps to mitigate the 
issue; I believe it could be applied to the log4j config by default.

*AC*
With the default configuration, Flink users should not easily run into this 
memory leak.
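
As an illustration, a minimal sketch of the relevant part of a Log4j 2 
properties file (a hypothetical excerpt, not Flink's shipped default): the 
configuration watch thread is only started while a monitorInterval is set, so 
removing or commenting it out avoids one leaked watcher thread per job 
ClassLoader.

{code}
# Hypothetical log4j.properties excerpt (sketch only, not the shipped Flink default).
# FLINK-20510 introduced a monitorInterval so log levels can be changed at runtime;
# while it is set, Log4j starts a file-watch thread that pins the ChildFirstClassLoader.
# monitorInterval=30    <- removing/commenting this line prevents the watch thread
rootLogger.level = INFO
rootLogger.appenderRef.main.ref = MainAppender
{code}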

> Potential ClassLoader memory leak due to log4j configuration
> 
>
> Key: FLINK-32203
> URL: https://issues.apache.org/jira/browse/FLINK-32203
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
> Attachments: classloader_leak.png
>
>
> *Context*
> We have encountered a memory leak related to ClassLoaders in Apache Flink. 
> ChildFirstClassLoader is not properly garbage collected when a job is 
> restarted.
> A heap dump has shown that Log4j starts a configuration watch thread, which 
> then holds a strong reference to ChildFirstClassLoader via 
> AccessControlContext. Since the thread is never stopped, 
> ChildFirstClassLoader is never cleaned up.
> Removing monitorInterval, introduced in FLINK-20510, helps to mitigate the 
> issue; I believe it could be applied to the log4j config by default.
> *AC*
> With the default configuration, Flink users should not easily run into this 
> memory leak.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration

2023-05-26 Thread Oleksandr Nitavskyi (Jira)
Oleksandr Nitavskyi created FLINK-32203:
---

 Summary: Potential ClassLoader memory leak due to log4j 
configuration
 Key: FLINK-32203
 URL: https://issues.apache.org/jira/browse/FLINK-32203
 Project: Flink
  Issue Type: Bug
Reporter: Oleksandr Nitavskyi






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31780) Allow users to disable "Ensemble tracking" for ZooKeeper

2023-04-26 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-31780:

Description: 
In Apache Curator an option to skip ensemble tracking was added in version 
5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]).

This can be useful in certain scenarios in which CuratorFramework accesses ZK 
clusters via a load balancer or Virtual IPs. 
Thus, in case a Flink user's ZooKeeper is running behind an LB or a Virtual 
IP, ensemble tracking could be disabled too.

In case ZooKeeper is hidden behind a VIP, it can return a URL during ensemble 
tracking which leads to an Unresolved Host Exception inside the Curator 
Framework. On the Flink level this leads to a cluster restart.

Currently, HA with ZooKeeper can even lead to a JobManager failure. The 
failure scenario is as follows:

# Flink connects to ZooKeeper via the configured URL.
# Ensemble tracking receives a new ensemble URL, which is not necessarily 
accessible to Flink, because ZooKeeper is behind a VIP.
# In case of a reconnect, Flink fails to reach ZooKeeper; moreover, due to 
"UnresolvedHostException" Flink's JobManager is killed.

*Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option to 
disable ensemble tracking for ZooKeeper.
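
For illustration, a minimal sketch of disabling ensemble tracking on the 
Curator builder (assuming Apache Curator 5.x, where CURATOR-568 added this 
flag; the actual wiring inside Flink's ZooKeeper HA services would differ):

{code:java}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class EnsembleTrackingSketch {
    public static CuratorFramework createClient(String connectString) {
        // Sketch only: keep using the configured (LB/VIP) address instead of
        // the server-reported ensemble URLs by disabling ensemble tracking.
        return CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .ensembleTracker(false)
                .build();
    }
}
{code}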

  was:
In Apache Curator an option to skip ensemble tracking was added in version 
5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]).

This can be useful in certain scenarios in which CuratorFramework accesses ZK 
clusters via a load balancer or Virtual IPs. 
Thus, in case a Flink user's ZooKeeper is running behind an LB or a Virtual 
IP, ensemble tracking could be disabled too.

In case ZooKeeper is hidden behind a VIP, it can return a URL during ensemble 
tracking which leads to an Unresolved Host Exception inside the Curator 
Framework. On the Flink level this leads to a cluster restart.

*Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option to 
disable ensemble tracking for ZooKeeper.


> Allow users to disable "Ensemble tracking" for ZooKeeper
> 
>
> Key: FLINK-31780
> URL: https://issues.apache.org/jira/browse/FLINK-31780
> Project: Flink
>  Issue Type: Improvement
>Reporter: Oleksandr Nitavskyi
>Priority: Major
>  Labels: pull-request-available
>
> In Apache Curator an option to skip ensemble tracking was added in version 
> 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]).
> This can be useful in certain scenarios in which CuratorFramework accesses 
> ZK clusters via a load balancer or Virtual IPs. 
> Thus, in case a Flink user's ZooKeeper is running behind an LB or a Virtual 
> IP, ensemble tracking could be disabled too.
> In case ZooKeeper is hidden behind a VIP, it can return a URL during 
> ensemble tracking which leads to an Unresolved Host Exception inside the 
> Curator Framework. On the Flink level this leads to a cluster restart.
> Currently, HA with ZooKeeper can even lead to a JobManager failure. The 
> failure scenario is as follows:
> # Flink connects to ZooKeeper via the configured URL.
> # Ensemble tracking receives a new ensemble URL, which is not necessarily 
> accessible to Flink, because ZooKeeper is behind a VIP.
> # In case of a reconnect, Flink fails to reach ZooKeeper; moreover, due to 
> "UnresolvedHostException" Flink's JobManager is killed.
> *Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option 
> to disable ensemble tracking for ZooKeeper.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31780) Allow users to disable "Ensemble tracking" for ZooKeeper

2023-04-18 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-31780:

Description: 
In Apache Curator an option to skip ensemble tracking was added in version 
5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]).

This can be useful in certain scenarios in which CuratorFramework accesses ZK 
clusters via a load balancer or Virtual IPs. 
Thus, in case a Flink user's ZooKeeper is running behind an LB or a Virtual 
IP, ensemble tracking could be disabled too.

In case ZooKeeper is hidden behind a VIP, it can return a URL during ensemble 
tracking which leads to an Unresolved Host Exception inside the Curator 
Framework. On the Flink level this leads to a cluster restart.

*Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option to 
disable ensemble tracking for ZooKeeper.

  was:
In Apache Curator an option to skip ensemble tracking was added in version 
5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]).

This can be useful in certain scenarios in which CuratorFramework accesses ZK 
clusters via a load balancer or Virtual IPs. 
Thus, in case a Flink user's ZooKeeper is running behind an LB or a Virtual 
IP, ensemble tracking could be disabled too.

*Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option to 
disable ensemble tracking for ZooKeeper.


> Allow users to disable "Ensemble tracking" for ZooKeeper
> 
>
> Key: FLINK-31780
> URL: https://issues.apache.org/jira/browse/FLINK-31780
> Project: Flink
>  Issue Type: Improvement
>Reporter: Oleksandr Nitavskyi
>Priority: Major
>  Labels: pull-request-available
>
> In Apache Curator an option to skip ensemble tracking was added in version 
> 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]).
> This can be useful in certain scenarios in which CuratorFramework accesses 
> ZK clusters via a load balancer or Virtual IPs. 
> Thus, in case a Flink user's ZooKeeper is running behind an LB or a Virtual 
> IP, ensemble tracking could be disabled too.
> In case ZooKeeper is hidden behind a VIP, it can return a URL during 
> ensemble tracking which leads to an Unresolved Host Exception inside the 
> Curator Framework. On the Flink level this leads to a cluster restart.
> *Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option 
> to disable ensemble tracking for ZooKeeper.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31780) Allow users to disable "Ensemble tracking" for ZooKeeper

2023-04-12 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-31780:

Description: 
In Apache Curator an option to skip ensemble tracking was added in version 
5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]).

This can be useful in certain scenarios in which CuratorFramework accesses ZK 
clusters via a load balancer or Virtual IPs. 
Thus, in case a Flink user's ZooKeeper is running behind an LB or a Virtual 
IP, ensemble tracking could be disabled too.

*Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option to 
disable ensemble tracking for ZooKeeper.

  was:
In Apache Curator an option to skip ensemble tracking was added in version 
5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]).

This can be useful in certain scenarios in which CuratorFramework accesses ZK 
clusters via a load balancer or Virtual IPs. 
Thus, in case a Flink user's ZooKeeper is running behind an LB or a Virtual 
IP, ensemble tracking could be disabled too.

Moreover, enabled ensemble tracking could lead to an NPE and thus a Flink 
cluster failure, even if only a single node is unavailable; see the fix in 
Apache Curator: https://github.com/apache/curator/pull/433.

*Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option to 
disable ensemble tracking for ZooKeeper.


> Allow users to disable "Ensemble tracking" for ZooKeeper
> 
>
> Key: FLINK-31780
> URL: https://issues.apache.org/jira/browse/FLINK-31780
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
>
> In Apache Curator an option to skip ensemble tracking was added in version 
> 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]).
> This can be useful in certain scenarios in which CuratorFramework accesses 
> ZK clusters via a load balancer or Virtual IPs. 
> Thus, in case a Flink user's ZooKeeper is running behind an LB or a Virtual 
> IP, ensemble tracking could be disabled too.
> *Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option 
> to disable ensemble tracking for ZooKeeper.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31780) Allow users to disable "Ensemble tracking" for ZooKeeper

2023-04-12 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-31780:

Description: 
In Apache Curator an option to skip ensemble tracking was added in version 
5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]).

This can be useful in certain scenarios in which CuratorFramework accesses ZK 
clusters via a load balancer or Virtual IPs. 
Thus, in case a Flink user's ZooKeeper is running behind an LB or a Virtual 
IP, ensemble tracking could be disabled too.

Moreover, enabled ensemble tracking could lead to an NPE and thus a Flink 
cluster failure, even if only a single node is unavailable; see the fix in 
Apache Curator: https://github.com/apache/curator/pull/433.

*Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option to 
disable ensemble tracking for ZooKeeper.

  was:
In Apache Curator an option to skip ensemble tracking was added in version 
5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]).

This can be useful in certain scenarios in which CuratorFramework accesses ZK 
clusters via a load balancer or Virtual IPs. 
Thus, in case a Flink user's ZooKeeper is running behind an LB or a Virtual 
IP, ensemble tracking could be disabled too.

*Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option to 
disable ensemble tracking for ZooKeeper.


> Allow users to disable "Ensemble tracking" for ZooKeeper
> 
>
> Key: FLINK-31780
> URL: https://issues.apache.org/jira/browse/FLINK-31780
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
>
> In Apache Curator an option to skip ensemble tracking was added in version 
> 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]).
> This can be useful in certain scenarios in which CuratorFramework accesses 
> ZK clusters via a load balancer or Virtual IPs. 
> Thus, in case a Flink user's ZooKeeper is running behind an LB or a Virtual 
> IP, ensemble tracking could be disabled too.
> Moreover, enabled ensemble tracking could lead to an NPE and thus a Flink 
> cluster failure, even if only a single node is unavailable; see the fix in 
> Apache Curator: https://github.com/apache/curator/pull/433.
> *Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option 
> to disable ensemble tracking for ZooKeeper.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31780) Allow users to enable Ensemble tracking for ZooKeeper

2023-04-12 Thread Oleksandr Nitavskyi (Jira)
Oleksandr Nitavskyi created FLINK-31780:
---

 Summary: Allow users to enable Ensemble tracking for ZooKeeper
 Key: FLINK-31780
 URL: https://issues.apache.org/jira/browse/FLINK-31780
 Project: Flink
  Issue Type: Bug
Reporter: Oleksandr Nitavskyi


In Apache Curator an option to skip ensemble tracking was added in version 
5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]).

This can be useful in certain scenarios in which CuratorFramework accesses ZK 
clusters via a load balancer or Virtual IPs. 
Thus, in case a Flink user's ZooKeeper is running behind an LB or a Virtual 
IP, ensemble tracking could be disabled too.

*Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option to 
disable ensemble tracking for ZooKeeper.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31780) Allow users to disable "Ensemble tracking" for ZooKeeper

2023-04-12 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-31780:

Summary: Allow users to disable "Ensemble tracking" for ZooKeeper  (was: 
Allow users to enable Ensemble tracking for ZooKeeper)

> Allow users to disable "Ensemble tracking" for ZooKeeper
> 
>
> Key: FLINK-31780
> URL: https://issues.apache.org/jira/browse/FLINK-31780
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
>
> In Apache Curator an option to skip ensemble tracking was added in version 
> 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]).
> This can be useful in certain scenarios in which CuratorFramework accesses 
> ZK clusters via a load balancer or Virtual IPs. 
> Thus, in case a Flink user's ZooKeeper is running behind an LB or a Virtual 
> IP, ensemble tracking could be disabled too.
> *Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option 
> to disable ensemble tracking for ZooKeeper.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-29242) Read time out when close write channel [flink-gs-fs-hadoop ]

2023-03-30 Thread Oleksandr Nitavskyi (Jira)


[ https://issues.apache.org/jira/browse/FLINK-29242 ]


Oleksandr Nitavskyi deleted comment on FLINK-29242:
-

was (Author: oleksandr nitavskyi):
While it seems that Flink could do something to be more resilient to this type 
of error, here is a link to a probably related issue in the Google Cloud 
tracker: https://issuetracker.google.com/issues/191071342?pli=1 

> Read time out when close write channel [flink-gs-fs-hadoop ]
> 
>
> Key: FLINK-29242
> URL: https://issues.apache.org/jira/browse/FLINK-29242
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Affects Versions: 1.15.0
> Environment: flink version: 1.15
> jdk: 1.8
>  
>Reporter: Jian Zheng
>Priority: Major
>
> h2. Detail
> See in GSBlobStorageImpl
> {code:java}
> @Override
> public int write(byte[] content, int start, int length) throws IOException {
> LOGGER.trace("Writing {} bytes to blob {}", length, blobIdentifier);
> Preconditions.checkNotNull(content);
> Preconditions.checkArgument(start >= 0);
> Preconditions.checkArgument(length >= 0);
> ByteBuffer byteBuffer = ByteBuffer.wrap(content, start, length);
> int written = writeChannel.write(byteBuffer);
> LOGGER.trace("Wrote {} bytes to blob {}", written, blobIdentifier);
> return written;
> }
> @Override
> public void close() throws IOException {
> LOGGER.trace("Closing write channel to blob {}", blobIdentifier);
> writeChannel.close();
> } {code}
> When I write data into Google Cloud Storage with flink-gs-fs-hadoop, the 
> service always throws read timeout exceptions, which can be reproduced within 
> a very short time of task execution. 
> I tried to trace the code and found that the timeout always occurs when the 
> writeChannel close code is executed. I tried retrying by modifying the source 
> code, but it didn't solve the problem; the timeout is 20s and the checkpoint 
> will fail if this problem occurs.
> I tried to change the chunk size but it didn't help; with this component I 
> can't write data to GCS via Flink.
>  
> By the way, I found that a 503 Service Unavailable occurs when creating the 
> writeChannel. This problem occurs less often than the read timeout, but it 
> needs to be checked.
> {code:java}
> @Override
> public GSBlobStorage.WriteChannel writeBlob(GSBlobIdentifier blobIdentifier) {
> LOGGER.trace("Creating writeable blob for identifier {}", blobIdentifier);
> Preconditions.checkNotNull(blobIdentifier);
> BlobInfo blobInfo = 
> BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
> com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
> return new WriteChannel(blobIdentifier, writeChannel);
> }
> @Override
> public GSBlobStorage.WriteChannel writeBlob(
> GSBlobIdentifier blobIdentifier, MemorySize chunkSize) {
> LOGGER.trace(
> "Creating writeable blob for identifier {} with chunk size {}",
> blobIdentifier,
> chunkSize);
> Preconditions.checkNotNull(blobIdentifier);
> Preconditions.checkArgument(chunkSize.getBytes() > 0);
> BlobInfo blobInfo = 
> BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
> com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
> writeChannel.setChunkSize((int) chunkSize.getBytes());
> return new WriteChannel(blobIdentifier, writeChannel);
> } {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29242) Read time out when close write channel [flink-gs-fs-hadoop ]

2023-03-30 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17706863#comment-17706863
 ] 

Oleksandr Nitavskyi commented on FLINK-29242:
-

While it seems that Flink could do something to be more resilient to this type 
of error, here is a link to a probably related issue in the Google Cloud 
tracker: https://issuetracker.google.com/issues/191071342?pli=1 
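
For what it is worth, a purely illustrative sketch (not what flink-gs-fs-hadoop 
actually does) of the kind of resilience hinted at above, wrapping a flaky 
channel close in a bounded retry; the attempt count and backoff are arbitrary 
assumptions:

{code:java}
import java.io.IOException;
import java.nio.channels.WritableByteChannel;

public final class RetryingCloseSketch {
    /** Sketch: retry a flaky close() a few times before giving up. */
    public static void closeWithRetry(WritableByteChannel channel, int maxAttempts)
            throws IOException {
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                channel.close();
                return;
            } catch (IOException e) {
                last = e; // e.g. a read timeout while finalizing the upload
                try {
                    Thread.sleep(1000L * attempt); // simple linear backoff
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while retrying close", ie);
                }
            }
        }
        if (last == null) {
            throw new IOException("maxAttempts must be positive");
        }
        throw last;
    }
}
{code}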

> Read time out when close write channel [flink-gs-fs-hadoop ]
> 
>
> Key: FLINK-29242
> URL: https://issues.apache.org/jira/browse/FLINK-29242
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Affects Versions: 1.15.0
> Environment: flink version: 1.15
> jdk: 1.8
>  
>Reporter: Jian Zheng
>Priority: Major
>
> h2. Detail
> See in GSBlobStorageImpl
> {code:java}
> @Override
> public int write(byte[] content, int start, int length) throws IOException {
> LOGGER.trace("Writing {} bytes to blob {}", length, blobIdentifier);
> Preconditions.checkNotNull(content);
> Preconditions.checkArgument(start >= 0);
> Preconditions.checkArgument(length >= 0);
> ByteBuffer byteBuffer = ByteBuffer.wrap(content, start, length);
> int written = writeChannel.write(byteBuffer);
> LOGGER.trace("Wrote {} bytes to blob {}", written, blobIdentifier);
> return written;
> }
> @Override
> public void close() throws IOException {
> LOGGER.trace("Closing write channel to blob {}", blobIdentifier);
> writeChannel.close();
> } {code}
> When I write data into Google Cloud Storage with flink-gs-fs-hadoop, the 
> service always throws read timeout exceptions, which can be reproduced within 
> a very short time of task execution. 
> I tried to trace the code and found that the timeout always occurs when the 
> writeChannel close code is executed. I tried retrying by modifying the source 
> code, but it didn't solve the problem; the timeout is 20s and the checkpoint 
> will fail if this problem occurs.
> I tried to change the chunk size but it didn't help; with this component I 
> can't write data to GCS via Flink.
>  
> By the way, I found that a 503 Service Unavailable occurs when creating the 
> writeChannel. This problem occurs less often than the read timeout, but it 
> needs to be checked.
> {code:java}
> @Override
> public GSBlobStorage.WriteChannel writeBlob(GSBlobIdentifier blobIdentifier) {
> LOGGER.trace("Creating writeable blob for identifier {}", blobIdentifier);
> Preconditions.checkNotNull(blobIdentifier);
> BlobInfo blobInfo = 
> BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
> com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
> return new WriteChannel(blobIdentifier, writeChannel);
> }
> @Override
> public GSBlobStorage.WriteChannel writeBlob(
> GSBlobIdentifier blobIdentifier, MemorySize chunkSize) {
> LOGGER.trace(
> "Creating writeable blob for identifier {} with chunk size {}",
> blobIdentifier,
> chunkSize);
> Preconditions.checkNotNull(blobIdentifier);
> Preconditions.checkArgument(chunkSize.getBytes() > 0);
> BlobInfo blobInfo = 
> BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
> com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
> writeChannel.setChunkSize((int) chunkSize.getBytes());
> return new WriteChannel(blobIdentifier, writeChannel);
> } {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-17505) Merge small files produced by StreamingFileSink

2021-11-15 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17443767#comment-17443767
 ] 

Oleksandr Nitavskyi commented on FLINK-17505:
-

Another workaround for this problem is writing data with Iceberg and 
consolidating it later with Iceberg's capabilities. It is quite easy to set up 
since Flink supports Iceberg output. Also, it seems to be an industry-proven 
setup.

Probably this ticket can be closed.

> Merge small files produced by StreamingFileSink
> ---
>
> Key: FLINK-17505
> URL: https://issues.apache.org/jira/browse/FLINK-17505
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.10.0
>Reporter: Piotr Nowojski
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> This an alternative approach to FLINK-11499, to solve a problem of creating 
> many small files with bulk formats in StreamingFileSink (which have to be 
> rolled on checkpoint).
> Merge based approach would require converting {{StreamingFileSink}} from a 
> sink, to an operator, that would be working exactly as it’s working right 
> now, with the same limitations (no support for arbitrary rolling policies for 
> bulk formats), followed by another operator that would be tasked with merging 
> small files in the background. 
> In the long term we probably would like to have both merge operator and write 
> ahead log solution (WAL described in FLINK-11499) as alternatives, as WAL 
> would behave better if small files are more common, and merge operator could 
> behave better if small files are rare (because of data skew for example).



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-17505) Merge small files produced by StreamingFileSink

2020-06-17 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17138437#comment-17138437
 ] 

Oleksandr Nitavskyi commented on FLINK-17505:
-

8. And, for sure, an open question is about the integration points with the 
existing code. Should it be totally separated into its own Flink operator, or 
do we want to keep this as a separate Bucket type, for example?

> Merge small files produced by StreamingFileSink
> ---
>
> Key: FLINK-17505
> URL: https://issues.apache.org/jira/browse/FLINK-17505
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.10.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> This an alternative approach to FLINK-11499, to solve a problem of creating 
> many small files with bulk formats in StreamingFileSink (which have to be 
> rolled on checkpoint).
> Merge based approach would require converting {{StreamingFileSink}} from a 
> sink, to an operator, that would be working exactly as it’s working right 
> now, with the same limitations (no support for arbitrary rolling policies for 
> bulk formats), followed by another operator that would be tasked with merging 
> small files in the background. 
> In the long term we probably would like to have both merge operator and write 
> ahead log solution (WAL described in FLINK-11499) as alternatives, as WAL 
> would behave better if small files are more common, and merge operator could 
> behave better if small files are rare (because of data skew for example).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-17505) Merge small files produced by StreamingFileSink

2020-06-17 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17136731#comment-17136731
 ] 

Oleksandr Nitavskyi edited comment on FLINK-17505 at 6/17/20, 10:02 AM:


MergeOperator (ConsolidationOperator) will not be able to replace files 
atomically (at least on HDFS), so some isolation can be violated. A possible 
solution for this would be to produce data to some temp directory (aka [temp 
bucket 
|https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.kpgruelafx8h]),
after which the Consolidation operator merges small files depending on the 
format and moves them to the final destination in the background. Unfortunately 
for the S3 FS this would require copying the data.
Also, even in the case where the merge does not change any files and simply 
moves them to the final destination, we can add a useful feature to the 
FileStreamingSink outputs, since currently any consumers of the files produced 
by Flink have to filter for files without suffixes (which are neither 
.in-progress nor .pending). Probably we want to move this logic to the Flink 
side. (A rough sketch of such a consolidation pass follows after the problem 
list below.)

*Problems:*
1. When to start file consolidation? Ideally, we want to perform a merge 
iteration once the files have been renamed from pending, which happens once the 
checkpoint is done or upon recovery. But it is not obvious how to reliably 
react to such events in another operator. So we probably want to merge files 
periodically on some timer with a configurable period (probably similar to the 
checkpoint interval). As an alternative, we could merge files instead of 
committing them from the pending state, but that would require changes inside 
the Bucket class.

2. When should files actually be merged? There are at least two cases when 
files should be merged together and moved to the final directory:
* The desired size of the input files is reached.
* The bucket is closed. E.g. in the case of a time series export we probably 
should be able to compare the time associated with a bucket and the current 
watermark (if any). So it should be decided by the bucketAssigner and 
bucketContext.

3. When should files be moved?
* Once they reach the desired file size (or when they were actually merged by 
reaching the desired total input file size).
* When the bucket is actually closed. E.g. it is a time series bucket and the 
BucketAssigner with a bucket context can assume that the bucket is closed. 
More detailed thoughts about Meta-Info have been laid out in this technical doc 
https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.b8ibdmatt6t3
* When the bucket is not yet closed (or never will be), but a certain 
inactivity time has passed. 

4. How to handle failures? The MergeOperator should perform merging and then 
move merged files to the final directories. Since this operation cannot be 
performed atomically and mutates state on the FS, we should ensure idempotence 
of the merge/move/source-removal operation. For this, we can store some state 
describing the mutation plan of the input files. We can use Flink state for 
this or persist the transaction plan on the output FS.

5. How to share files from different slots for merging? We probably want to 
keep the same parallelism as the FileStreamingSink, and MergeOperators should 
consider only the files produced by the Sink from the same slot. In this case, 
on bucket closing, if we want to keep the optimal output size we need another 
consolidation strategy. So in order to stay efficient, we want to perform merge 
operations in parallel.

6. How to discover files which should be merged? Such files are known to the 
Bucket class. A possible solution is to forward all newly created filenames to 
the MergeOperator. Another solution is simply to list open buckets 
periodically; in case we have high parallelism we risk creating unnecessary 
load on the underlying file system, so for the listing operation we would 
prefer a parallelism of 1.

7. Should we split files if they are too big? Probably the problem of big 
files should be addressed by a proper checkpoint policy.
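
To make the idea above a bit more concrete, here is a heavily simplified, 
hypothetical sketch (not a proposal for the actual operator) of one 
timer-driven consolidation pass over a temp bucket directory, using Flink's 
FileSystem API. It assumes a row-delimited format where plain byte 
concatenation is valid; all names and thresholds are made up for illustration, 
and the idempotence/failure handling discussed in problem 4 is ignored.

{code:java}
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.io.IOException;

/** Hypothetical, illustrative consolidation pass; not the real MergeOperator. */
public final class ConsolidationSketch {

    /** Concatenates all small files in {@code tempBucket} into one target file. */
    public static void consolidate(Path tempBucket, Path target, long smallFileThreshold)
            throws IOException {
        FileSystem fs = tempBucket.getFileSystem();
        FileStatus[] candidates = fs.listStatus(tempBucket);
        if (candidates == null || candidates.length == 0) {
            return;
        }
        try (FSDataOutputStream out =
                fs.create(target, FileSystem.WriteMode.NO_OVERWRITE)) {
            byte[] buffer = new byte[8192];
            for (FileStatus status : candidates) {
                if (status.isDir() || status.getLen() >= smallFileThreshold) {
                    continue; // only merge finished small files
                }
                try (FSDataInputStream in = fs.open(status.getPath())) {
                    int read;
                    while ((read = in.read(buffer)) != -1) {
                        out.write(buffer, 0, read);
                    }
                }
            }
        }
        // Source removal would have to be idempotent / replay-safe in a real operator.
        for (FileStatus status : candidates) {
            if (!status.isDir() && status.getLen() < smallFileThreshold) {
                fs.delete(status.getPath(), false);
            }
        }
    }
}
{code}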


was (Author: oleksandr nitavskyi):
MergeOperator (ConsolidationOperator) will not be able to replace files 
atomically (at least on HDFS), so some isolation can be violated. A possible 
solution for this would be to produce data to some temp directory (aka [temp 
bucket 
|https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.kpgruelafx8h]),
after which the Consolidation operator merges small files depending on the 
format and moves them to the final destination in the background. Unfortunately 
for the S3 FS this would require copying the data.
Also, even in the case where the merge does not change any files and simply 
moves them to the final destination, we can add a useful feature to the 
FileStreamingSink outputs. Since currently, any 

[jira] [Commented] (FLINK-17505) Merge small files produced by StreamingFileSink

2020-06-16 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17136731#comment-17136731
 ] 

Oleksandr Nitavskyi commented on FLINK-17505:
-

MergeOperator (ConsolidationOperator) will not be able to replace files 
atomically (at least on HDFS), so some isolation can be violated. A possible 
solution for this would be to produce data to some temp directory (aka [temp 
bucket 
|https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.kpgruelafx8h]),
after which the Consolidation operator merges small files depending on the 
format and moves them to the final destination in the background. Unfortunately 
for the S3 FS this would require copying the data.
Also, even in the case where the merge does not change any files and simply 
moves them to the final destination, we can add a useful feature to the 
FileStreamingSink outputs, since currently any consumers of the files produced 
by Flink have to filter for files without suffixes (which are neither 
.in-progress nor .pending). Probably we want to move this logic to the Flink 
side.

Problems:
1. When to start file consolidation? Ideally, we want to perform a merge 
iteration once the files have been renamed from pending, which happens once the 
checkpoint is done or upon recovery. But it is not obvious how to reliably 
react to such events in another operator. So we probably want to merge files 
periodically on some timer with a configurable period (probably similar to the 
checkpoint interval).
2. When should files actually be merged? There are at least two cases when 
files should be merged together and moved to the final directory:
1. The desired size of the input files is reached.
2. The bucket is closed. E.g. in the case of a time series export we probably 
should be able to compare the time associated with a bucket and the current 
watermark (if any). So it should be decided by the bucketAssigner and 
bucketContext.
3. When should files be moved?
* Once they reach the desired file size (or when they were actually merged by 
reaching the desired total input file size).
* When the bucket is actually closed. E.g. it is a time series bucket and the 
BucketAssigner with a bucket context can assume that the bucket is closed. 
More detailed thoughts about Meta-Info have been laid out in this technical doc 
https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.b8ibdmatt6t3
* When the bucket is not yet closed (or never will be), but a certain 
inactivity time has passed. 
4. How to handle failures? The MergeOperator should perform merging and then 
move merged files to the final directories. Since this operation cannot be 
performed atomically and mutates state on the FS, we should ensure idempotence 
of the merge/move/source-removal operation. For this, we can store some state 
describing the mutation plan of the input files. We can use Flink state for 
this or persist the transaction plan on the output FS.
5. How to share files from different slots for merging? We probably want to 
keep the same parallelism as the FileStreamingSink, and MergeOperators should 
consider only the files produced by the Sink from the same slot. In this case, 
on bucket closing, if we want to keep the optimal output size we need another 
consolidation strategy. So in order to stay efficient, we want to perform merge 
operations in parallel.
6. How to discover files which should be merged? Such files are known to the 
Bucket class. A possible solution is to forward all newly created filenames to 
the MergeOperator. Another solution is simply to list open buckets 
periodically; in case we have high parallelism we risk creating unnecessary 
load on the underlying file system, so for the listing operation we would 
prefer a parallelism of 1.
7. Should we split files if they are too big? Probably the problem of big 
files should be addressed by a proper checkpoint policy.

> Merge small files produced by StreamingFileSink
> ---
>
> Key: FLINK-17505
> URL: https://issues.apache.org/jira/browse/FLINK-17505
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.10.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> This an alternative approach to FLINK-11499, to solve a problem of creating 
> many small files with bulk formats in StreamingFileSink (which have to be 
> rolled on checkpoint).
> Merge based approach would require converting {{StreamingFileSink}} from a 
> sink, to an operator, that would be working exactly as it’s working right 
> now, with the same limitations (no support for arbitrary rolling policies for 
> bulk formats), followed by another operator that would be tasked with merging 
> small files in the background. 
> In the long term we 

[jira] [Comment Edited] (FLINK-17505) Merge small files produced by StreamingFileSink

2020-06-16 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17136731#comment-17136731
 ] 

Oleksandr Nitavskyi edited comment on FLINK-17505 at 6/16/20, 3:19 PM:
---

MergeOperator (ConsolidationOperator) will not be able to replace files 
atomically (at least on HDFS), so some isolation can be violated. A possible 
solution for this would be to produce data to some temp directory (aka [temp 
bucket 
|https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.kpgruelafx8h]),
after which the Consolidation operator merges small files depending on the 
format and moves them to the final destination in the background. Unfortunately 
for the S3 FS this would require copying the data.
Also, even in the case where the merge does not change any files and simply 
moves them to the final destination, we can add a useful feature to the 
FileStreamingSink outputs, since currently any consumers of the files produced 
by Flink have to filter for files without suffixes (which are neither 
.in-progress nor .pending). Probably we want to move this logic to the Flink 
side.

*Problems:*
1. When to start file consolidation? Ideally, we want to perform a merge 
iteration once the files have been renamed from pending, which happens once the 
checkpoint is done or upon recovery. But it is not obvious how to reliably 
react to such events in another operator. So we probably want to merge files 
periodically on some timer with a configurable period (probably similar to the 
checkpoint interval).

2. When should files actually be merged? There are at least two cases when 
files should be merged together and moved to the final directory:
* The desired size of the input files is reached.
* The bucket is closed. E.g. in the case of a time series export we probably 
should be able to compare the time associated with a bucket and the current 
watermark (if any). So it should be decided by the bucketAssigner and 
bucketContext.

3. When should files be moved?
* Once they reach the desired file size (or when they were actually merged by 
reaching the desired total input file size).
* When the bucket is actually closed. E.g. it is a time series bucket and the 
BucketAssigner with a bucket context can assume that the bucket is closed. 
More detailed thoughts about Meta-Info have been laid out in this technical doc 
https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.b8ibdmatt6t3
* When the bucket is not yet closed (or never will be), but a certain 
inactivity time has passed. 

4. How to handle failures? The MergeOperator should perform merging and then 
move merged files to the final directories. Since this operation cannot be 
performed atomically and mutates state on the FS, we should ensure idempotence 
of the merge/move/source-removal operation. For this, we can store some state 
describing the mutation plan of the input files. We can use Flink state for 
this or persist the transaction plan on the output FS.

5. How to share files from different slots for merging? We probably want to 
keep the same parallelism as the FileStreamingSink, and MergeOperators should 
consider only the files produced by the Sink from the same slot. In this case, 
on bucket closing, if we want to keep the optimal output size we need another 
consolidation strategy. So in order to stay efficient, we want to perform merge 
operations in parallel.

6. How to discover files which should be merged? Such files are known to the 
Bucket class. A possible solution is to forward all newly created filenames to 
the MergeOperator. Another solution is simply to list open buckets 
periodically; in case we have high parallelism we risk creating unnecessary 
load on the underlying file system, so for the listing operation we would 
prefer a parallelism of 1.

7. Should we split files if they are too big? Probably the problem of big 
files should be addressed by a proper checkpoint policy.


was (Author: oleksandr nitavskyi):
MergeOperator (ConsolidationOperator) will not be able to replace files 
atomically (at least on HDFS), so some isolation can be violated. A possible 
solution for this would be to produce data to some temp directory (aka [temp 
bucket 
|https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.kpgruelafx8h]),
after which the Consolidation operator merges small files depending on the 
format and moves them to the final destination in the background. Unfortunately 
for the S3 FS this would require copying the data.
Also, even in the case where the merge does not change any files and simply 
moves them to the final destination, we can add a useful feature to the 
FileStreamingSink outputs, since currently any consumers of the files produced 
by Flink have to filter for files without suffixes (which are neither 
.in-progress nor .pending). Probably we 

[jira] [Updated] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2020-04-14 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-10203:

Description: 
The new StreamingFileSink (introduced in Flink 1.6) uses the 
HadoopRecoverableFsDataOutputStream wrapper to write data to HDFS.

HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream that 
provides the ability to restore from a certain point of the file after a 
failure and continue to write data. To achieve this recovery functionality, 
HadoopRecoverableFsDataOutputStream uses the "truncate" method, which was 
introduced only in Hadoop 2.7. 
FLINK-14170 has enabled the usage of StreamingFileSink for 
OnCheckpointRollingPolicy, but it is still not possible to use 
StreamingFileSink with DefaultRollingPolicy, which makes writing data to HDFS 
impractical at scale for HDFS < 2.7.

Unfortunately, there are a few official Hadoop distributions whose latest 
version still uses Hadoop 2.6 (among them: Cloudera, Pivotal HD). As a result, 
Flink's Hadoop connector can't work with these distributions.

Flink declares that it supports Hadoop from version 2.4.0 upwards 
([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])

I guess we should emulate the functionality of the "truncate" method for older 
Hadoop versions.
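
A rough sketch (not the approach actually implemented in Flink, and ignoring 
lease recovery and atomicity concerns) of how "truncate" could be emulated on 
Hadoop < 2.7 by copying the first N valid bytes into a fresh file and swapping 
it in:

{code:java}
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

/** Illustrative sketch of a truncate emulation for pre-2.7 HDFS. */
public final class LegacyTruncateSketch {

    public static void truncate(FileSystem fs, Path file, long validLength) throws IOException {
        // Hypothetical temp path; a real implementation needs collision-safe naming.
        Path tmp = new Path(file.getParent(), file.getName() + ".truncating");

        try (FSDataInputStream in = fs.open(file);
                FSDataOutputStream out = fs.create(tmp, true)) {
            byte[] buffer = new byte[64 * 1024];
            long remaining = validLength;
            while (remaining > 0) {
                int read = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                if (read < 0) {
                    throw new IOException("File shorter than requested truncate length");
                }
                out.write(buffer, 0, read);
                remaining -= read;
            }
        }

        // Swap the shortened copy in place of the original file.
        if (!fs.delete(file, false) || !fs.rename(tmp, file)) {
            throw new IOException("Failed to replace " + file + " with truncated copy");
        }
    }
}
{code}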

  was:
The new StreamingFileSink (introduced in Flink 1.6) uses the 
HadoopRecoverableFsDataOutputStream wrapper to write data to HDFS.

HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream that 
provides the ability to restore from a certain point of the file after a 
failure and continue to write data. To achieve this recovery functionality, 
HadoopRecoverableFsDataOutputStream uses the "truncate" method, which was 
introduced only in Hadoop 2.7.

Unfortunately, there are a few official Hadoop distributions whose latest 
version still uses Hadoop 2.6 (among them: Cloudera, Pivotal HD). As a result, 
Flink's Hadoop connector can't work with these distributions.

Flink declares that it supports Hadoop from version 2.4.0 upwards 
([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])

I guess we should emulate the functionality of the "truncate" method for older 
Hadoop versions.


> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / FileSystem
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Assignee: Artsem Semianenka
>Priority: Major
>  Labels: pull-request-available
> Attachments: legacy truncate logic.pdf
>
>
> The new StreamingFileSink (introduced in Flink 1.6) uses the 
> HadoopRecoverableFsDataOutputStream wrapper to write data to HDFS.
> HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream that 
> provides the ability to restore from a certain point of the file after a 
> failure and continue to write data. To achieve this recovery functionality, 
> HadoopRecoverableFsDataOutputStream uses the "truncate" method, which was 
> introduced only in Hadoop 2.7. 
> FLINK-14170 has enabled the usage of StreamingFileSink for 
> OnCheckpointRollingPolicy, but it is still not possible to use 
> StreamingFileSink with DefaultRollingPolicy, which makes writing data to HDFS 
> impractical at scale for HDFS < 2.7.
> Unfortunately, there are a few official Hadoop distributions whose latest 
> version still uses Hadoop 2.6 (among them: Cloudera, Pivotal HD). As a 
> result, Flink's Hadoop connector can't work with these distributions.
> Flink declares that it supports Hadoop from version 2.4.0 upwards 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])
> I guess we should emulate the functionality of the "truncate" method for 
> older Hadoop versions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-11396) MetricStore creates significant GC pressure

2020-02-21 Thread Oleksandr Nitavskyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi resolved FLINK-11396.
-
Fix Version/s: 1.9.0
   Resolution: Fixed

> MetricStore creates significant GC pressure
> ---
>
> Key: FLINK-11396
> URL: https://issues.apache.org/jira/browse/FLINK-11396
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Reporter: Oleksandr Nitavskyi
>Priority: Major
> Fix For: 1.9.0
>
> Attachments: GC_parallel_flink_job_manager.log, g1.txt, gc_example.txt
>
>
> On Flink version 1.6.x we observe a significant increase of latency in the 
> UI. 
> After performance profiling, we have concluded that during UI rendering the 
> back-end spends 50% of its time on GC pauses, which means that Flink 1.6.x is 
> not friendly with G1 GC ergonomics configurations. 
> On the Flink side, MetricStore creates a huge amount of short-lived objects 
> for our job, which provokes a lot of young-generation GC pauses for a 
> non-small job with around 50 operators and a parallelism of 120. 
> Samples of GC logs are attached.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11396) MetricStore creates significant GC pressure

2020-02-21 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17041971#comment-17041971
 ] 

Oleksandr Nitavskyi commented on FLINK-11396:
-

Sorry, we still haven't migrated anything significant to the new UI.
Let's close this ticket since the UI has been significantly changed?
In case we see some problems again we can always reopen with more details :)

> MetricStore creates significant GC pressure
> ---
>
> Key: FLINK-11396
> URL: https://issues.apache.org/jira/browse/FLINK-11396
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Reporter: Oleksandr Nitavskyi
>Priority: Major
> Attachments: GC_parallel_flink_job_manager.log, g1.txt, gc_example.txt
>
>
> On Flink version 1.6.x we observe a significant increase of latency in the 
> UI. 
> After performance profiling, we have concluded that during UI rendering the 
> back-end spends 50% of its time on GC pauses, which means that Flink 1.6.x is 
> not friendly with G1 GC ergonomics configurations. 
> On the Flink side, MetricStore creates a huge amount of short-lived objects 
> for our job, which provokes a lot of young-generation GC pauses for a 
> non-small job with around 50 operators and a parallelism of 120. 
> Samples of GC logs are attached.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15904) Make Kafka Consumer work with activated "disableGenericTypes()"

2020-02-19 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040142#comment-17040142
 ] 

Oleksandr Nitavskyi commented on FLINK-15904:
-

Made a PR, but I cannot assign the Jira ticket to myself :(

> Make Kafka Consumer work with activated "disableGenericTypes()"
> ---
>
> Key: FLINK-15904
> URL: https://issues.apache.org/jira/browse/FLINK-15904
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> A user reported a problem that the Kafka Consumer doesn't work in that case: 
> https://lists.apache.org/thread.html/r462a854e8a0ab3512e2906b40411624f3164ea3af7cba61ee94cd760%40%3Cuser.flink.apache.org%3E.
>  We should use a different constructor for {{ListStateDescriptor}} that takes 
> {{TypeSerializer}} here: 
> https://github.com/apache/flink/blob/68cc21e4af71505efa142110e35a1f8b1c25fe6e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L860.
>  This will circumvent the check.
> My full analysis from the email thread:
> {quote}
> Unfortunately, the fact that the Kafka Sources use Kryo for state 
> serialization is a very early design misstep that we cannot get rid of for 
> now. We will get rid of that when the new source interface lands ([1]) and 
> when we have a new Kafka Source based on that.
> As a workaround, we should change the Kafka Consumer to go through a 
> different constructor of ListStateDescriptor which directly takes a 
> TypeSerializer instead of a TypeInformation here: [2]. This should sidestep 
> the "no generic types" check.
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> [2] 
> https://github.com/apache/flink/blob/68cc21e4af71505efa142110e35a1f8b1c25fe6e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L860
> {quote}
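
Purely for illustration, a minimal sketch of the constructor difference 
described in the quoted analysis above (hypothetical state name and element 
type; this is not the actual FlinkKafkaConsumerBase code):

{code:java}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

public class StateDescriptorExample {
    public static void main(String[] args) {
        TypeInformation<Tuple2<String, Long>> typeInfo =
                TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {});

        // Passing TypeInformation: the serializer is created lazily, and the
        // "generic types disabled" check can reject Kryo-backed generic types.
        ListStateDescriptor<Tuple2<String, Long>> viaTypeInfo =
                new ListStateDescriptor<>("offsets-demo", typeInfo);

        // Passing a concrete TypeSerializer directly sidesteps that check,
        // which is the workaround suggested in the issue description.
        ListStateDescriptor<Tuple2<String, Long>> viaSerializer =
                new ListStateDescriptor<>(
                        "offsets-demo", typeInfo.createSerializer(new ExecutionConfig()));
    }
}
{code}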



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-11911) KafkaTopicPartition is not a valid POJO

2020-01-30 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17026852#comment-17026852
 ] 

Oleksandr Nitavskyi edited comment on FLINK-11911 at 1/30/20 5:19 PM:
--

What if we try to use a custom TypeInfoFactory 
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#defining-type-information-using-a-factory),
where we can fall back to Kryo if we deserialize old Kryo state?
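
For context, a minimal sketch of the factory mechanism from the linked docs 
(hypothetical MyType/MyTypeInfoFactory names; the Kryo-fallback logic for old 
state is only hinted at, not implemented):

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

import java.lang.reflect.Type;
import java.util.Map;

// Hypothetical type; annotating it wires in the factory below.
@TypeInfo(MyTypeInfoFactory.class)
class MyType {
    public String topic;
    public int partition;
}

class MyTypeInfoFactory extends TypeInfoFactory<MyType> {
    @Override
    public TypeInformation<MyType> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // Sketch: a real implementation could return a custom TypeInformation
        // whose serializer falls back to Kryo when restoring old Kryo state.
        // GenericTypeInfo is used here only as the simplest placeholder.
        return new GenericTypeInfo<>(MyType.class);
    }
}
{code}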


was (Author: oleksandr nitavskyi):
Sure, makes sense.
What if we try to use a custom TypeInfoFactory 
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#defining-type-information-using-a-factory),
where we can fall back to Kryo if we deserialize old Kryo state?

> KafkaTopicPartition is not a valid POJO
> ---
>
> Key: FLINK-11911
> URL: https://issues.apache.org/jira/browse/FLINK-11911
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Fokko Driesprong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> KafkaTopicPartition is not a POJO, and therefore it cannot be serialized 
> efficiently. This is using the KafkaDeserializationSchema.
> When enforcing POJO's:
> ```
> java.lang.UnsupportedOperationException: Generic types have been disabled in 
> the ExecutionConfig and type 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition is 
> treated as a generic type.
>   at 
> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
>   at 
> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
>   at 
> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
>   at 
> org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:102)
>   at 
> org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:288)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:289)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:219)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:856)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> ```
> And in the logs:
> ```
> 2019-03-13 16:41:28,217 INFO  
> org.apache.flink.api.java.typeutils.TypeExtractor - class 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition 
> does not contain a setter for field topic
> 2019-03-13 16:41:28,221 INFO  
> org.apache.flink.api.java.typeutils.TypeExtractor - Class class 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition 
> cannot be used as a POJO type because not all fields are valid POJO fields, 
> and must be processed as GenericType. Please read the Flink documentation on 
> "Data Types & Serialization" for details of the effect on performance.
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11911) KafkaTopicPartition is not a valid POJO

2020-01-30 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17026852#comment-17026852
 ] 

Oleksandr Nitavskyi commented on FLINK-11911:
-

Sure, makes sense.
What if we try to use custom TypeInformationFactory 
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#defining-type-information-using-a-factory),
 where we can fallback to Kryo if we deserialize old Kryo state?

> KafkaTopicPartition is not a valid POJO
> ---
>
> Key: FLINK-11911
> URL: https://issues.apache.org/jira/browse/FLINK-11911
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.8.0
>Reporter: Fokko Driesprong
>Assignee: Fokko Driesprong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> KafkaTopicPartition is not a POJO, and therefore it cannot be serialized 
> efficiently. This is using the KafkaDeserializationSchema.
> When enforcing POJO's:
> ```
> java.lang.UnsupportedOperationException: Generic types have been disabled in 
> the ExecutionConfig and type 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition is 
> treated as a generic type.
>   at 
> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
>   at 
> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
>   at 
> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
>   at 
> org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:102)
>   at 
> org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:288)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:289)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:219)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:856)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> ```
> And in the logs:
> ```
> 2019-03-13 16:41:28,217 INFO  
> org.apache.flink.api.java.typeutils.TypeExtractor - class 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition 
> does not contain a setter for field topic
> 2019-03-13 16:41:28,221 INFO  
> org.apache.flink.api.java.typeutils.TypeExtractor - Class class 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition 
> cannot be used as a POJO type because not all fields are valid POJO fields, 
> and must be processed as GenericType. Please read the Flink documentation on 
> "Data Types & Serialization" for details of the effect on performance.
> ```
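For context, Flink treats a class as a POJO only when it is public, has a public 
no-argument constructor, and every field is either public or reachable through a 
getter/setter pair. A minimal sketch of a shape that would satisfy the analyzer 
(the "topic" field name is taken from the log above, everything else is 
illustrative):

{code}
// Hypothetical POJO-style holder: public no-arg constructor plus getter/setter
// pairs for every field, which is what the TypeExtractor checks for.
public class TopicPartitionPojo {

    private String topic;
    private int partition;

    public TopicPartitionPojo() {
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public int getPartition() {
        return partition;
    }

    public void setPartition(int partition) {
        this.partition = partition;
    }
}
{code}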



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-12489) Flink on Mesos - Parameterize network resources.

2019-05-13 Thread Oleksandr Nitavskyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi reassigned FLINK-12489:
---

Assignee: Oleksandr Nitavskyi

> Flink on Mesos - Parameterize network resources.
> 
>
> Key: FLINK-12489
> URL: https://issues.apache.org/jira/browse/FLINK-12489
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Mesos
>Reporter: Oleksandr Nitavskyi
>Assignee: Oleksandr Nitavskyi
>Priority: Minor
>
> Mesos supports network resource parameters. It would be nice if Flink could 
> specify network resource consumption.
> Unfortunately, the network resource is not standardized in Mesos, so the Fenzo 
> resource name "network" should be customizable.
> Thus we can introduce two parameters:
> 1. Network bandwidth in MB.
> 2. Name of the network resource in Mesos, where "network" will be the value 
> by default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12489) Flink on Mesos - Parameterize network resources.

2019-05-10 Thread Oleksandr Nitavskyi (JIRA)
Oleksandr Nitavskyi created FLINK-12489:
---

 Summary: Flink on Mesos - Parameterize network resources.
 Key: FLINK-12489
 URL: https://issues.apache.org/jira/browse/FLINK-12489
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Mesos
Reporter: Oleksandr Nitavskyi


Mesos supports network resource parameters. It would be nice if Flink could 
specify network resource consumption.
Unfortunately, the network resource is not standardized in Mesos, so the Fenzo 
resource name "network" should be customizable.
Thus we can introduce two parameters (a rough config sketch follows below):
1. Network bandwidth in MB.
2. Name of the network resource in Mesos, where "network" will be the value by 
default.
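Such parameters could be sketched as Flink configuration options roughly as 
follows; the option keys and defaults are only illustrative assumptions, not 
existing Flink settings:

{code}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Hypothetical option definitions for the two proposed parameters.
public class MesosNetworkOptions {

    // Network bandwidth to request per container, in MB (0 = request nothing).
    public static final ConfigOption<Integer> NETWORK_BANDWIDTH_MB =
            ConfigOptions.key("mesos.resourcemanager.tasks.network.bandwidth.mb")
                    .defaultValue(0);

    // Name of the network resource as declared in Mesos/Fenzo, "network" by default.
    public static final ConfigOption<String> NETWORK_RESOURCE_NAME =
            ConfigOptions.key("mesos.resourcemanager.tasks.network.resource-name")
                    .defaultValue("network");
}
{code}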



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12139) Flink on mesos - Parameterize disk space needed.

2019-04-09 Thread Oleksandr Nitavskyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi reassigned FLINK-12139:
---

Assignee: Oleksandr Nitavskyi

> Flink on mesos - Parameterize disk space needed.
> 
>
> Key: FLINK-12139
> URL: https://issues.apache.org/jira/browse/FLINK-12139
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Mesos
>Reporter: Juan
>Assignee: Oleksandr Nitavskyi
>Priority: Minor
>
> We are having a small issue while trying to deploy Flink on Mesos using 
> Marathon. In our setup of Mesos we are required to specify the amount of 
> disk space we want to have for the applications we deploy there.
> The current default value in Flink is 0 and it is currently not 
> parameterizable. This means that we request 0 disk space for our instances, so 
> Flink can't work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed

2019-01-19 Thread Oleksandr Nitavskyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi closed FLINK-10342.
---
Resolution: Fixed

> Kafka duplicate topic consumption when topic name is changed
> 
>
> Key: FLINK-10342
> URL: https://issues.apache.org/jira/browse/FLINK-10342
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Assignee: Oleksandr Nitavskyi
>Priority: Minor
>
> In case the topic name is simply renamed for a KafkaConsumer, Flink starts to 
> consume from both the old and the new topic at the same time, which can lead 
> to unexpected behavior.
> Here is the PR with a reproduction: https://github.com/apache/flink/pull/6691
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed

2019-01-19 Thread Oleksandr Nitavskyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi reopened FLINK-10342:
-

> Kafka duplicate topic consumption when topic name is changed
> 
>
> Key: FLINK-10342
> URL: https://issues.apache.org/jira/browse/FLINK-10342
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Assignee: Oleksandr Nitavskyi
>Priority: Minor
>
> In case the topic name is simply renamed for a KafkaConsumer, Flink starts to 
> consume from both the old and the new topic at the same time, which can lead 
> to unexpected behavior.
> Here is the PR with a reproduction: https://github.com/apache/flink/pull/6691
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed

2019-01-19 Thread Oleksandr Nitavskyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi closed FLINK-10342.
---
Resolution: Won't Fix

> Kafka duplicate topic consumption when topic name is changed
> 
>
> Key: FLINK-10342
> URL: https://issues.apache.org/jira/browse/FLINK-10342
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Assignee: Oleksandr Nitavskyi
>Priority: Minor
>
> In case the topic name is simply renamed for a KafkaConsumer, Flink starts to 
> consume from both the old and the new topic at the same time, which can lead 
> to unexpected behavior.
> Here is the PR with a reproduction: https://github.com/apache/flink/pull/6691
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11396) MetricStore creates significant GC pressure

2019-01-19 Thread Oleksandr Nitavskyi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16746997#comment-16746997
 ] 

Oleksandr Nitavskyi commented on FLINK-11396:
-

On the GC tuning side, G1 ergonomics concluded we need to have a small Young 
Memory Pool (because short-lived objects are created only during UI usage, which 
is rare), so an alternative solution would be to set the Young Memory Pool size 
via parameters or to use ParallelGC.
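For example (the flags and sizes below are only illustrative assumptions, not a 
tested recommendation), something along these lines could be tried in 
flink-conf.yaml for the JobManager JVM:

{code}
# Pin the young generation size explicitly instead of relying on G1 ergonomics
env.java.opts.jobmanager: "-XX:+UseG1GC -Xmn256m"

# ...or switch the JobManager JVM to the parallel collector instead
# env.java.opts.jobmanager: "-XX:+UseParallelGC"
{code}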

> MetricStore creates significant GC pressure
> ---
>
> Key: FLINK-11396
> URL: https://issues.apache.org/jira/browse/FLINK-11396
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
> Attachments: GC_parallel_flink_job_manager.log, g1.txt, gc_example.txt
>
>
> On Flink version 1.6.x we observe a significant increase in UI latency.
> After performance profiling, we have concluded that during UI rendering the 
> back-end spends 50% of the time on GC pauses, which means that Flink 1.6.x is 
> not friendly with the G1 GC ergonomics configuration.
> On the Flink side, MetricStore creates a huge number of short-lived objects for 
> our job, which provokes a lot of Young GC pauses for a non-small job with 
> around 50 operators and a parallelism of 120.
> Samples of GC logs are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11396) MetricStore creates significant GC pressure

2019-01-19 Thread Oleksandr Nitavskyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-11396:

Attachment: GC_parallel_flink_job_manager.log
g1.txt
gc_example.txt

> MetricStore creates significant GC pressure
> ---
>
> Key: FLINK-11396
> URL: https://issues.apache.org/jira/browse/FLINK-11396
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
> Attachments: GC_parallel_flink_job_manager.log, g1.txt, gc_example.txt
>
>
> On Flink version 1.6.x we observe a significant increase in UI latency.
> After performance profiling, we have concluded that during UI rendering the 
> back-end spends 50% of the time on GC pauses, which means that Flink 1.6.x is 
> not friendly with the G1 GC ergonomics configuration.
> On the Flink side, MetricStore creates a huge number of short-lived objects for 
> our job, which provokes a lot of Young GC pauses for a non-small job with 
> around 50 operators and a parallelism of 120.
> Samples of GC logs are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11396) MetricStore creates significant GC pressure

2019-01-19 Thread Oleksandr Nitavskyi (JIRA)
Oleksandr Nitavskyi created FLINK-11396:
---

 Summary: MetricStore creates significant GC pressure
 Key: FLINK-11396
 URL: https://issues.apache.org/jira/browse/FLINK-11396
 Project: Flink
  Issue Type: Bug
Reporter: Oleksandr Nitavskyi


On Flink version 1.6.x we observe a significant increase in UI latency.

After performance profiling, we have concluded that during UI rendering the 
back-end spends 50% of the time on GC pauses, which means that Flink 1.6.x is 
not friendly with the G1 GC ergonomics configuration.

On the Flink side, MetricStore creates a huge number of short-lived objects for 
our job, which provokes a lot of Young GC pauses for a non-small job with 
around 50 operators and a parallelism of 120.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11396) MetricStore creates significant GC pressure

2019-01-19 Thread Oleksandr Nitavskyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-11396:

Description: 
On Flink version 1.6.x we observe a significant increase in UI latency.

After performance profiling, we have concluded that during UI rendering the 
back-end spends 50% of the time on GC pauses, which means that Flink 1.6.x is 
not friendly with the G1 GC ergonomics configuration.

On the Flink side, MetricStore creates a huge number of short-lived objects for 
our job, which provokes a lot of Young GC pauses for a non-small job with 
around 50 operators and a parallelism of 120.

Samples of GC logs are attached.

  was:
On Flink version 1.6.x we observe a significant increase in UI latency.

After performance profiling, we have concluded that during UI rendering the 
back-end spends 50% of the time on GC pauses, which means that Flink 1.6.x is 
not friendly with the G1 GC ergonomics configuration.

On the Flink side, MetricStore creates a huge number of short-lived objects for 
our job, which provokes a lot of Young GC pauses for a non-small job with 
around 50 operators and a parallelism of 120.


> MetricStore creates significant GC pressure
> ---
>
> Key: FLINK-11396
> URL: https://issues.apache.org/jira/browse/FLINK-11396
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
>
> On Flink version 1.6.x we observe a significant increase in UI latency.
> After performance profiling, we have concluded that during UI rendering the 
> back-end spends 50% of the time on GC pauses, which means that Flink 1.6.x is 
> not friendly with the G1 GC ergonomics configuration.
> On the Flink side, MetricStore creates a huge number of short-lived objects for 
> our job, which provokes a lot of Young GC pauses for a non-small job with 
> around 50 operators and a parallelism of 120.
> Samples of GC logs are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11394) Job UI is not rendered in case of slow back-end

2019-01-19 Thread Oleksandr Nitavskyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-11394:

Description: 
In case back-end requests for the job state take longer than 
_web.refresh-interval_, the Job UI is not rendered correctly.

A request to the back-end is made:
 !image-2019-01-18-19-53-17-358.png|height=600,width=800! 

In case one of the refresh calls takes more than _web.refresh-interval_, the 
next requests are still made.

!image-2019-01-18-19-53-42-565.png|height=250,width=250!

The correct answer has been received on the UI side, but nothing has been 
rendered.

 !image-2019-01-18-19-54-36-964.png|height=250,width=250!

  was:
In case back-end requests for the job state take longer than 
_web.refresh-interval_, the Job UI is not rendered correctly.

A request to the back-end is made:
 !image-2019-01-18-19-53-17-358.png|height=250,width=450! 

In case one of the refresh calls takes more than _web.refresh-interval_, the 
next requests are still made.

!image-2019-01-18-19-53-42-565.png|height=250,width=250!

The correct answer has been received on the UI side, but nothing has been 
rendered.

 !image-2019-01-18-19-54-36-964.png|height=250,width=250!


> Job UI is not rendered in case of slow back-end
> ---
>
> Key: FLINK-11394
> URL: https://issues.apache.org/jira/browse/FLINK-11394
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
> Attachments: image-2019-01-18-19-53-17-358.png, 
> image-2019-01-18-19-53-42-565.png, image-2019-01-18-19-54-36-964.png
>
>
> In case back-end requests for the job state take longer than 
> _web.refresh-interval_, the Job UI is not rendered correctly.
> A request to the back-end is made:
>  !image-2019-01-18-19-53-17-358.png|height=600,width=800! 
> In case one of the refresh calls takes more than _web.refresh-interval_, the 
> next requests are still made.
> !image-2019-01-18-19-53-42-565.png|height=250,width=250!
> The correct answer has been received on the UI side, but nothing has been 
> rendered.
>  !image-2019-01-18-19-54-36-964.png|height=250,width=250!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11394) Job UI is not rendered in case of slow back-end

2019-01-19 Thread Oleksandr Nitavskyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-11394:

Description: 
In case back-end requests for the job state take longer than 
_web.refresh-interval_, the Job UI is not rendered correctly.

A request to the back-end is made:
 !image-2019-01-18-19-53-17-358.png|height=400,width=800! 

In case one of the refresh calls takes more than _web.refresh-interval_, the 
next requests are still made.

!image-2019-01-18-19-53-42-565.png|height=400,width=800! 

The correct answer has been received on the UI side, but nothing has been 
rendered.

 !image-2019-01-18-19-54-36-964.png|height=400,width=800! 

  was:
In case back-end requests for the job state take longer than 
_web.refresh-interval_, the Job UI is not rendered correctly.

A request to the back-end is made:
 !image-2019-01-18-19-53-17-358.png|height=400,width=800! 

In case one of the refresh calls takes more than _web.refresh-interval_, the 
next requests are still made.

!image-2019-01-18-19-53-42-565.png|height=250,width=250!

The correct answer has been received on the UI side, but nothing has been 
rendered.

 !image-2019-01-18-19-54-36-964.png|height=250,width=250!


> Job UI is not rendered in case of slow back-end
> ---
>
> Key: FLINK-11394
> URL: https://issues.apache.org/jira/browse/FLINK-11394
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
> Attachments: image-2019-01-18-19-53-17-358.png, 
> image-2019-01-18-19-53-42-565.png, image-2019-01-18-19-54-36-964.png
>
>
> In case back-end requests for the job state take longer than 
> _web.refresh-interval_, the Job UI is not rendered correctly.
> A request to the back-end is made:
>  !image-2019-01-18-19-53-17-358.png|height=400,width=800! 
> In case one of the refresh calls takes more than _web.refresh-interval_, the 
> next requests are still made.
> !image-2019-01-18-19-53-42-565.png|height=400,width=800! 
> The correct answer has been received on the UI side, but nothing has been 
> rendered.
>  !image-2019-01-18-19-54-36-964.png|height=400,width=800! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11394) Job UI is not rendered in case of slow back-end

2019-01-19 Thread Oleksandr Nitavskyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-11394:

Description: 
In case back-end requests for the job state take longer than 
_web.refresh-interval_, the Job UI is not rendered correctly.

A request to the back-end is made:
 !image-2019-01-18-19-53-17-358.png|height=400,width=800! 

In case one of the refresh calls takes more than _web.refresh-interval_, the 
next requests are still made.

!image-2019-01-18-19-53-42-565.png|height=250,width=250!

The correct answer has been received on the UI side, but nothing has been 
rendered.

 !image-2019-01-18-19-54-36-964.png|height=250,width=250!

  was:
In case back-end requests for the job state take longer than 
_web.refresh-interval_, the Job UI is not rendered correctly.

A request to the back-end is made:
 !image-2019-01-18-19-53-17-358.png|height=500,width=800! 

In case one of the refresh calls takes more than _web.refresh-interval_, the 
next requests are still made.

!image-2019-01-18-19-53-42-565.png|height=250,width=250!

The correct answer has been received on the UI side, but nothing has been 
rendered.

 !image-2019-01-18-19-54-36-964.png|height=250,width=250!


> Job UI is not rendered in case of slow back-end
> ---
>
> Key: FLINK-11394
> URL: https://issues.apache.org/jira/browse/FLINK-11394
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
> Attachments: image-2019-01-18-19-53-17-358.png, 
> image-2019-01-18-19-53-42-565.png, image-2019-01-18-19-54-36-964.png
>
>
> In case back-end requests for the job state take longer than 
> _web.refresh-interval_, the Job UI is not rendered correctly.
> A request to the back-end is made:
>  !image-2019-01-18-19-53-17-358.png|height=400,width=800! 
> In case one of the refresh calls takes more than _web.refresh-interval_, the 
> next requests are still made.
> !image-2019-01-18-19-53-42-565.png|height=250,width=250!
> The correct answer has been received on the UI side, but nothing has been 
> rendered.
>  !image-2019-01-18-19-54-36-964.png|height=250,width=250!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11394) Job UI is not rendered in case of slow back-end

2019-01-19 Thread Oleksandr Nitavskyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-11394:

Description: 
In case back-end requests for the job state take longer than 
_web.refresh-interval_, the Job UI is not rendered correctly.

A request to the back-end is made:
 !image-2019-01-18-19-53-17-358.png|height=500,width=800! 

In case one of the refresh calls takes more than _web.refresh-interval_, the 
next requests are still made.

!image-2019-01-18-19-53-42-565.png|height=250,width=250!

The correct answer has been received on the UI side, but nothing has been 
rendered.

 !image-2019-01-18-19-54-36-964.png|height=250,width=250!

  was:
In case back-end requests for the job state take longer than 
_web.refresh-interval_, the Job UI is not rendered correctly.

A request to the back-end is made:
 !image-2019-01-18-19-53-17-358.png|height=600,width=800! 

In case one of the refresh calls takes more than _web.refresh-interval_, the 
next requests are still made.

!image-2019-01-18-19-53-42-565.png|height=250,width=250!

The correct answer has been received on the UI side, but nothing has been 
rendered.

 !image-2019-01-18-19-54-36-964.png|height=250,width=250!


> Job UI is not rendered in case of slow back-end
> ---
>
> Key: FLINK-11394
> URL: https://issues.apache.org/jira/browse/FLINK-11394
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
> Attachments: image-2019-01-18-19-53-17-358.png, 
> image-2019-01-18-19-53-42-565.png, image-2019-01-18-19-54-36-964.png
>
>
> In case back-end requests for the job state take longer than 
> _web.refresh-interval_, the Job UI is not rendered correctly.
> A request to the back-end is made:
>  !image-2019-01-18-19-53-17-358.png|height=500,width=800! 
> In case one of the refresh calls takes more than _web.refresh-interval_, the 
> next requests are still made.
> !image-2019-01-18-19-53-42-565.png|height=250,width=250!
> The correct answer has been received on the UI side, but nothing has been 
> rendered.
>  !image-2019-01-18-19-54-36-964.png|height=250,width=250!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11394) Job UI is not rendered in case of slow back-end

2019-01-19 Thread Oleksandr Nitavskyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-11394:

Description: 
In case back-end requests for the job state take longer than 
_web.refresh-interval_, the Job UI is not rendered correctly.

A request to the back-end is made:
 !image-2019-01-18-19-53-17-358.png|height=250,width=450! 

In case one of the refresh calls takes more than _web.refresh-interval_, the 
next requests are still made.

!image-2019-01-18-19-53-42-565.png|height=250,width=250!

The correct answer has been received on the UI side, but nothing has been 
rendered.

 !image-2019-01-18-19-54-36-964.png|height=250,width=250!

  was:
In case back-end requests for the job state take longer than 
_web.refresh-interval_, the Job UI is not rendered correctly.

A request to the back-end is made:
 !image-2019-01-18-19-53-17-358.png|height=250,width=250! 

In case one of the refresh calls takes more than _web.refresh-interval_, the 
next requests are still made.

!image-2019-01-18-19-53-42-565.png|height=250,width=250!

The correct answer has been received on the UI side, but nothing has been 
rendered.

 !image-2019-01-18-19-54-36-964.png|height=250,width=250!


> Job UI is not rendered in case of slow back-end
> ---
>
> Key: FLINK-11394
> URL: https://issues.apache.org/jira/browse/FLINK-11394
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
> Attachments: image-2019-01-18-19-53-17-358.png, 
> image-2019-01-18-19-53-42-565.png, image-2019-01-18-19-54-36-964.png
>
>
> In case back-end requests for the job state take longer than 
> _web.refresh-interval_, the Job UI is not rendered correctly.
> A request to the back-end is made:
>  !image-2019-01-18-19-53-17-358.png|height=250,width=450! 
> In case one of the refresh calls takes more than _web.refresh-interval_, the 
> next requests are still made.
> !image-2019-01-18-19-53-42-565.png|height=250,width=250!
> The correct answer has been received on the UI side, but nothing has been 
> rendered.
>  !image-2019-01-18-19-54-36-964.png|height=250,width=250!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11394) Job UI is not rendered in case of slow back-end

2019-01-19 Thread Oleksandr Nitavskyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-11394:

Description: 
In case back-end requests for the job state take longer than 
_web.refresh-interval_, the Job UI is not rendered correctly.

A request to the back-end is made:
 !image-2019-01-18-19-53-17-358.png|height=250,width=250! 

In case one of the refresh calls takes more than _web.refresh-interval_, the 
next requests are still made.

!image-2019-01-18-19-53-42-565.png|height=250,width=250!

The correct answer has been received on the UI side, but nothing has been 
rendered.

 !image-2019-01-18-19-54-36-964.png|height=250,width=250!

  was:
In case back-end requests for the job state take longer than 
_web.refresh-interval_, the Job UI is not rendered correctly.

A request to the back-end is made:
 !image-2019-01-18-19-53-17-358.png! 

In case one of the refresh calls takes more than _web.refresh-interval_, the 
next requests are still made.

!image-2019-01-18-19-53-42-565.png!

The correct answer has been received on the UI side, but nothing has been 
rendered.

 !image-2019-01-18-19-54-36-964.png! 


> Job UI is not rendered in case of slow back-end
> ---
>
> Key: FLINK-11394
> URL: https://issues.apache.org/jira/browse/FLINK-11394
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Priority: Major
> Attachments: image-2019-01-18-19-53-17-358.png, 
> image-2019-01-18-19-53-42-565.png, image-2019-01-18-19-54-36-964.png
>
>
> In case back-end requests for the job state take longer than 
> _web.refresh-interval_, the Job UI is not rendered correctly.
> A request to the back-end is made:
>  !image-2019-01-18-19-53-17-358.png|height=250,width=250! 
> In case one of the refresh calls takes more than _web.refresh-interval_, the 
> next requests are still made.
> !image-2019-01-18-19-53-42-565.png|height=250,width=250!
> The correct answer has been received on the UI side, but nothing has been 
> rendered.
>  !image-2019-01-18-19-54-36-964.png|height=250,width=250!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11394) Job UI is not rendered in case of slow back-end

2019-01-18 Thread Oleksandr Nitavskyi (JIRA)
Oleksandr Nitavskyi created FLINK-11394:
---

 Summary: Job UI is not rendered in case of slow back-end
 Key: FLINK-11394
 URL: https://issues.apache.org/jira/browse/FLINK-11394
 Project: Flink
  Issue Type: Bug
Reporter: Oleksandr Nitavskyi
 Attachments: image-2019-01-18-19-53-17-358.png, 
image-2019-01-18-19-53-42-565.png, image-2019-01-18-19-54-36-964.png

In case back-end requests for the job state take longer than 
_web.refresh-interval_, the Job UI is not rendered correctly.

A request to the back-end is made:
 !image-2019-01-18-19-53-17-358.png! 

In case one of the refresh calls takes more than _web.refresh-interval_, the 
next requests are still made.

!image-2019-01-18-19-53-42-565.png!

The correct answer has been received on the UI side, but nothing has been 
rendered.

 !image-2019-01-18-19-54-36-964.png! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed

2018-09-18 Thread Oleksandr Nitavskyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi updated FLINK-10342:

Priority: Minor  (was: Major)

> Kafka duplicate topic consumption when topic name is changed
> 
>
> Key: FLINK-10342
> URL: https://issues.apache.org/jira/browse/FLINK-10342
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Assignee: Oleksandr Nitavskyi
>Priority: Minor
>
> In case the topic name is simply renamed for a KafkaConsumer, Flink starts to 
> consume from both the old and the new topic at the same time, which can lead 
> to unexpected behavior.
> Here is the PR with a reproduction: https://github.com/apache/flink/pull/6691
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed

2018-09-17 Thread Oleksandr Nitavskyi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617172#comment-16617172
 ] 

Oleksandr Nitavskyi edited comment on FLINK-10342 at 9/17/18 11:29 AM:
---

Thanks for the comment. I agree that it works as expected from the Flink 
developer's point of view: loading the old TopicPartitions' consumption from the 
state and discovering the newly requested TopicPartitions. In case there is no 
intersection, we consume from both sets of TopicPartitions, since state in the 
KafkaSource operator can never expire.

At the same time, this behavior is counterintuitive for Flink users. When a 
KafkaSource is created consuming "topic 1", it is expected that "topic 1" will be 
consumed.
{code}
new KafkaSource("topic 1")
{code}

If after a refactoring the KafkaSource starts to consume another "topic 2":
{code}
new KafkaSource("topic 2")
{code}
For us it sounds intuitive that data will come from "topic 2" and only from 
"topic 2", so the current behavior has a hole in the abstraction.

I believe it is worth taking at least one of the following action points:
* Make a small check in the state restoring method where we skip topics which 
are not passed via the class constructor.
* Log a warning if the topics in the state and in the constructor are different.
* Document this behavior; it can also be a good exercise that clarifies how 
state is managed and helps users start thinking in a slightly different paradigm.

If you think it is worth taking some action point, let me know and I will 
contribute.
Thank you


was (Author: oleksandr nitavskyi):
Thanks for the comment. I agree that it works as expected from the Flink 
developer's point of view: loading the old TopicPartitions' consumption from the 
state and discovering the newly requested TopicPartitions. In case there is no 
intersection, we consume from both sets of TopicPartitions, since state in the 
KafkaSource operator can never expire.

At the same time, this behavior is counterintuitive for Flink users. When a 
KafkaSource is created consuming "topic 1", it is expected that "topic 1" will be 
consumed.
{code}
new KafkaSource("topic 1")
{code}

If after a refactoring the KafkaSource starts to consume another "topic 2":
{code}
new Kafka Source("topic 2")
{code}
For us it sounds intuitive that data will come from "topic 2" and only from 
"topic 2", so the current behavior has a hole in the abstraction.

I believe it is worth taking at least one of the following action points:
* Make a small check in the state restoring method where we skip topics which 
are not passed via the class constructor.
* Log a warning if the topics in the state and in the constructor are different.
* Document this behavior; it can also be a good exercise that clarifies how 
state is managed and helps users start thinking in a slightly different paradigm.

If you think it is worth taking some action point, let me know and I will 
contribute.
Thank you

> Kafka duplicate topic consumption when topic name is changed
> 
>
> Key: FLINK-10342
> URL: https://issues.apache.org/jira/browse/FLINK-10342
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Assignee: Oleksandr Nitavskyi
>Priority: Major
>
> In case the topic name is simply renamed for a KafkaConsumer, Flink starts to 
> consume from both the old and the new topic at the same time, which can lead 
> to unexpected behavior.
> Here is the PR with a reproduction: https://github.com/apache/flink/pull/6691
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed

2018-09-17 Thread Oleksandr Nitavskyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi reopened FLINK-10342:
-

> Kafka duplicate topic consumption when topic name is changed
> 
>
> Key: FLINK-10342
> URL: https://issues.apache.org/jira/browse/FLINK-10342
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Assignee: Oleksandr Nitavskyi
>Priority: Major
>
> In case the topic name is simply renamed for a KafkaConsumer, Flink starts to 
> consume from both the old and the new topic at the same time, which can lead 
> to unexpected behavior.
> Here is the PR with a reproduction: https://github.com/apache/flink/pull/6691
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed

2018-09-17 Thread Oleksandr Nitavskyi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617172#comment-16617172
 ] 

Oleksandr Nitavskyi commented on FLINK-10342:
-

Thanks for the comment. I agree that it works as expected from the Flink 
developer's point of view: loading the old TopicPartitions' consumption from the 
state and discovering the newly requested TopicPartitions. In case there is no 
intersection, we consume from both sets of TopicPartitions, since state in the 
KafkaSource operator can never expire.

At the same time, this behavior is counterintuitive for Flink users. When a 
KafkaSource is created consuming "topic 1", it is expected that "topic 1" will be 
consumed.
{code}
new KafkaSource("topic 1")
{code}

If after a refactoring the KafkaSource starts to consume another "topic 2":
{code}
new Kafka Source("topic 2")
{code}
For us it sounds intuitive that data will come from "topic 2" and only from 
"topic 2", so the current behavior has a hole in the abstraction.

I believe it is worth taking at least one of the following action points:
* Make a small check in the state restoring method where we skip topics which 
are not passed via the class constructor (a rough sketch follows below this comment).
* Log a warning if the topics in the state and in the constructor are different.
* Document this behavior; it can also be a good exercise that clarifies how 
state is managed and helps users start thinking in a slightly different paradigm.

If you think it is worth taking some action point, let me know and I will 
contribute.
Thank you
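A rough sketch of the first action point, assuming a small filter applied to the 
restored state before it is used (illustrative code, not the actual 
FlinkKafkaConsumerBase implementation):

{code}
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

// Hypothetical helper: drop restored offsets whose topic is no longer among the
// topics passed to the consumer's constructor.
public class RestoredStateFilter {

    public static Map<KafkaTopicPartition, Long> keepSubscribedTopics(
            Map<KafkaTopicPartition, Long> restoredState, List<String> subscribedTopics) {
        return restoredState.entrySet().stream()
                .filter(entry -> subscribedTopics.contains(entry.getKey().getTopic()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
{code}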

> Kafka duplicate topic consumption when topic name is changed
> 
>
> Key: FLINK-10342
> URL: https://issues.apache.org/jira/browse/FLINK-10342
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Assignee: Oleksandr Nitavskyi
>Priority: Major
>
> In case the topic name is simply renamed for a KafkaConsumer, Flink starts to 
> consume from both the old and the new topic at the same time, which can lead 
> to unexpected behavior.
> Here is the PR with a reproduction: https://github.com/apache/flink/pull/6691
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed

2018-09-13 Thread Oleksandr Nitavskyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleksandr Nitavskyi reassigned FLINK-10342:
---

Assignee: Oleksandr Nitavskyi

> Kafka duplicate topic consumption when topic name is changed
> 
>
> Key: FLINK-10342
> URL: https://issues.apache.org/jira/browse/FLINK-10342
> Project: Flink
>  Issue Type: Bug
>Reporter: Oleksandr Nitavskyi
>Assignee: Oleksandr Nitavskyi
>Priority: Major
>
> In case the topic name is simply renamed for a KafkaConsumer, Flink starts to 
> consume from both the old and the new topic at the same time, which can lead 
> to unexpected behavior.
> Here is the PR with a reproduction: https://github.com/apache/flink/pull/6691
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed

2018-09-13 Thread Oleksandr Nitavskyi (JIRA)
Oleksandr Nitavskyi created FLINK-10342:
---

 Summary: Kafka duplicate topic consumption when topic name is 
changed
 Key: FLINK-10342
 URL: https://issues.apache.org/jira/browse/FLINK-10342
 Project: Flink
  Issue Type: Bug
Reporter: Oleksandr Nitavskyi


In case the topic name is simply renamed for a KafkaConsumer, Flink starts to 
consume from both the old and the new topic at the same time, which can lead to 
unexpected behavior.

Here is the PR with a reproduction: https://github.com/apache/flink/pull/6691

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9762) CoreOptions.TMP_DIRS wrongly managed on Yarn

2018-07-05 Thread Oleksandr Nitavskyi (JIRA)
Oleksandr Nitavskyi created FLINK-9762:
--

 Summary: CoreOptions.TMP_DIRS wrongly managed on Yarn
 Key: FLINK-9762
 URL: https://issues.apache.org/jira/browse/FLINK-9762
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.5.0
Reporter: Oleksandr Nitavskyi


The issue on YARN is that it is impossible to have different LOCAL_DIRS on the 
JobManager and the TaskManager, even though the LOCAL_DIRS value depends on the 
container.

The problem is that CoreOptions.TMP_DIRS is set to its default value during 
JobManager initialization and added to the configuration object. When a 
TaskManager is launched, that configuration object is cloned together with a 
LOCAL_DIRS value which makes sense only for the JobManager container. From the 
point of view of a YARN container running a TaskManager, CoreOptions.TMP_DIRS is 
therefore always equal either to the path in flink.yml or to the LOCAL_DIRS of 
the JobManager (default behaviour). If the TaskManager's container does not have 
access to those folders allocated by YARN for the JobManager, the TaskManager 
cannot be started.
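A minimal sketch of one possible direction, assuming the TaskManager side 
re-derives the temp directories from its own container environment instead of 
reusing the value cloned from the JobManager (helper name and placement are 
illustrative only):

{code}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;

// Hypothetical helper: prefer the container's own LOCAL_DIRS (set by YARN for
// each container) over the tmp dirs inherited from the JobManager configuration.
public final class YarnTmpDirsFixup {

    private YarnTmpDirsFixup() {
    }

    public static void overrideTmpDirsFromEnv(Configuration config) {
        String localDirs = System.getenv("LOCAL_DIRS");
        if (localDirs != null && !localDirs.isEmpty()) {
            config.setString(CoreOptions.TMP_DIRS, localDirs);
        }
    }
}
{code}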



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)