[jira] [Updated] (FLINK-35461) Improve Runtime Configuration for Flink 2.0

2024-05-29 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-35461:
---
Release Note: 
The following configurations have been deprecated as we are phasing out the 
hash-based blocking shuffle:
- `taskmanager.network.sort-shuffle.min-parallelism`
- `taskmanager.network.blocking-shuffle.type`

The following configurations have been deprecated as we are phasing out the 
legacy hybrid shuffle:
- `taskmanager.network.hybrid-shuffle.spill-index-region-group-size`
- `taskmanager.network.hybrid-shuffle.num-retained-in-memory-regions-max`
- `taskmanager.network.hybrid-shuffle.enable-new-mode`

The following configurations have been deprecated to simplify the configuration 
of network buffers:
- `taskmanager.network.memory.buffers-per-channel`
- `taskmanager.network.memory.floating-buffers-per-gate`
- `taskmanager.network.memory.max-buffers-per-channel`
- `taskmanager.network.memory.max-overdraft-buffers-per-gate`
- `taskmanager.network.memory.exclusive-buffers-request-timeout-ms` (Please use 
`taskmanager.network.memory.buffers-request-timeout` instead.)

The configuration `taskmanager.network.batch-shuffle.compression.enabled` has 
been deprecated. Please set `taskmanager.network.compression.codec` to "NONE" 
to disable compression.
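
For illustration, a minimal migration sketch using Flink's `Configuration` API 
(string keys are written out so the snippet avoids the deprecated constants; the 
`30 s` timeout is a placeholder value, not a recommended setting):
{code:java}
import org.apache.flink.configuration.Configuration;

public class NetworkConfigMigration {

    public static Configuration migratedConfig() {
        Configuration conf = new Configuration();
        // Successor of `taskmanager.network.memory.exclusive-buffers-request-timeout-ms`;
        // the new option takes a Duration-style value instead of a bare millisecond count.
        conf.setString("taskmanager.network.memory.buffers-request-timeout", "30 s");
        // Replaces `taskmanager.network.batch-shuffle.compression.enabled: false`.
        conf.setString("taskmanager.network.compression.codec", "NONE");
        return conf;
    }
}
{code}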

The following Netty-related configurations are no longer recommended for use 
and have been deprecated:
- `taskmanager.network.netty.num-arenas`
- `taskmanager.network.netty.server.numThreads`
- `taskmanager.network.netty.client.numThreads`
- `taskmanager.network.netty.server.backlog`
- `taskmanager.network.netty.sendReceiveBufferSize`
- `taskmanager.network.netty.transport`

The following configurations are unnecessary and have been deprecated:
- `taskmanager.network.max-num-tcp-connections`
- `fine-grained.shuffle-mode.all-blocking`

  was:
The following configurations have been deprecated as we are phasing out the 
hash-based blocking shuffle:
- `taskmanager.network.sort-shuffle.min-parallelism`
- `taskmanager.network.blocking-shuffle.type`

The following configurations have been deprecated as we are phasing out the 
legacy hybrid shuffle:
- `taskmanager.network.hybrid-shuffle.spill-index-region-group-size`
- `taskmanager.network.hybrid-shuffle.num-retained-in-memory-regions-max`
- `taskmanager.network.hybrid-shuffle.enable-new-mode`

The following configurations have been deprecated to simplify the configuration 
of network buffers:
- `taskmanager.network.memory.buffers-per-channel`
- `taskmanager.network.memory.floating-buffers-per-gate`
- `taskmanager.network.memory.max-buffers-per-channel`
- `taskmanager.network.memory.max-overdraft-buffers-per-gate`
- `taskmanager.network.memory.exclusive-buffers-request-timeout-ms` (Please use 
`taskmanager.network.memory.exclusive-buffers-request-timeout` instead.)

The configuration `taskmanager.network.batch-shuffle.compression.enabled` has 
been deprecated. Please set `taskmanager.network.compression.codec` to "NONE" 
to disable compression.

The following Netty-related configurations are no longer recommended for use 
and have been deprecated:
- `taskmanager.network.netty.num-arenas`
- `taskmanager.network.netty.server.numThreads`
- `taskmanager.network.netty.client.numThreads`
- `taskmanager.network.netty.server.backlog`
- `taskmanager.network.netty.sendReceiveBufferSize`
- `taskmanager.network.netty.transport`

The following configurations are unnecessary and have been deprecated:
- `taskmanager.network.max-num-tcp-connections`
- `fine-grained.shuffle-mode.all-blocking`


> Improve Runtime Configuration for Flink 2.0
> ---
>
> Key: FLINK-35461
> URL: https://issues.apache.org/jira/browse/FLINK-35461
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xuannan Su
>Assignee: Xuannan Su
>Priority: Major
>  Labels: pull-request-available
>
> As Flink moves toward 2.0, we have revisited all runtime configurations and 
> identified several improvements to enhance user-friendliness and 
> maintainability. We want to refine the runtime configuration.
>  
> This ticket implements all the changes discussed in 
> [FLIP-450|https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35461) Improve Runtime Configuration for Flink 2.0

2024-05-29 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-35461:
---
Release Note: 
The following configurations have been deprecated as we are phasing out the 
hash-based blocking shuffle:
- `taskmanager.network.sort-shuffle.min-parallelism`
- `taskmanager.network.blocking-shuffle.type`

The following configurations have been deprecated as we are phasing out the 
legacy hybrid shuffle:
- `taskmanager.network.hybrid-shuffle.spill-index-region-group-size`
- `taskmanager.network.hybrid-shuffle.num-retained-in-memory-regions-max`
- `taskmanager.network.hybrid-shuffle.enable-new-mode`

The following configurations have been deprecated to simplify the configuration 
of network buffers:
- `taskmanager.network.memory.buffers-per-channel`
- `taskmanager.network.memory.floating-buffers-per-gate`
- `taskmanager.network.memory.max-buffers-per-channel`
- `taskmanager.network.memory.max-overdraft-buffers-per-gate`
- `taskmanager.network.memory.exclusive-buffers-request-timeout-ms` (Please use 
`taskmanager.network.memory.exclusive-buffers-request-timeout` instead.)

The configuration `taskmanager.network.batch-shuffle.compression.enabled` has 
been deprecated. Please set `taskmanager.network.compression.codec` to "NONE" 
to disable compression.

The following Netty-related configurations are no longer recommended for use 
and have been deprecated:
- `taskmanager.network.netty.num-arenas`
- `taskmanager.network.netty.server.numThreads`
- `taskmanager.network.netty.client.numThreads`
- `taskmanager.network.netty.server.backlog`
- `taskmanager.network.netty.sendReceiveBufferSize`
- `taskmanager.network.netty.transport`

The following configurations are unnecessary and have been deprecated:
- `taskmanager.network.max-num-tcp-connections`
- `fine-grained.shuffle-mode.all-blocking`

> Improve Runtime Configuration for Flink 2.0
> ---
>
> Key: FLINK-35461
> URL: https://issues.apache.org/jira/browse/FLINK-35461
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xuannan Su
>Assignee: Xuannan Su
>Priority: Major
>  Labels: pull-request-available
>
> As Flink moves toward 2.0, we have revisited all runtime configurations and 
> identified several improvements to enhance user-friendliness and 
> maintainability. We want to refine the runtime configuration.
>  
> This ticket implements all the changes discussed in 
> [FLIP-450|https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35475) Introduce isInternalSorterSupport to OperatorAttributes

2024-05-28 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-35475:
--

 Summary: Introduce isInternalSorterSupport to OperatorAttributes
 Key: FLINK-35475
 URL: https://issues.apache.org/jira/browse/FLINK-35475
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Xuannan Su


Introduce isInternalSorterSupport to OperatorAttributes to notify Flink whether 
the operator sorts data internally in batch mode or during backlog processing.
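
A hypothetical sketch of the resulting API (the builder classes exist in Flink's 
operator API, but the setter name below is inferred from this ticket's summary 
and may differ in the final implementation):
{code:java}
import org.apache.flink.streaming.api.operators.OperatorAttributes;
import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;

public class InternalSorterAttributeSketch {

    public static OperatorAttributes attributesForInternallySortingOperator() {
        return new OperatorAttributesBuilder()
                // Hypothetical setter named after this ticket: tells the runtime
                // the operator sorts its input itself, so no external sorting is needed.
                .setInternalSorterSupported(true)
                .build();
    }
}
{code}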



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35461) Improve Runtime Configuration for Flink 2.0

2024-05-28 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-35461:
---
Description: 
As Flink moves toward 2.0, we have revisited all runtime configurations and 
identified several improvements to enhance user-friendliness and 
maintainability. We want to refine the runtime configuration.

 

This ticket implements all the changes discussed in 
[FLIP-450|https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0].

  was:
As Flink moves toward 2.0, we have revisited all runtime configurations and 
identified several improvements to enhance user-friendliness and 
maintainability. In this FLIP, we aim to refine the runtime configuration.

 

This ticket implements all the changes discussed in 
[FLIP-450|https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0].


> Improve Runtime Configuration for Flink 2.0
> ---
>
> Key: FLINK-35461
> URL: https://issues.apache.org/jira/browse/FLINK-35461
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xuannan Su
>Priority: Major
>  Labels: pull-request-available
>
> As Flink moves toward 2.0, we have revisited all runtime configurations and 
> identified several improvements to enhance user-friendliness and 
> maintainability. We want to refine the runtime configuration.
>  
> This ticket implements all the changes discussed in 
> [FLIP-450|https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35461) Improve Runtime Configuration for Flink 2.0

2024-05-26 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-35461:
--

 Summary: Improve Runtime Configuration for Flink 2.0
 Key: FLINK-35461
 URL: https://issues.apache.org/jira/browse/FLINK-35461
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Reporter: Xuannan Su


As Flink moves toward 2.0, we have revisited all runtime configurations and 
identified several improvements to enhance user-friendliness and 
maintainability. In this FLIP, we aim to refine the runtime configuration.

 

This ticket implements all the changes discussed in 
[FLIP-450|https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35359) General Improvement to Configuration for Flink 2.0

2024-05-23 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-35359:
---
Release Note: 
- The following configurations have been updated to the Duration type in a 
backward-compatible manner: `client.heartbeat.interval`, 
`client.heartbeat.timeout`, `cluster.registration.error-delay`, 
`cluster.registration.initial-timeout`, `cluster.registration.max-timeout`, 
`cluster.registration.refused-registration-delay`, 
`cluster.services.shutdown-timeout`, `heartbeat.interval`, `heartbeat.timeout`, 
`high-availability.zookeeper.client.connection-timeout`, 
`high-availability.zookeeper.client.retry-wait`, 
`high-availability.zookeeper.client.session-timeout`, 
`historyserver.archive.fs.refresh-interval`, 
`historyserver.web.refresh-interval`, `metrics.fetcher.update-interval`, 
`metrics.latency.interval`, `metrics.reporter.influxdb.connectTimeout`, 
`metrics.reporter.influxdb.writeTimeout`, 
`metrics.system-resource-probing-interval`, `pekko.startup-timeout`, 
`pekko.tcp.timeout`, `resourcemanager.job.timeout`, 
`resourcemanager.standalone.start-up-time`, 
`resourcemanager.taskmanager-timeout`, `rest.await-leader-timeout`, 
`rest.connection-timeout`, `rest.idleness-timeout`, `rest.retry.delay`, 
`slot.idle.timeout`, `slot.request.timeout`, `task.cancellation.interval`, 
`task.cancellation.timeout`, `task.cancellation.timers.timeout`, 
`taskmanager.debug.memory.log-interval`, `web.refresh-interval`, `web.timeout`, 
`yarn.heartbeat.container-request-interval`.
- The `taskmanager.network.compression.codec` and 
`table.optimizer.agg-phase-strategy` configurations have been updated to the 
Enum type in a backward-compatible manner.
- The `yarn.application-attempts` configuration has been updated to the Int 
type in a backward-compatible manner.
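
By way of illustration, a minimal sketch of what the backward compatibility 
means for configured values (assuming, as before the type change, that a bare 
number is interpreted in the option's legacy unit, milliseconds for 
`heartbeat.interval`):
{code:java}
import org.apache.flink.configuration.Configuration;

public class DurationOptionExample {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Legacy style: a bare number, still interpreted in the option's
        // previous unit (milliseconds) for backward compatibility.
        conf.setString("heartbeat.interval", "10000");
        // New style: an explicit Duration-formatted value.
        conf.setString("heartbeat.timeout", "50 s");
        System.out.println(conf);
    }
}
{code}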

  was:
- The following configurations have been updated to the Duration type in a 
backward-compatible manner: `client.heartbeat.interval`, 
`client.heartbeat.timeout`, `cluster.registration.error-delay`, 
`cluster.registration.initial-timeout`, `cluster.registration.max-timeout`, 
`cluster.registration.refused-registration-delay`, 
`cluster.services.shutdown-timeout`, `heartbeat.interval`, `heartbeat.timeout`, 
`high-availability.zookeeper.client.connection-timeout`, 
`high-availability.zookeeper.client.retry-wait`, 
`high-availability.zookeeper.client.session-timeout`, 
`historyserver.archive.fs.refresh-interval`, 
`historyserver.web.refresh-interval`, `metrics.fetcher.update-interval`, 
`metrics.latency.interval`, `metrics.reporter.influxdb.connectTimeout`, 
`metrics.reporter.influxdb.writeTimeout`, 
`metrics.system-resource-probing-interval`, `pekko.startup-timeout`, 
`pekko.tcp.timeout`, `resourcemanager.job.timeout`, 
`resourcemanager.standalone.start-up-time`, 
`resourcemanager.taskmanager-timeout`, `rest.await-leader-timeout`, 
`rest.connection-timeout`, `rest.idleness-timeout`, `rest.retry.delay`, 
`slot.idle.timeout`, `slot.request.timeout`, `task.cancellation.interval`, 
`task.cancellation.timeout`, `task.cancellation.timers.timeout`, 
`taskmanager.debug.memory.log-interval`, `web.refresh-interval`, `web.timeout`, 
`yarn.heartbeat.container-request-interval`
- The `taskmanager.network.compression.codec` and 
`table.optimizer.agg-phase-strategy` configurations have been updated to the 
Enum type in a backward-compatible manner.
- The `yarn.application-attempts` configuration has been updated to the Int 
type in a backward-compatible manner.


> General Improvement to Configuration for Flink 2.0
> --
>
> Key: FLINK-35359
> URL: https://issues.apache.org/jira/browse/FLINK-35359
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xuannan Su
>Assignee: Xuannan Su
>Priority: Major
>  Labels: pull-request-available
>
> As Flink moves toward version 2.0, we want to provide users with a better 
> experience with the existing configuration. In this FLIP, we outline several 
> general improvements to the current configuration:
>  * Ensure all the ConfigOptions are properly annotated
>  * Ensure all user-facing configurations are included in the documentation 
> generation process
>  * Make the existing ConfigOptions use the proper type
>  * Mark all internally used ConfigOptions with the @Internal annotation
>  
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35359) General Improvement to Configuration for Flink 2.0

2024-05-23 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-35359:
---
Release Note: 
- The following configurations have been updated to the Duration type in a 
backward-compatible manner: `client.heartbeat.interval`, 
`client.heartbeat.timeout`, `cluster.registration.error-delay`, 
`cluster.registration.initial-timeout`, `cluster.registration.max-timeout`, 
`cluster.registration.refused-registration-delay`, 
`cluster.services.shutdown-timeout`, `heartbeat.interval`, `heartbeat.timeout`, 
`high-availability.zookeeper.client.connection-timeout`, 
`high-availability.zookeeper.client.retry-wait`, 
`high-availability.zookeeper.client.session-timeout`, 
`historyserver.archive.fs.refresh-interval`, 
`historyserver.web.refresh-interval`, `metrics.fetcher.update-interval`, 
`metrics.latency.interval`, `metrics.reporter.influxdb.connectTimeout`, 
`metrics.reporter.influxdb.writeTimeout`, 
`metrics.system-resource-probing-interval`, `pekko.startup-timeout`, 
`pekko.tcp.timeout`, `resourcemanager.job.timeout`, 
`resourcemanager.standalone.start-up-time`, 
`resourcemanager.taskmanager-timeout`, `rest.await-leader-timeout`, 
`rest.connection-timeout`, `rest.idleness-timeout`, `rest.retry.delay`, 
`slot.idle.timeout`, `slot.request.timeout`, `task.cancellation.interval`, 
`task.cancellation.timeout`, `task.cancellation.timers.timeout`, 
`taskmanager.debug.memory.log-interval`, `web.refresh-interval`, `web.timeout`, 
`yarn.heartbeat.container-request-interval`
- The `taskmanager.network.compression.codec` and 
`table.optimizer.agg-phase-strategy` configurations have been updated to the 
Enum type in a backward-compatible manner.
- The `yarn.application-attempts` configuration has been updated to the Int 
type in a backward-compatible manner.

  was:
- The following configurations have been updated to the Duration type in a 
backward-compatible manner:: `client.heartbeat.interval`, 
`client.heartbeat.timeout`, `cluster.registration.error-delay`, 
`cluster.registration.initial-timeout`, `cluster.registration.max-timeout`, 
`cluster.registration.refused-registration-delay`, 
`cluster.services.shutdown-timeout`, `heartbeat.interval`, `heartbeat.timeout`, 
`high-availability.zookeeper.client.connection-timeout`, 
`high-availability.zookeeper.client.retry-wait`, 
`high-availability.zookeeper.client.session-timeout`, 
`historyserver.archive.fs.refresh-interval`, 
`historyserver.web.refresh-interval`, `metrics.fetcher.update-interval`, 
`metrics.latency.interval`, `metrics.reporter.influxdb.connectTimeout`, 
`metrics.reporter.influxdb.writeTimeout`, 
`metrics.system-resource-probing-interval`, `pekko.startup-timeout`, 
`pekko.tcp.timeout`, `resourcemanager.job.timeout`, 
`resourcemanager.standalone.start-up-time`, 
`resourcemanager.taskmanager-timeout`, `rest.await-leader-timeout`, 
`rest.connection-timeout`, `rest.idleness-timeout`, `rest.retry.delay`, 
`slot.idle.timeout`, `slot.request.timeout`, `task.cancellation.interval`, 
`task.cancellation.timeout`, `task.cancellation.timers.timeout`, 
`taskmanager.debug.memory.log-interval`, `web.refresh-interval`, `web.timeout`, 
`yarn.heartbeat.container-request-interval`, 
- The `taskmanager.network.compression.codec` and 
`table.optimizer.agg-phase-strategy` configurations have been updated to the 
Enum type in a backward-compatible manner.
- The `yarn.application-attempts` configuration has been updated to the Int 
type in a backward-compatible manner.


> General Improvement to Configuration for Flink 2.0
> --
>
> Key: FLINK-35359
> URL: https://issues.apache.org/jira/browse/FLINK-35359
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xuannan Su
>Assignee: Xuannan Su
>Priority: Major
>  Labels: pull-request-available
>
> As Flink moves toward version 2.0, we want to provide users with a better 
> experience with the existing configuration. In this FLIP, we outline several 
> general improvements to the current configuration:
>  * Ensure all the ConfigOptions are properly annotated
>  * Ensure all user-facing configurations are included in the documentation 
> generation process
>  * Make the existing ConfigOptions use the proper type
>  * Mark all internally used ConfigOptions with the @Internal annotation
>  
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35359) General Improvement to Configuration for Flink 2.0

2024-05-23 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-35359:
---
Release Note: 
- The following configurations have been updated to the Duration type in a 
backward-compatible manner:: `client.heartbeat.interval`, 
`client.heartbeat.timeout`, `cluster.registration.error-delay`, 
`cluster.registration.initial-timeout`, `cluster.registration.max-timeout`, 
`cluster.registration.refused-registration-delay`, 
`cluster.services.shutdown-timeout`, `heartbeat.interval`, `heartbeat.timeout`, 
`high-availability.zookeeper.client.connection-timeout`, 
`high-availability.zookeeper.client.retry-wait`, 
`high-availability.zookeeper.client.session-timeout`, 
`historyserver.archive.fs.refresh-interval`, 
`historyserver.web.refresh-interval`, `metrics.fetcher.update-interval`, 
`metrics.latency.interval`, `metrics.reporter.influxdb.connectTimeout`, 
`metrics.reporter.influxdb.writeTimeout`, 
`metrics.system-resource-probing-interval`, `pekko.startup-timeout`, 
`pekko.tcp.timeout`, `resourcemanager.job.timeout`, 
`resourcemanager.standalone.start-up-time`, 
`resourcemanager.taskmanager-timeout`, `rest.await-leader-timeout`, 
`rest.connection-timeout`, `rest.idleness-timeout`, `rest.retry.delay`, 
`slot.idle.timeout`, `slot.request.timeout`, `task.cancellation.interval`, 
`task.cancellation.timeout`, `task.cancellation.timers.timeout`, 
`taskmanager.debug.memory.log-interval`, `web.refresh-interval`, `web.timeout`, 
`yarn.heartbeat.container-request-interval`, 
- The `taskmanager.network.compression.codec` and 
`table.optimizer.agg-phase-strategy` configurations have been updated to the 
Enum type in a backward-compatible manner.
- The `yarn.application-attempts` configuration has been updated to the Int 
type in a backward-compatible manner.

> General Improvement to Configuration for Flink 2.0
> --
>
> Key: FLINK-35359
> URL: https://issues.apache.org/jira/browse/FLINK-35359
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Xuannan Su
>Assignee: Xuannan Su
>Priority: Major
>  Labels: pull-request-available
>
> As Flink moves toward version 2.0, we want to provide users with a better 
> experience with the existing configuration. In this FLIP, we outline several 
> general improvements to the current configuration:
>  * Ensure all the ConfigOptions are properly annotated
>  * Ensure all user-facing configurations are included in the documentation 
> generation process
>  * Make the existing ConfigOptions use the proper type
>  * Mark all internally used ConfigOptions with the @Internal annotation
>  
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35359) General Improvement to Configuration for Flink 2.0

2024-05-15 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-35359:
--

 Summary: General Improvement to Configuration for Flink 2.0
 Key: FLINK-35359
 URL: https://issues.apache.org/jira/browse/FLINK-35359
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Reporter: Xuannan Su


As Flink moves toward version 2.0, we want to provide users with a better 
experience with the existing configuration. In this FLIP, we outline several 
general improvements to the current configuration:
 * Ensure all the ConfigOptions are properly annotated

 * Ensure all user-facing configurations are included in the documentation 
generation process

 * Make the existing ConfigOptions use the proper type

 * Mark all internally used ConfigOptions with the @Internal annotation

 

https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35089) Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes

2024-04-16 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-35089:
---
Description: 
Currently, the `lastRecordAttributes1` and `lastRecordAttributes2` fields in 
`AbstractStreamOperator` are transient. The two fields will be null when the 
operator is deserialized on the TaskManager, which may cause an NPE.

To fix it, we will initialize the two fields in the {{setup}} method.
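
A simplified sketch of the fix (the class structure is reduced for illustration 
and assumes the `RecordAttributes`/`RecordAttributesBuilder` API as it appears 
in Flink 1.19; this is not Flink's actual code):
{code:java}
import java.util.Collections;

import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;

public abstract class TwoInputOperatorSketch {

    // Transient fields are null after the operator instance is deserialized
    // on the TaskManager, which is what caused the NPE.
    private transient RecordAttributes lastRecordAttributes1;
    private transient RecordAttributes lastRecordAttributes2;

    protected void setup() {
        // Initialize here rather than at the declaration site, because field
        // initializers do not run when the object is deserialized.
        lastRecordAttributes1 = new RecordAttributesBuilder(Collections.emptyList()).build();
        lastRecordAttributes2 = new RecordAttributesBuilder(Collections.emptyList()).build();
    }
}
{code}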

 

  was:
Currently, the `lastRecordAttributes1` and `lastRecordAttributes2` fields in 
`AbstractStreamOperator` are transient. The two fields will be null when the 
operator is deserialized on the TaskManager, which may cause an NPE.

To fix it, we propose to make the RecordAttributes serializable and these 
fields non-transient.

 


> Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes
> --
>
> Key: FLINK-35089
> URL: https://issues.apache.org/jira/browse/FLINK-35089
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: Xuannan Su
>Assignee: Xuannan Su
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the `lastRecordAttributes1` and `lastRecordAttributes2` fields in 
> `AbstractStreamOperator` are transient. The two fields will be null when the 
> operator is deserialized on the TaskManager, which may cause an NPE.
> To fix it, we will initialize the two fields in the {{setup}} method.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35089) Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes

2024-04-11 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-35089:
---
Description: 
Currently, the `lastRecordAttributes1` and `lastRecordAttributes2` fields in 
`AbstractStreamOperator` are transient. The two fields will be null when the 
operator is deserialized on the TaskManager, which may cause an NPE.

To fix it, we propose to make the RecordAttributes serializable and these 
fields non-transient.

 

  was:
Currently, the `lastRecordAttributes1` and `lastRecordAttributes2` fields in 
`AbstractStreamOperator` are transient. The two fields will be null when the 
operator is deserialized on the TaskManager, which may cause an NPE.

To fix it, we propose to make the RecordAttributes serialization and these 
fields non-transient.

 


> Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes
> --
>
> Key: FLINK-35089
> URL: https://issues.apache.org/jira/browse/FLINK-35089
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: Xuannan Su
>Priority: Major
>
> Currently, the `lastRecordAttributes1` and `lastRecordAttributes2` fields in 
> `AbstractStreamOperator` are transient. The two fields will be null when the 
> operator is deserialized on the TaskManager, which may cause an NPE.
> To fix it, we propose to make the RecordAttributes serializable and these 
> fields non-transient.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35089) Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes

2024-04-11 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-35089:
--

 Summary: Two input AbstractStreamOperator may throw NPE when 
receiving RecordAttributes
 Key: FLINK-35089
 URL: https://issues.apache.org/jira/browse/FLINK-35089
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.19.0
Reporter: Xuannan Su


Currently, the `lastRecordAttributes1` and `lastRecordAttributes2` fields in 
`AbstractStreamOperator` are transient. The two fields will be null when the 
operator is deserialized on the TaskManager, which may cause an NPE.

To fix it, we propose to make the RecordAttributes serialization and these 
fields non-transient.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34297) Release Testing Instructions: Verify FLINK-34079 Migrate string configuration key to ConfigOption

2024-01-31 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812649#comment-17812649
 ] 

Xuannan Su commented on FLINK-34297:


[~lincoln.86xy] This change doesn't need cross-team testing. I think we can 
close the ticket.

> Release Testing Instructions: Verify FLINK-34079 Migrate string configuration 
> key to ConfigOption
> -
>
> Key: FLINK-34297
> URL: https://issues.apache.org/jira/browse/FLINK-34297
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Xuannan Su
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34085) Remove deprecated string configuration keys in Flink 2.0

2024-01-14 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-34085:
--

 Summary: Remove deprecated string configuration keys in Flink 2.0
 Key: FLINK-34085
 URL: https://issues.apache.org/jira/browse/FLINK-34085
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration
Reporter: Xuannan Su
 Fix For: 2.0.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34084) Deprecate unused configuration in BinaryInput/OutputFormat and FileInput/OutputFormat

2024-01-14 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-34084:
--

 Summary: Deprecate unused configuration in 
BinaryInput/OutputFormat and FileInput/OutputFormat
 Key: FLINK-34084
 URL: https://issues.apache.org/jira/browse/FLINK-34084
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration
Reporter: Xuannan Su
 Fix For: 1.19.0


Update FileInputFormat.java, FileOutputFormat.java, BinaryInputFormat.java, and 
BinaryOutputFormat.java to deprecate unused string configuration keys.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34083) Deprecate string configuration keys and unused constants in ConfigConstants

2024-01-14 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-34083:
--

 Summary: Deprecate string configuration keys and unused constants 
in ConfigConstants
 Key: FLINK-34083
 URL: https://issues.apache.org/jira/browse/FLINK-34083
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration
Reporter: Xuannan Su
 Fix For: 1.19.0


* Update ConfigConstants.java to deprecate and replace string configuration keys
 * Mark unused constants in ConfigConstants.java as deprecated
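
The replacement pattern, sketched with a made-up key rather than one of the 
constants actually touched by this ticket:
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class ConfigOptionMigrationSketch {

    // Before: public static final String EXAMPLE_HOSTNAME_KEY = "example.hostname";
    // After: a typed ConfigOption carrying the same key plus documentation.
    public static final ConfigOption<String> EXAMPLE_HOSTNAME =
            ConfigOptions.key("example.hostname")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Example description for the migrated key.");
}
{code}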



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33890) Determine the initial status before receiving the first RecordAttributes

2023-12-19 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-33890:
--

 Summary: Determine the initial status before receiving the first 
RecordAttributes 
 Key: FLINK-33890
 URL: https://issues.apache.org/jira/browse/FLINK-33890
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Xuannan Su


Currently, all the operators are initialized in non-backlog mode. Ideally, we 
should determine the initial status before receiving the first 
{{RecordAttributes}} so that we don't have to initialize the operator in 
non-backlog mode and immediately switch to backlog mode before processing any 
records.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33810) Propagate RecordAttributes that contains isProcessingBacklog status

2023-12-12 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-33810:
--

 Summary: Propagate RecordAttributes that contains 
isProcessingBacklog status
 Key: FLINK-33810
 URL: https://issues.apache.org/jira/browse/FLINK-33810
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Xuannan Su






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33399) Support switching from batch to stream mode for KeyedCoProcessOperator and IntervalJoinOperator

2023-10-30 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-33399:
--

 Summary: Support switching from batch to stream mode for 
KeyedCoProcessOperator and IntervalJoinOperator
 Key: FLINK-33399
 URL: https://issues.apache.org/jira/browse/FLINK-33399
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Xuannan Su


Support switching from batch to stream mode for KeyedCoProcessOperator and 
IntervalJoinOperator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33398) Support switching from batch to stream mode for one input stream operator

2023-10-30 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-33398:
--

 Summary: Support switching from batch to stream mode for one input 
stream operator
 Key: FLINK-33398
 URL: https://issues.apache.org/jira/browse/FLINK-33398
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Xuannan Su


Introduce the infra to support switching from batch to stream mode for one 
input stream operator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33184) HybridShuffleITCase fails with exception in resource cleanup of task Map on AZP

2023-10-16 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775983#comment-17775983
 ] 

Xuannan Su commented on FLINK-33184:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53752=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba=9031

> HybridShuffleITCase fails with exception in resource cleanup of task Map on 
> AZP
> ---
>
> Key: FLINK-33184
> URL: https://issues.apache.org/jira/browse/FLINK-33184
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.19.0
>Reporter: Sergey Nuyanzin
>Priority: Critical
>  Labels: test-stability
>
> This build fails 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53548=logs=baf26b34-3c6a-54e8-f93f-cf269b32f802=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9=8710
> {noformat} 
> Map (5/10)#0] ERROR org.apache.flink.runtime.taskmanager.Task 
>[] - FATAL - exception in resource cleanup of task Map (5/10)#0 
> (159f887fbd200ea7cfa4aaeb1127c4ab_0a448493b4782967b150582570326227_4_0)
> .
> java.lang.IllegalStateException: Leaking buffers.
> at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) 
> ~[flink-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManagerImpl.release(TieredStorageMemoryManagerImpl.java:236)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_292]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry.clearResourceFor(TieredStorageResourceRegistry.java:59)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition.releaseInternal(TieredResultPartition.java:195)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:262)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartition(ResultPartitionManager.java:88)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.fail(ResultPartition.java:284)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.taskmanager.Task.failAllResultPartitions(Task.java:1004)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.taskmanager.Task.releaseResources(Task.java:990) 
> ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:838) 
> [flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
> [flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
> 01:17:22,375 [flink-pekko.actor.default-dispatcher-5] INFO  
> org.apache.flink.runtime.taskmanager.Task[] - Task Sink: 
> Unnamed (3/10)#0 is already in state CANCELING
> 01:17:22,375 [Map (5/10)#0] ERROR 
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - FATAL - 
> exception in resource cleanup of task Map (5/10)#0 
> (159f887fbd200ea7cfa4aaeb1127c4ab_0a448493b4782967b150582570326227_4_0)
> .
> java.lang.IllegalStateException: Leaking buffers.
> at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) 
> ~[flink-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManagerImpl.release(TieredStorageMemoryManagerImpl.java:236)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_292]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry.clearResourceFor(TieredStorageResourceRegistry.java:59)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition.releaseInternal(TieredResultPartition.java:195)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:262)
>  ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at 
> 

[jira] [Created] (FLINK-33202) FLIP-327: Support switching from batch to stream mode to improve throughput when processing backlog data

2023-10-07 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-33202:
--

 Summary: FLIP-327: Support switching from batch to stream mode to 
improve throughput when processing backlog data
 Key: FLINK-33202
 URL: https://issues.apache.org/jira/browse/FLINK-33202
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Task
Reporter: Xuannan Su
 Fix For: 1.19.0


Umbrella issue for 
[FLIP-327|https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32476) Support configuring object-reuse for internal operators

2023-06-28 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-32476:
--

 Summary: Support configuring object-reuse for internal operators
 Key: FLINK-32476
 URL: https://issues.apache.org/jira/browse/FLINK-32476
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Task
Reporter: Xuannan Su


Currently, object reuse is disabled by default for streaming jobs in order to 
prevent unexpected behavior. Object reuse becomes problematic when the upstream 
operator stores its output while the downstream operator modifies the input.

However, many operators implemented by Flink, such as Flink SQL operators, do 
not modify the input. This implies that it is safe to reuse the input object in 
such cases. Therefore, we intend to enable object reuse specifically for 
operators that do not modify the input.

As the first step, we will focus on the operators implemented within Flink. A 
follow-up FLIP will introduce the API that allows user-defined operators to 
enable object reuse in the future.
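
For context, a minimal sketch of today's user-facing switch, which is global and 
off by default; the proposal above would instead scope reuse to Flink-internal 
operators known not to modify their input:
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseExample {

    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Global opt-in: safe only if no operator in the job holds on to
        // or mutates the records it receives.
        env.getConfig().enableObjectReuse();
    }
}
{code}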



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32008) Protobuf format throws exception with Map datatype

2023-05-05 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-32008:
--

 Summary: Protobuf format throws exception with Map datatype
 Key: FLINK-32008
 URL: https://issues.apache.org/jira/browse/FLINK-32008
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.17.0
Reporter: Xuannan Su
 Attachments: flink-protobuf-example.zip

The protobuf format throws an exception when working with the Map data type. I 
uploaded an example project to reproduce the problem.

 
{code:java}
Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
exception
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
    at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
    at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
    at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
    at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
unexpected exception while polling the records
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.io.IOException: Failed to deserialize PB object.
    at 
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75)
    at 
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42)
    at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at 
org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197)
    at 
org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210)
    at 
org.apache.flink.connector.file.table.DeserializationSchemaAdapter$Reader.readBatch(DeserializationSchemaAdapter.java:124)
    at 
org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator$1.readBatch(RecordMapperWrapperRecordIterator.java:82)
    at 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
    at 
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
    ... 6 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.convertProtoBinaryToRow(ProtoToRowConverter.java:129)
    at 
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:70)
    ... 15 more
Caused by: com.google.protobuf.InvalidProtocolBufferException: While parsing a 
protocol message, the input ended unexpectedly in the middle of a field.  This 
could mean either that the input has been truncated or that an embedded 
message misreported its own length. {code}

[jira] [Created] (FLINK-31944) Protobuf format throw com.google.protobuf.InvalidProtocolBufferException

2023-04-26 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-31944:
--

 Summary: Protobuf format throw 
com.google.protobuf.InvalidProtocolBufferException
 Key: FLINK-31944
 URL: https://issues.apache.org/jira/browse/FLINK-31944
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.17.0
Reporter: Xuannan Su
 Attachments: flink-protobuf-example.zip

It seems that the protobuf format throws the following exception when the first 
field of the message is of string type. This may also occur for other types. I 
uploaded a Maven project to reproduce the problem.

 
{code:java}
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
unexpected exception while polling the records
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.io.IOException: Failed to deserialize PB object.
    at 
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75)
    at 
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42)
    at 
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at 
org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197)
    at 
org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210)
    at 
org.apache.flink.connector.file.table.DeserializationSchemaAdapter$Reader.readBatch(DeserializationSchemaAdapter.java:124)
    at 
org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator$1.readBatch(RecordMapperWrapperRecordIterator.java:82)
    at 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
    at 
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
    ... 6 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.convertProtoBinaryToRow(ProtoToRowConverter.java:129)
    at 
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:70)
    ... 15 more
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message 
contained an invalid tag (zero).
    at 
com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:133)
    at 
com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:633)
    at com.example.proto.Message.<init>(Message.java:47)
    at com.example.proto.Message.<init>(Message.java:9)
    at com.example.proto.Message$1.parsePartialFrom(Message.java:540)
    at com.example.proto.Message$1.parsePartialFrom(Message.java:534)
    at 
com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:203)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:208)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
    at com.example.proto.Message.parseFrom(Message.java:218)
    ... 21 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31943) Multiple t_env cause class loading problem

2023-04-25 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-31943:
--

 Summary: Multiple t_env cause class loading problem
 Key: FLINK-31943
 URL: https://issues.apache.org/jira/browse/FLINK-31943
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.17.0
Reporter: Xuannan Su
 Attachments: flink-sql-connector-kafka-1.17.0.jar, 
pyflink_classloader.py

When a PyFlink process creates multiple StreamTableEnvironments with different 
EnvironmentSettings and sets "pipeline.jars" on the first created t_env, it 
appears that the jar is not added to the classloader of that first t_env.

 

After digging a little bit, the reason may be that when creating the second 
table environment with a new EnvironmentSettings, the context classloader is 
overwritten by a new classloader; see the `EnvironmentSettings.Builder.build` 
method.

 

I uploaded the script to reproduce the problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30607) Table.to_pandas doesn't support Map type

2023-03-28 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17706208#comment-17706208
 ] 

Xuannan Su commented on FLINK-30607:


[~dianfu] Thanks for the patch! This is very useful in our use case. May I ask 
if we have a plan to backport this to version 1.16?

> Table.to_pandas doesn't support Map type
> 
>
> Key: FLINK-30607
> URL: https://issues.apache.org/jira/browse/FLINK-30607
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.15.3
>Reporter: Xuannan Su
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> It seems that the Table#to_pandas method in PyFlink doesn't support Map type. 
> It throws the following exception.
> {code:java}
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
> : java.lang.UnsupportedOperationException: Python vectorized UDF doesn't 
> support logical type MAP currently.
>     at 
> org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:743)
>     at 
> org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:617)
>     at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:167)
>     at org.apache.flink.table.types.logical.MapType.accept(MapType.java:115)
>     at 
> org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowField(ArrowUtils.java:189)
>     at 
> org.apache.flink.table.runtime.arrow.ArrowUtils.lambda$toArrowSchema$0(ArrowUtils.java:180)
>     at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>     at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>     at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>     at 
> org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowSchema(ArrowUtils.java:181)
>     at 
> org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:483)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>     at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.lang.Thread.run(Thread.java:748) {code}
> This can be reproduced with the following code.
> {code:java}
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env)
> table = t_env.from_descriptor(
> TableDescriptor.for_connector("datagen")
> .schema(
> Schema.new_builder()
> .column("val", DataTypes.MAP(DataTypes.INT(), DataTypes.INT()))
> .build()
> )
> .option("number-of-rows", "10")
> .build()
> )
> df = table.to_pandas()
> print(df) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-24456) Support bounded offset in the Kafka table connector

2023-03-28 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17706184#comment-17706184
 ] 

Xuannan Su commented on FLINK-24456:


Thanks for the effort! This is a particularly useful feature in our use case. 
May I ask if we plan to backport this patch to 1.16?

> Support bounded offset in the Kafka table connector
> ---
>
> Key: FLINK-24456
> URL: https://issues.apache.org/jira/browse/FLINK-24456
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Table SQL / Ecosystem
>Reporter: Haohui Mai
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.17.0
>
>
> The {{setBounded}} API in the DataStream connector of Kafka is particularly 
> useful when writing tests. Unfortunately the table connector of Kafka lacks 
> the same API.
> It would be good to have this API added.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30607) Table.to_pandas doesn't support Map type

2023-01-09 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-30607:
--

 Summary: Table.to_pandas doesn't support Map type
 Key: FLINK-30607
 URL: https://issues.apache.org/jira/browse/FLINK-30607
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.3
Reporter: Xuannan Su


It seems that the Table#to_pandas method in PyFlink doesn't support Map type. 
It throws the following exception.
{code:java}
py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
: java.lang.UnsupportedOperationException: Python vectorized UDF doesn't 
support logical type MAP currently.
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:743)
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:617)
    at 
org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:167)
    at org.apache.flink.table.types.logical.MapType.accept(MapType.java:115)
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowField(ArrowUtils.java:189)
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils.lambda$toArrowSchema$0(ArrowUtils.java:180)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowSchema(ArrowUtils.java:181)
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:483)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748) {code}
This can be reproduced with the following code.
{code:python}
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
table = t_env.from_descriptor(
    TableDescriptor.for_connector("datagen")
    .schema(
        Schema.new_builder()
        .column("val", DataTypes.MAP(DataTypes.INT(), DataTypes.INT()))
        .build()
    )
    .option("number-of-rows", "10")
    .build()
)
df = table.to_pandas()
print(df) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30258) PyFlink supports closing loopback server

2022-12-01 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-30258:
---
Summary: PyFlink supports closing loopback server  (was: PyFlink supports 
closing loop back server)

> PyFlink supports closing loopback server
> 
>
> Key: FLINK-30258
> URL: https://issues.apache.org/jira/browse/FLINK-30258
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Xuannan Su
>Priority: Major
>
> Currently, a loopback server is started whenever a StreamExecutionEnvironment
> or StreamTableEnvironment is created. The loopback server can only be closed
> after the process exits. This might not be a problem for regular use, where
> only one environment object is created.
> However, when running tests, such as the unit tests of PyFlink itself, more
> and more environment objects are created, so the process starts more and more
> loopback servers and consumes more and more resources.
> Therefore, we want to support closing the loopback server.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30258) PyFlink supports closing loop back server

2022-12-01 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-30258:
---
Description: 
Currently, a loopback server is started whenever a StreamExecutionEnvironment
or StreamTableEnvironment is created. The loopback server can only be closed
after the process exits. This might not be a problem for regular use, where
only one environment object is created.

However, when running tests, such as the unit tests of PyFlink itself, more and
more environment objects are created, so the process starts more and more
loopback servers and consumes more and more resources.

Therefore, we want to support closing the loopback server.

  was:
Currently, a loopback server is started whenever a StreamExecutionEnvironment
or StreamTableEnvironment is created. The loopback server can only be closed
after the process exits. This might not be a problem for regular use, where
only one environment object is created.

However, when running tests, such as the unit tests of PyFlink itself, more and
more environment objects are created, so the process starts more and more
loopback servers and consumes more and more resources.


> PyFlink supports closing loop back server
> -
>
> Key: FLINK-30258
> URL: https://issues.apache.org/jira/browse/FLINK-30258
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Affects Versions: 1.16.0
>Reporter: Xuannan Su
>Priority: Major
>
> Currently, a loopback server is started whenever a StreamExecutionEnvironment
> or StreamTableEnvironment is created. The loopback server can only be closed
> after the process exits. This might not be a problem for regular use, where
> only one environment object is created.
> However, when running tests, such as the unit tests of PyFlink itself, more
> and more environment objects are created, so the process starts more and more
> loopback servers and consumes more and more resources.
> Therefore, we want to support closing the loopback server.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30258) PyFlink supports closing loop back server

2022-12-01 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-30258:
--

 Summary: PyFlink supports closing loop back server
 Key: FLINK-30258
 URL: https://issues.apache.org/jira/browse/FLINK-30258
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.16.0
Reporter: Xuannan Su


Currently, a loopback server is started whenever a StreamExecutionEnvironment
or StreamTableEnvironment is created. The loopback server can only be closed
after the process exits. This might not be a problem for regular use, where
only one environment object is created.

However, when running tests, such as the unit tests of PyFlink itself, more and
more environment objects are created, so the process starts more and more
loopback servers and consumes more and more resources.
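
A minimal sketch of the test pattern described above (illustrative only; there
is currently no API to close the loopback server, which is exactly what this
ticket proposes to add):
{code:python}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

for _ in range(100):
    # each created environment starts another loopback server ...
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    # ... run one test case against t_env ...

# all of the loopback servers stay alive until the Python process exits
{code}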



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30078) Temporal join should finish when the left table is bounded and finished

2022-11-17 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-30078:
--

 Summary: Temporal join should finish when the left table is 
bounded and finished
 Key: FLINK-30078
 URL: https://issues.apache.org/jira/browse/FLINK-30078
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.16.0
Reporter: Xuannan Su


Currently, a temporal join with a bounded left table and an unbounded right
table keeps running even after the left table is finished.

From the user's perspective, once the left table is finished, the result of the
temporal join is finalized, so the temporal join should finish as well, and the
job may also finish, depending on the job topology.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-19200) UNIX_TIMESTAMP function support return in millisecond

2022-11-10 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17632108#comment-17632108
 ] 

Xuannan Su commented on FLINK-19200:


Any update on this issue? I am also looking for a way to convert a timestamp to
epoch milliseconds.

> UNIX_TIMESTAMP function support return in millisecond
> -
>
> Key: FLINK-19200
> URL: https://issues.apache.org/jira/browse/FLINK-19200
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: leslieyuan
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> I am using Flink 1.10.0 and found that, given:
> time = "2020-09-11 13:14:29.153"
> UNIX_TIMESTAMP(time) returns 1599801269
> UNIX_TIMESTAMP(time, 'yyyy-MM-dd HH:mm:ss.SSS') also returns 1599801269
> Yes, I have seen the description on the official website that this function
> returns seconds, but since the format given above includes milliseconds, I
> would expect the result in milliseconds.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29951) Kafka SQL connector supports setting end offset/timestamp

2022-11-08 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-29951:
--

 Summary: Kafka SQL connector supports setting end offset/timestamp
 Key: FLINK-29951
 URL: https://issues.apache.org/jira/browse/FLINK-29951
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.17.0
Reporter: Xuannan Su


Currently, the KafkaSource in the DataStream API allows specifying an end
offset/timestamp via `KafkaSourceBuilder#setBounded`, while the Kafka SQL
connector has no way to do that.

To better align the functionality with the DataStream API and to support
bounded streams, we want to support setting an end offset/timestamp in the
Kafka SQL connector.
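
For reference, a minimal sketch of the existing DataStream-side API that the
SQL connector would mirror (topic and bootstrap servers are placeholders):
{code:java}
// the source stops at the latest offsets observed when it starts
KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")  // placeholder
                .setTopics("my-topic")                  // placeholder
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setBounded(OffsetsInitializer.latest())
                .build();
{code}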



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29753) FileSource throws exception reading file with name that ends with xz

2022-10-25 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-29753:
--

 Summary: FileSource throws exception reading file with name that 
ends with xz 
 Key: FLINK-29753
 URL: https://issues.apache.org/jira/browse/FLINK-29753
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.15.2
Reporter: Xuannan Su


FileSource throws the following exception when reading a file whose name ends
with "xz":
{code:java}
Caused by: java.lang.RuntimeException: One or more fetchers have encountered 
exception
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
    at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
    at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
    at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
    at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: org/tukaani/xz/XZInputStream
    at 
org.apache.flink.api.common.io.compression.XZInputStreamFactory.create(XZInputStreamFactory.java:42)
    at 
org.apache.flink.api.common.io.compression.XZInputStreamFactory.create(XZInputStreamFactory.java:31)
    at 
org.apache.flink.connector.file.src.impl.StreamFormatAdapter.lambda$openStream$3(StreamFormatAdapter.java:178)
    at 
org.apache.flink.connector.file.src.util.Utils.doWithCleanupOnException(Utils.java:45)
    at 
org.apache.flink.connector.file.src.impl.StreamFormatAdapter.openStream(StreamFormatAdapter.java:172)
    at 
org.apache.flink.connector.file.src.impl.StreamFormatAdapter.createReader(StreamFormatAdapter.java:70)
    at 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
    at 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
    at 
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
    at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.lang.ClassNotFoundException: org.tukaani.xz.XZInputStream
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 16 more {code}
The code to reproduce the error:
{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    final FileSource<String> source =
            FileSource.forRecordStreamFormat(
                            new TextLineInputFormat(), new Path("/tmp/abcxz"))
                    .build();

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "source").print();
    env.execute();
} {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29339) JobMasterPartitionTrackerImpl#requestShuffleDescriptorsFromResourceManager blocks main thread

2022-09-20 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17606950#comment-17606950
 ] 

Xuannan Su commented on FLINK-29339:


[~gaoyunhaii] I will take a look. Could you assign the ticket to me?

> JobMasterPartitionTrackerImpl#requestShuffleDescriptorsFromResourceManager 
> blocks main thread
> -
>
> Key: FLINK-29339
> URL: https://issues.apache.org/jira/browse/FLINK-29339
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.0
>Reporter: Chesnay Schepler
>Priority: Critical
>
> {code:java}
> private List<ShuffleDescriptor> requestShuffleDescriptorsFromResourceManager(
>         IntermediateDataSetID intermediateDataSetID) {
>     Preconditions.checkNotNull(
>             resourceManagerGateway, "JobMaster is not connected to ResourceManager");
>     try {
>         return this.resourceManagerGateway
>                 .getClusterPartitionsShuffleDescriptors(intermediateDataSetID)
>                 .get(); // <-- there's your problem
>     } catch (Throwable e) {
>         throw new RuntimeException(
>                 String.format(
>                         "Failed to get shuffle descriptors of intermediate dataset %s from ResourceManager",
>                         intermediateDataSetID),
>                 e);
>     }
> }
> {code}
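> 
> A minimal sketch of the non-blocking shape a fix could take (assuming the
> gateway call returns a CompletableFuture, as the get() above suggests; this
> is not the actual fix):
> {code:java}
> private CompletableFuture<List<ShuffleDescriptor>>
>         requestShuffleDescriptorsFromResourceManager(
>                 IntermediateDataSetID intermediateDataSetID) {
>     Preconditions.checkNotNull(
>             resourceManagerGateway, "JobMaster is not connected to ResourceManager");
>     // hand the future back to the caller to compose on,
>     // instead of blocking the main thread with get()
>     return resourceManagerGateway
>             .getClusterPartitionsShuffleDescriptors(intermediateDataSetID);
> }
> {code}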



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29231) PyFlink udtaf produces different results in the same sliding window

2022-09-08 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-29231:
--

 Summary: PyFlink udtaf produces different results in the same 
sliding window
 Key: FLINK-29231
 URL: https://issues.apache.org/jira/browse/FLINK-29231
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.2
Reporter: Xuannan Su
 Attachments: image-2022-09-08-17-20-06-296.png, input, test_agg.py

It seems that a PyFlink udtaf produces different results within the same
sliding window. It can be reproduced with the attached code and input. It does
not always happen, but the probability is relatively high.

The incorrect output is the following:

!image-2022-09-08-17-20-06-296.png!

 

We can see that the output contains different `val_sum` values at
`window_time` 2022-01-01 00:01:59.999.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-28964) Release Testing: Verify FLIP-205 Cache in DataStream for Batch Processing

2022-08-30 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su resolved FLINK-28964.

Resolution: Fixed

> Release Testing: Verify FLIP-205 Cache in DataStream for Batch Processing
> -
>
> Key: FLINK-28964
> URL: https://issues.apache.org/jira/browse/FLINK-28964
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Reporter: Xuannan Su
>Assignee: jiaxiong
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> The DataStream API provides the `cache` method to cache the result of a
> DataStream and reuse it in later jobs under batch execution mode.
> I think we should verify:
>  # Follow the doc to write a Flink job that produces the cache and a job that
> consumes it, and submit them to a session cluster (standalone or YARN).
>  # You can physically remove the source after the cache-producing job has
> finished to verify that the cache-consuming job is not reading from the
> source. For example, delete the file in the filesystem if you are using a
> file source.
>  # You can restart the TaskManager after the cache-producing job has finished
> to verify that the cache-consuming job will re-compute the result.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28964) Release Testing: Verify FLIP-205 Cache in DataStream for Batch Processing

2022-08-29 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17597462#comment-17597462
 ] 

Xuannan Su commented on FLINK-28964:


Thanks [~jessionX] for your testing. I think you have run the test
successfully. If there are no more problems by tomorrow, I'd like to close the
ticket.

> Release Testing: Verify FLIP-205 Cache in DataStream for Batch Processing
> -
>
> Key: FLINK-28964
> URL: https://issues.apache.org/jira/browse/FLINK-28964
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Reporter: Xuannan Su
>Assignee: jiaxiong
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.16.0
>
>
> The DataStream API provides the `cache` method to cache the result of a
> DataStream and reuse it in later jobs under batch execution mode.
> I think we should verify:
>  # Follow the doc to write a Flink job that produces the cache and a job that
> consumes it, and submit them to a session cluster (standalone or YARN).
>  # You can physically remove the source after the cache-producing job has
> finished to verify that the cache-consuming job is not reading from the
> source. For example, delete the file in the filesystem if you are using a
> file source.
>  # You can restart the TaskManager after the cache-producing job has finished
> to verify that the cache-consuming job will re-compute the result.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28988) Incorrect result for filter after temporal join

2022-08-21 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-28988:
---
Description: 
The following code can reproduce the case:

{code:java}
public class TemporalJoinSQLExample1 {

    public static void main(String[] args) throws Exception {

        // set up the Java DataStream API
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // set up the Java Table API
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
                env.fromElements(
                        new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
                        new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
                        new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));

        final Table table =
                tableEnv.fromDataStream(
                                ds,
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT())
                                        .column("f1", DataTypes.STRING())
                                        .column("f2", DataTypes.TIMESTAMP_LTZ(3))
                                        .watermark("f2", "f2 - INTERVAL '2' SECONDS")
                                        .build())
                        .as("id", "state", "ts");
        tableEnv.createTemporaryView("source_table", table);

        final Table dedupeTable =
                tableEnv.sqlQuery(
                        "SELECT * FROM ("
                                + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table"
                                + ") WHERE row_num = 1");
        tableEnv.createTemporaryView("versioned_table", dedupeTable);

        DataStreamSource<Tuple2<Integer, Instant>> event =
                env.fromElements(
                        new Tuple2<>(0, Instant.ofEpochMilli(0)),
                        new Tuple2<>(0, Instant.ofEpochMilli(5)),
                        new Tuple2<>(0, Instant.ofEpochMilli(10)),
                        new Tuple2<>(0, Instant.ofEpochMilli(15)),
                        new Tuple2<>(0, Instant.ofEpochMilli(20)),
                        new Tuple2<>(0, Instant.ofEpochMilli(25)));

        final Table eventTable =
                tableEnv.fromDataStream(
                                event,
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT())
                                        .column("f1", DataTypes.TIMESTAMP_LTZ(3))
                                        .watermark("f1", "f1 - INTERVAL '2' SECONDS")
                                        .build())
                        .as("id", "ts");

        tableEnv.createTemporaryView("event_table", eventTable);

        final Table result =
                tableEnv.sqlQuery(
                        "SELECT * FROM event_table"
                                + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
                                + " ON event_table.id = versioned_table.id");
        result.execute().print();

        result.filter($("state").isEqual("online")).execute().print();
    }
} {code}
 

The result of the temporal join is the following:

|op|id|ts|id0|state|ts0|row_num|
|+I|0|1970-01-01 08:00:00.000|0|online|1970-01-01 08:00:00.000|1|
|+I|0|1970-01-01 08:00:00.005|0|online|1970-01-01 08:00:00.000|1|
|+I|0|1970-01-01 08:00:00.010|0|offline|1970-01-01 08:00:00.010|1|
|+I|0|1970-01-01 08:00:00.015|0|offline|1970-01-01 08:00:00.010|1|
|+I|0|1970-01-01 08:00:00.020|0|online|1970-01-01 08:00:00.020|1|
|+I|0|1970-01-01 08:00:00.025|0|online|1970-01-01 08:00:00.020|1|

 

After filtering with the predicate state = 'online', I expect only the two
rows with state 'offline' to be filtered out. But I got the following result:

|op|id|ts|id0|state|ts0|row_num|
|+I|0|1970-01-01 08:00:00.020|0|online|1970-01-01 08:00:00.020|1|
|+I|0|1970-01-01 08:00:00.025|0|online|1970-01-01 08:00:00.020|1|

 
 
 

 

  was:
The following code can reproduce the case

 
{code:java}
public class 

[jira] [Created] (FLINK-28988) Incorrect result for filter after temporal join

2022-08-16 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-28988:
--

 Summary: Incorrect result for filter after temporal join
 Key: FLINK-28988
 URL: https://issues.apache.org/jira/browse/FLINK-28988
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.15.1
Reporter: Xuannan Su


The following code can reproduce the case:

{code:java}
public class TemporalJoinSQLExample1 {

    public static void main(String[] args) throws Exception {

        // set up the Java DataStream API
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // set up the Java Table API
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
                env.fromElements(
                        new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
                        new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
                        new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));

        final Table table =
                tableEnv.fromDataStream(
                                ds,
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT())
                                        .column("f1", DataTypes.STRING())
                                        .column("f2", DataTypes.TIMESTAMP_LTZ(3))
                                        .watermark("f2", "f2 - INTERVAL '2' SECONDS")
                                        .build())
                        .as("id", "state", "ts");
        tableEnv.createTemporaryView("source_table", table);

        final Table dedupeTable =
                tableEnv.sqlQuery(
                        "SELECT * FROM ("
                                + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table"
                                + ") WHERE row_num = 1");
        tableEnv.createTemporaryView("versioned_table", dedupeTable);

        DataStreamSource<Tuple2<Integer, Instant>> event =
                env.fromElements(
                        new Tuple2<>(0, Instant.ofEpochMilli(0)),
                        new Tuple2<>(0, Instant.ofEpochMilli(5)),
                        new Tuple2<>(0, Instant.ofEpochMilli(10)),
                        new Tuple2<>(0, Instant.ofEpochMilli(15)),
                        new Tuple2<>(0, Instant.ofEpochMilli(20)),
                        new Tuple2<>(0, Instant.ofEpochMilli(25)));

        final Table eventTable =
                tableEnv.fromDataStream(
                                event,
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT())
                                        .column("f1", DataTypes.TIMESTAMP_LTZ(3))
                                        .watermark("f1", "f1 - INTERVAL '2' SECONDS")
                                        .build())
                        .as("id", "ts");

        tableEnv.createTemporaryView("event_table", eventTable);

        final Table result =
                tableEnv.sqlQuery(
                        "SELECT * FROM event_table"
                                + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
                                + " ON event_table.id = versioned_table.id");
        result.execute().print();

        result.filter($("state").isEqual("online")).execute().print();
    }
} {code}
 

The result of the temporal join is the following:

+----+----+-------------------------+-----+---------+-------------------------+---------+
| op | id |                      ts | id0 |   state |                     ts0 | row_num |
+----+----+-------------------------+-----+---------+-------------------------+---------+
| +I |  0 | 1970-01-01 08:00:00.000 |   0 |  online | 1970-01-01 08:00:00.000 |       1 |
| +I |  0 | 1970-01-01 08:00:00.005 |   0 |  online | 1970-01-01 08:00:00.000 |       1 |
| +I |  0 | 1970-01-01 08:00:00.010 |   0 | offline | 1970-01-01 08:00:00.010 |       1 |
| +I |  0 | 1970-01-01 08:00:00.015 |   0 | offline | 1970-01-01 08:00:00.010 |       1 |
| +I |  0 | 1970-01-01 08:00:00.020 |   0 |  online | 1970-01-01 08:00:00.020 |       1 |
| +I |  0 | 1970-01-01 08:00:00.025 |   0 |  online | 1970-01-01 08:00:00.020 |       1 |
+----+----+-------------------------+-----+---------+-------------------------+---------+

[jira] [Created] (FLINK-28964) Release Testing: Verify FLIP-205 Cache in DataStream for Batch Processing

2022-08-15 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-28964:
--

 Summary: Release Testing: Verify FLIP-205 Cache in DataStream for 
Batch Processing
 Key: FLINK-28964
 URL: https://issues.apache.org/jira/browse/FLINK-28964
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Xuannan Su
 Fix For: 1.16.0


The DataStream API provides the `cache` method to cache the result of a
DataStream and reuse it in later jobs under batch execution mode.

I think we should verify:
 # Follow the doc to write a Flink job that produces the cache and a job that
consumes it, and submit them to a session cluster (standalone or YARN); see
the sketch after this list.
 # You can physically remove the source after the cache-producing job has
finished to verify that the cache-consuming job is not reading from the
source. For example, delete the file in the filesystem if you are using a file
source.
 # You can restart the TaskManager after the cache-producing job has finished
to verify that the cache-consuming job will re-compute the result.
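
A minimal sketch of such a producing/consuming pair under the FLIP-205 `cache`
API (element values and job names are illustrative, not from the ticket):
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// first job: produces and caches the intermediate result
CachedDataStream<Integer> cached = env.fromElements(1, 2, 3).cache();
cached.print();
env.execute("produce-cache");

// second job in the same session cluster: consumes the cached result
// instead of re-reading the original source
cached.print();
env.execute("consume-cache");
{code}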

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28860) CacheITCase.testBatchProduceCacheStreamConsume failed

2022-08-08 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17577084#comment-17577084
 ] 

Xuannan Su commented on FLINK-28860:


[~hxbks2ks] I will take a look at it. Could you assign the ticket to me?

> CacheITCase.testBatchProduceCacheStreamConsume failed
> -
>
> Key: FLINK-28860
> URL: https://issues.apache.org/jira/browse/FLINK-28860
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.16.0
>Reporter: Huang Xingbo
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.16.0
>
>
> {code:java}
> 2022-08-08T03:27:22.1988575Z Aug 08 03:27:22 [ERROR] 
> org.apache.flink.test.streaming.runtime.CacheITCase.testBatchProduceCacheStreamConsume(Path)
>   Time elapsed: 0.593 s  <<< ERROR!
> 2022-08-08T03:27:22.1989338Z Aug 08 03:27:22 java.lang.RuntimeException: 
> Producing cache IntermediateResult is not supported in streaming mode
> 2022-08-08T03:27:22.1990401Z Aug 08 03:27:22  at 
> org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator.translateForStreamingInternal(CacheTransformationTranslator.java:75)
> 2022-08-08T03:27:22.1991511Z Aug 08 03:27:22  at 
> org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator.translateForStreamingInternal(CacheTransformationTranslator.java:42)
> 2022-08-08T03:27:22.1993671Z Aug 08 03:27:22  at 
> org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
> 2022-08-08T03:27:22.1994900Z Aug 08 03:27:22  at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:830)
> 2022-08-08T03:27:22.1995748Z Aug 08 03:27:22  at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:560)
> 2022-08-08T03:27:22.1996932Z Aug 08 03:27:22  at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:851)
> 2022-08-08T03:27:22.1998562Z Aug 08 03:27:22  at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:809)
> 2022-08-08T03:27:22.1999581Z Aug 08 03:27:22  at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:560)
> 2022-08-08T03:27:22.2000376Z Aug 08 03:27:22  at 
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319)
> 2022-08-08T03:27:22.2001359Z Aug 08 03:27:22  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2250)
> 2022-08-08T03:27:22.2002767Z Aug 08 03:27:22  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2241)
> 2022-08-08T03:27:22.2004121Z Aug 08 03:27:22  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2227)
> 2022-08-08T03:27:22.2005059Z Aug 08 03:27:22  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2178)
> 2022-08-08T03:27:22.2005939Z Aug 08 03:27:22  at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1469)
> 2022-08-08T03:27:22.2006735Z Aug 08 03:27:22  at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1334)
> 2022-08-08T03:27:22.2007500Z Aug 08 03:27:22  at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1320)
> 2022-08-08T03:27:22.2008315Z Aug 08 03:27:22  at 
> org.apache.flink.test.streaming.runtime.CacheITCase.testBatchProduceCacheStreamConsume(CacheITCase.java:190)
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39518=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28857) Add Document for DataStream Cache API

2022-08-07 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-28857:
--

 Summary: Add Document for DataStream Cache API
 Key: FLINK-28857
 URL: https://issues.apache.org/jira/browse/FLINK-28857
 Project: Flink
  Issue Type: Sub-task
Reporter: Xuannan Su






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28742) Table.to_pandas fails with lit("xxx")

2022-07-29 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-28742:
--

 Summary: Table.to_pandas fails with lit("xxx")
 Key: FLINK-28742
 URL: https://issues.apache.org/jira/browse/FLINK-28742
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.1
Reporter: Xuannan Su


The Table.to_pandas method throws the following exception when the table
contains lit("anyString").

 
{code:none}
py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
: java.lang.UnsupportedOperationException: Python vectorized UDF doesn't 
support logical type CHAR(3) NOT NULL currently.
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:743)
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:617)
    at 
org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:62)
    at org.apache.flink.table.types.logical.CharType.accept(CharType.java:148)
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowField(ArrowUtils.java:189)
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils.lambda$toArrowSchema$0(ArrowUtils.java:180)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowSchema(ArrowUtils.java:181)
    at 
org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:483)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
 {code}
 

The code to reproduce the problem:
{code:python}
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

src_table = t_env.from_data_stream(
    env.from_collection([1, 2], type_info=BasicTypeInfo.INT_TYPE_INFO())
)

table = src_table.select(expr.lit("123"))
# table.execute().print()
print(table.to_pandas()){code}
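
A possible workaround (an assumption, not verified against 1.15.1) is to cast
the literal so that the column type is STRING rather than CHAR(3) NOT NULL:
{code:python}
# hypothetical workaround: avoid the unsupported CHAR(3) NOT NULL column type
table = src_table.select(expr.lit("123").cast(DataTypes.STRING()))
print(table.to_pandas())
{code}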



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28528) Table.getSchema fails on table with watermark

2022-07-12 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-28528:
---
Description: 
The bug can be reproduced with the following test. The test passes if we use
the commented-out way to define the watermark.
{code:python}
def test_flink_2(self):
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    table = t_env.from_descriptor(
        TableDescriptor.for_connector("filesystem")
        .schema(
            Schema.new_builder()
            .column("name", DataTypes.STRING())
            .column("cost", DataTypes.INT())
            .column("distance", DataTypes.INT())
            .column("time", DataTypes.TIMESTAMP(3))
            .watermark("time", expr.col("time") - expr.lit(60).seconds)
            # .watermark("time", "`time` - INTERVAL '60' SECOND")
            .build()
        )
        .format("csv")
        .option("path", "./input.csv")
        .build()
    )

    print(table.get_schema())
{code}
It causes the following exception
{code:none}
E   pyflink.util.exceptions.TableException: 
org.apache.flink.table.api.TableException: Expression 'minus(time, 6)' is 
not string serializable. Currently, only expressions that originated from a SQL 
expression have a well-defined string representation.
E   at 
org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
E   at 
org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
E   at 
java.util.Collections$SingletonList.forEach(Collections.java:4824)
E   at 
org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
E   at org.apache.flink.table.api.Table.getSchema(Table.java:101)
E   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E   at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E   at java.lang.reflect.Method.invoke(Method.java:498)
E   at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E   at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E   at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
E   at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E   at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
E   at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
E   at java.lang.Thread.run(Thread.java:748)
{code}

  was:
The bug can be reproduced with the following test. The test passes if we use
the commented-out way to define the watermark.

{code:python}
def test_flink_2(self):
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    table = t_env.from_descriptor(
        TableDescriptor.for_connector("filesystem")
        .schema(
            Schema.new_builder()
            .column("name", DataTypes.STRING())
            .column("cost", DataTypes.INT())
            .column("distance", DataTypes.INT())
            .column("time", DataTypes.TIMESTAMP(3))
            .watermark("time", expr.col("time") - expr.lit(60).seconds)
            # .watermark("time", "`time` - INTERVAL '60' SECOND")
            .build()
        )
        .format("csv")
        .option("path", "./input.csv")
        .build()
    )

    print(table.get_schema())
{code}

It causes the following exception

{code:none}
// Some comments here
E   pyflink.util.exceptions.TableException: 
org.apache.flink.table.api.TableException: Expression 'minus(time, 6)' is 
not string serializable. Currently, only expressions that originated from a SQL 
expression have a well-defined string representation.
E   at 
org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
E   at 
org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
E   at 
java.util.Collections$SingletonList.forEach(Collections.java:4824)
E   at 
org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
E   at org.apache.flink.table.api.Table.getSchema(Table.java:101)
E   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E   at 

[jira] [Created] (FLINK-28528) Table.getSchema fails on table with watermark

2022-07-12 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-28528:
--

 Summary: Table.getSchema fails on table with watermark
 Key: FLINK-28528
 URL: https://issues.apache.org/jira/browse/FLINK-28528
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.1
Reporter: Xuannan Su


The bug can be reproduced with the following test. The test passes if we use
the commented-out way to define the watermark.

{code:python}
def test_flink_2(self):
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    table = t_env.from_descriptor(
        TableDescriptor.for_connector("filesystem")
        .schema(
            Schema.new_builder()
            .column("name", DataTypes.STRING())
            .column("cost", DataTypes.INT())
            .column("distance", DataTypes.INT())
            .column("time", DataTypes.TIMESTAMP(3))
            .watermark("time", expr.col("time") - expr.lit(60).seconds)
            # .watermark("time", "`time` - INTERVAL '60' SECOND")
            .build()
        )
        .format("csv")
        .option("path", "./input.csv")
        .build()
    )

    print(table.get_schema())
{code}

It causes the following exception

{code:none}
// Some comments here
E   pyflink.util.exceptions.TableException: 
org.apache.flink.table.api.TableException: Expression 'minus(time, 6)' is 
not string serializable. Currently, only expressions that originated from a SQL 
expression have a well-defined string representation.
E   at 
org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
E   at 
org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455)
E   at 
java.util.Collections$SingletonList.forEach(Collections.java:4824)
E   at 
org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451)
E   at org.apache.flink.table.api.Table.getSchema(Table.java:101)
E   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E   at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E   at java.lang.reflect.Method.invoke(Method.java:498)
E   at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E   at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E   at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
E   at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E   at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
E   at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
E   at java.lang.Thread.run(Thread.java:748)
{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28527) Fail to lateral join with UDTF from Table with timestamp column

2022-07-12 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-28527:
--

 Summary: Fail to lateral join with UDTF from Table with timestamp
column
 Key: FLINK-28527
 URL: https://issues.apache.org/jira/browse/FLINK-28527
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.1
Reporter: Xuannan Su


The bug can be reproduced with the following test:


{code:python}
def test_flink(self):
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    table = t_env.from_descriptor(
        TableDescriptor.for_connector("filesystem")
        .schema(
            Schema.new_builder()
            .column("name", DataTypes.STRING())
            .column("cost", DataTypes.INT())
            .column("distance", DataTypes.INT())
            .column("time", DataTypes.TIMESTAMP(3))
            .watermark("time", "`time` - INTERVAL '60' SECOND")
            .build()
        )
        .format("csv")
        .option("path", "./input.csv")
        .build()
    )

    @udtf(result_types=DataTypes.INT())
    def table_func(row: Row):
        return row.cost + row.distance

    table = table.join_lateral(table_func.alias("cost_times_distance"))

    table.execute().print()
{code}

It causes the following exception


{code:none}
E   pyflink.util.exceptions.TableException: 
org.apache.flink.table.api.TableException: Unsupported Python SqlFunction CAST.
E   at 
org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:146)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:429)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:135)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.extractPythonTableFunctionInfo(CommonExecPythonCorrelate.java:133)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.createPythonOneInputTransformation(CommonExecPythonCorrelate.java:106)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.translateToPlanInternal(CommonExecPythonCorrelate.java:95)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:136)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
E   at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:79)
E   at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
E   at scala.collection.Iterator.foreach(Iterator.scala:937)
E   at scala.collection.Iterator.foreach$(Iterator.scala:937)
E   at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
E   at scala.collection.IterableLike.foreach(IterableLike.scala:70)
E   at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
E   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
E   at 
scala.collection.TraversableLike.map(TraversableLike.scala:233)
E   at 
scala.collection.TraversableLike.map$(TraversableLike.scala:226)
E   at 
scala.collection.AbstractTraversable.map(Traversable.scala:104)
E   at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:78)
E   at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:181)
E   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
E   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828)
E   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317)
E   at 
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:605)
E   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E   at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E   at 

[jira] [Created] (FLINK-28526) Fail to lateral join with UDTF from Table with timestamp column

2022-07-12 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-28526:
--

 Summary: Fail to lateral join with UDTF from Table with timestamp
column
 Key: FLINK-28526
 URL: https://issues.apache.org/jira/browse/FLINK-28526
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.1
Reporter: Xuannan Su


The bug can be reproduced with the following test


{code:python}
def test_flink(self):
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    table = t_env.from_descriptor(
        TableDescriptor.for_connector("filesystem")
        .schema(
            Schema.new_builder()
            .column("name", DataTypes.STRING())
            .column("cost", DataTypes.INT())
            .column("distance", DataTypes.INT())
            .column("time", DataTypes.TIMESTAMP(3))
            .watermark("time", "`time` - INTERVAL '60' SECOND")
            .build()
        )
        .format("csv")
        .option("path", "./input.csv")
        .build()
    )

    @udtf(result_types=DataTypes.INT())
    def table_func(row: Row):
        return row.cost + row.distance

    table = table.join_lateral(table_func.alias("cost_times_distance"))

    table.execute().print()
{code}

It causes the following exception


{code:none}
E   pyflink.util.exceptions.TableException: 
org.apache.flink.table.api.TableException: Unsupported Python SqlFunction CAST.
E   at 
org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:146)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:429)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:135)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.extractPythonTableFunctionInfo(CommonExecPythonCorrelate.java:133)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.createPythonOneInputTransformation(CommonExecPythonCorrelate.java:106)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.translateToPlanInternal(CommonExecPythonCorrelate.java:95)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:136)
E   at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
E   at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:79)
E   at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
E   at scala.collection.Iterator.foreach(Iterator.scala:937)
E   at scala.collection.Iterator.foreach$(Iterator.scala:937)
E   at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
E   at scala.collection.IterableLike.foreach(IterableLike.scala:70)
E   at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
E   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
E   at 
scala.collection.TraversableLike.map(TraversableLike.scala:233)
E   at 
scala.collection.TraversableLike.map$(TraversableLike.scala:226)
E   at 
scala.collection.AbstractTraversable.map(Traversable.scala:104)
E   at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:78)
E   at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:181)
E   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
E   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828)
E   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317)
E   at 
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:605)
E   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E   at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E   at 

[jira] [Created] (FLINK-28461) PyFlink Table should add get_resolved_schema method

2022-07-08 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-28461:
--

 Summary: PyFlink Table should add get_resolved_schema method
 Key: FLINK-28461
 URL: https://issues.apache.org/jira/browse/FLINK-28461
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.15.1
Reporter: Xuannan Su


The Table#getSchema method is deprecated and replaced by
Table#getResolvedSchema.

We should add a get_resolved_schema method to the PyFlink Table.
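
A sketch of the proposed usage (get_resolved_schema is the method proposed
here, not an existing API):
{code:python}
# hypothetical until this ticket is implemented
schema = table.get_resolved_schema()
print(schema)
{code}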



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28443) Provide official Flink image for PyFlink

2022-07-07 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-28443:
--

 Summary: Provide official Flink image for PyFlink
 Key: FLINK-28443
 URL: https://issues.apache.org/jira/browse/FLINK-28443
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.15.0
Reporter: Xuannan Su


Users must build a custom Flink image that includes Python and PyFlink in
order to run a PyFlink job in Native Kubernetes application mode.

I think we can improve the user experience by providing an official image with
Python and PyFlink pre-installed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28294) Some metric reporters don't allow registering the same metric with different names

2022-06-29 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-28294:
--

 Summary: Some metric reporters don't allow registering the same
metric with different names
 Key: FLINK-28294
 URL: https://issues.apache.org/jira/browse/FLINK-28294
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.15.0
Assignee: Unassigned
Priority: Major
Created: 29/Jun/22 06:50
Reporter: Xuannan Su


Currently, some metric reporters keep an internal Map keyed by the metric
object to keep track of the registered metrics. The problem with using the
metric object as the key is that when the same metric object is registered
under different names or metric groups, only the last registered metric will
be reported.

If I understand correctly, we do not forbid registering the same metric with
different names or metric groups. For example, in
`SchedulerBase#registerJobMetrics`, we register `numberOfRestarts` under two
names, "numRestarts" and "fullRestarts". Unfortunately, in this case, the
metric reporter will only report the fullRestarts metric, which is deprecated.

I found that the following metric reporters have this problem: the Influxdb,
Datadog, and Dropwizard metric reporters.

One possible fix is to swap the key and value of the internal map in the
metric reporter (see the sketch below).

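A minimal sketch of the problem and of the proposed fix (class and field names
are illustrative, not the actual reporter code):
{code:java}
// Before: keyed by the metric object, so registering the same object under a
// second name overwrites the first entry
private final Map<Metric, String> metricsByObject = new HashMap<>();

public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
    // "numRestarts" is lost as soon as "fullRestarts" is registered
    metricsByObject.put(metric, metricName);
}

// After (proposed): keyed by the unique metric name, so both registrations survive
private final Map<String, Metric> metricsByName = new HashMap<>();

public void notifyOfAddedMetricFixed(Metric metric, String metricName, MetricGroup group) {
    metricsByName.put(metricName, metric);
}
{code}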
[jira] [Created] (FLINK-28126) Iteration gets stuck when replayable datastream and its downstream operator have different parallelism

2022-06-19 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-28126:
--

 Summary: Iteration gets stuck when replayable datastream and its 
downstream operator have different parallelism
 Key: FLINK-28126
 URL: https://issues.apache.org/jira/browse/FLINK-28126
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.0.0
Reporter: Xuannan Su


The iteration gets stuck when the replayable datastream and its downstream
operator have different parallelism. It can be reproduced with the following
code snippet:


{code:java}
@Test
public void testIteration() throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    final SingleOutputStreamOperator<Integer> variable =
            env.fromElements(0).name("i");
    final SingleOutputStreamOperator<Integer> data =
            env.fromElements(1, 2).name("inc")
                    .map(x -> x).setParallelism(1); // test can pass if parallelism is 2.

    final IterationConfig config = IterationConfig.newBuilder().build();

    Iterations.iterateBoundedStreamsUntilTermination(
            DataStreamList.of(variable),
            ReplayableDataStreamList.replay(data),
            config,
            (IterationBody) (variableStreams, dataStreams) -> {
                final DataStream<Integer> sample = dataStreams.get(0);
                final SingleOutputStreamOperator<Integer> trainOutput =
                        sample
                                .transform(
                                        "iter",
                                        TypeInformation.of(Integer.class),
                                        new IterTransform())
                                .setParallelism(2)
                                .map((MapFunction<Integer, Integer>) integer -> integer)
                                .setParallelism(1);

                return new IterationBodyResult(
                        DataStreamList.of(trainOutput), DataStreamList.of(trainOutput));
            });

    env.execute();
}

public static class IterTransform extends AbstractStreamOperator<Integer>
        implements OneInputStreamOperator<Integer, Integer>, IterationListener<Integer> {

    @Override
    public void processElement(StreamRecord<Integer> element) throws Exception {
        LOG.info("Processing element: {}", element);
    }

    @Override
    public void onEpochWatermarkIncremented(
            int epochWatermark, Context context, Collector<Integer> collector)
            throws Exception {
        LOG.info("onEpochWatermarkIncremented: {}", epochWatermark);
        if (epochWatermark >= 10) {
            return;
        }
        collector.collect(0);
    }

    @Override
    public void onIterationTerminated(Context context, Collector<Integer> collector)
            throws Exception {
        LOG.info("onIterationTerminated");
    }
}
{code}

After digging into the code, I found that the `ReplayOperator` doesn't emit the 
epoch watermark with a broadcast output. [~gaoyunhaii], could you take a look to 
see if this is the case?





--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27524) Introduce cache API to DataStream

2022-05-06 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-27524:
--

 Summary: Introduce cache API to DataStream
 Key: FLINK-27524
 URL: https://issues.apache.org/jira/browse/FLINK-27524
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Affects Versions: 1.16.0
Reporter: Xuannan Su






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27523) Runtime supports producing and consuming cached intermediate result

2022-05-06 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-27523:
--

 Summary: Runtime supports producing and consuming cached 
intermediate result
 Key: FLINK-27523
 URL: https://issues.apache.org/jira/browse/FLINK-27523
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Affects Versions: 1.16.0
Reporter: Xuannan Su






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27521) FLIP-205: Support Cache in DataStream for Batch Processing

2022-05-06 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-27521:
--

 Summary: FLIP-205: Support Cache in DataStream for Batch Processing
 Key: FLINK-27521
 URL: https://issues.apache.org/jira/browse/FLINK-27521
 Project: Flink
  Issue Type: New Feature
  Components: API / DataStream
Affects Versions: 1.16.0
Reporter: Xuannan Su


As the DataStream API now supports batch execution mode, we see users using the 
DataStream API to run batch jobs. Interactive programming is an important use 
case of Flink batch processing, and the ability to cache the intermediate results 
of a DataStream is crucial to the interactive programming experience.
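
A sketch of how such a cache API might be used (illustrative; the names follow 
the FLIP-205 proposal and the final API may differ):

{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.CachedDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CacheSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // The first execution produces the intermediate result and caches it.
        CachedDataStream<Integer> cached =
                env.fromElements(1, 2, 3)
                        .map((MapFunction<Integer, Integer>) x -> x * 2)
                        .cache();
        cached.print();
        env.execute();

        // Later executions consume the cached result instead of recomputing it.
        cached.map((MapFunction<Integer, Integer>) x -> x + 1).print();
        env.execute();
    }
}
{code}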



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-26171) pyFlink -py options has issue with path that starts with //

2022-02-15 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17493008#comment-17493008
 ] 

Xuannan Su commented on FLINK-26171:


After some research, I found that two leading slashes are not necessarily 
equivalent to a single slash; POSIX leaves a path beginning with exactly two 
slashes implementation-defined, according to 
[here|https://unix.stackexchange.com/a/1919].
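
For example, path-normalization routines deliberately preserve exactly two leading 
slashes while collapsing three or more (illustrative check using Python's 
posixpath):

{code:bash}
$ python3 -c "import posixpath; print(posixpath.normpath('//tmp/a'))"
//tmp/a
$ python3 -c "import posixpath; print(posixpath.normpath('///tmp/a'))"
/tmp/a
{code}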


> pyFlink -py options has issue with path that starts with //
> ---
>
> Key: FLINK-26171
> URL: https://issues.apache.org/jira/browse/FLINK-26171
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.14.3
>Reporter: Xuannan Su
>Assignee: Huang Xingbo
>Priority: Major
>
> It seems that when submitting a pyFlink job with the `flink run` command, the 
> -py option has issues with a path that start with double slash "//". 
> You can reproduce the problem with the wordcount python example.
> {code:bash}
> # Job successfully submitted
> ./bin/flink run -py "${PWD}"/examples/python/datastream/word_count.py
> Job has been submitted with JobID 54ab79a32b4e441ee80bda92be7cb547
> # Fail to submit the job if the path start with double slash
> ./bin/flink run -py /"${PWD}"/examples/python/datastream/word_count.py
> org.apache.flink.client.program.ProgramAbortException: 
> java.nio.file.NoSuchFileException: 
> /var/folders/j4/td7bvghd1tg4tb7ty_7lk6z0gp/T/pyflink/1bba3878-9ed4-4d4e-a5ed-1208c1ee9425/c9ece3de-8ff2-40dc-8436-e3377e540baf/word_count.py
>   at 
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>   at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>   at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: java.nio.file.NoSuchFileException: 
> /var/folders/j4/td7bvghd1tg4tb7ty_7lk6z0gp/T/pyflink/1bba3878-9ed4-4d4e-a5ed-1208c1ee9425/c9ece3de-8ff2-40dc-8436-e3377e540baf/word_count.py
>   at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>   at sun.nio.fs.UnixPath.toRealPath(UnixPath.java:837)
>   at 
> org.apache.flink.client.python.PythonEnvUtils.addToPythonPath(PythonEnvUtils.java:278)
>   at 
> org.apache.flink.client.python.PythonEnvUtils.preparePythonEnvironment(PythonEnvUtils.java:195)
>   at 
> org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient(PythonEnvUtils.java:456)
>   at 
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:92)
>   ... 13 more
> # Job successfully submitted if path starts with three slash
> ./bin/flink run -py //"${PWD}"/examples/python/datastream/word_count.py
> Job has been submitted with JobID 270d07407b297b566c6903c451a834cb
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26171) pyFlink -py options has issue with path that starts with //

2022-02-15 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-26171:
--

 Summary: pyFlink -py options has issue with path that starts with 
//
 Key: FLINK-26171
 URL: https://issues.apache.org/jira/browse/FLINK-26171
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.14.3
Reporter: Xuannan Su


It seems that when submitting a pyFlink job with the `flink run` command, the 
-py option has issues with a path that starts with a double slash "//".

You can reproduce the problem with the word_count Python example.


{code:bash}
# Job successfully submitted
./bin/flink run -py "${PWD}"/examples/python/datastream/word_count.py
Job has been submitted with JobID 54ab79a32b4e441ee80bda92be7cb547

# Job submission fails if the path starts with a double slash
./bin/flink run -py /"${PWD}"/examples/python/datastream/word_count.py
org.apache.flink.client.program.ProgramAbortException: 
java.nio.file.NoSuchFileException: 
/var/folders/j4/td7bvghd1tg4tb7ty_7lk6z0gp/T/pyflink/1bba3878-9ed4-4d4e-a5ed-1208c1ee9425/c9ece3de-8ff2-40dc-8436-e3377e540baf/word_count.py
at 
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.nio.file.NoSuchFileException: 
/var/folders/j4/td7bvghd1tg4tb7ty_7lk6z0gp/T/pyflink/1bba3878-9ed4-4d4e-a5ed-1208c1ee9425/c9ece3de-8ff2-40dc-8436-e3377e540baf/word_count.py
at 
sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixPath.toRealPath(UnixPath.java:837)
at 
org.apache.flink.client.python.PythonEnvUtils.addToPythonPath(PythonEnvUtils.java:278)
at 
org.apache.flink.client.python.PythonEnvUtils.preparePythonEnvironment(PythonEnvUtils.java:195)
at 
org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient(PythonEnvUtils.java:456)
at 
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:92)
... 13 more

# Job successfully submitted if the path starts with three slashes
./bin/flink run -py //"${PWD}"/examples/python/datastream/word_count.py
Job has been submitted with JobID 270d07407b297b566c6903c451a834cb
{code}




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25685) RestClusterClient gets stuck on submitting job with local user artifact

2022-01-20 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17479869#comment-17479869
 ] 

Xuannan Su commented on FLINK-25685:


[~wangyang0918] You are right. (y) The fix is to use artifactFilePath.getPath().
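
A hypothetical illustration of that fix direction, using plain java.nio types 
rather than the actual RestClusterClient code:

{code:java}
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch, not the actual fix.
class ArtifactPathSketch {
    // Before: using the full URI string ("file:/tmp/...") where a plain
    // filesystem path is expected never resolves to an existing file.
    static Path fromUriString(URI artifactUri) {
        return Paths.get(artifactUri.toString()); // wrong: keeps the scheme
    }

    // After: use only the path component of the URI.
    static Path fromUriPath(URI artifactUri) {
        return Paths.get(artifactUri.getPath());
    }
}
{code}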

> RestClusterClient gets stuck on submitting job with local user artifact
> ---
>
> Key: FLINK-25685
> URL: https://issues.apache.org/jira/browse/FLINK-25685
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.15.0, 1.14.3
>Reporter: Xuannan Su
>Priority: Major
>  Labels: pull-request-available
>
> I found that a job submission gets stuck if 
> StreamExecutionEnvironment#registerCachedFile is called with a local file. 
> After some digging, I found that it gets stuck when the RestClusterClient 
> sends the job-submission request to the JobManager. 
> Below is the unit test added to the `RestClusterClientTest` to reproduce the 
> problem on my local machine.
> {code:java}
> @Test
> public void testJobSubmissionWithUserArtifact() throws Exception {
> try (final TestRestServerEndpoint restServerEndpoint =
>  createRestServerEndpoint(new TestJobSubmitHandler())) {
> try (RestClusterClient restClusterClient =
>  
> createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
> TemporaryFolder temporaryFolder = new TemporaryFolder();
> temporaryFolder.create();
> File file = temporaryFolder.newFile();
> Files.write(file.toPath(), "hello 
> world".getBytes(ConfigConstants.DEFAULT_CHARSET));
> jobGraph.addUserArtifact("file",
> new 
> DistributedCache.DistributedCacheEntry(file.toURI().toString(),
> false));
> restClusterClient
> .submitJob(jobGraph)
> .get();
> }
> }
> }
> {code}
> The test can pass if the `jobGraph.addUserArtifact` is not called.  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25685) RestClusterClient gets stuck on submitting job with local user artifact

2022-01-20 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-25685:
---
Affects Version/s: 1.15.0

> RestClusterClient gets stuck on submitting job with local user artifact
> ---
>
> Key: FLINK-25685
> URL: https://issues.apache.org/jira/browse/FLINK-25685
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.15.0, 1.14.3
>Reporter: Xuannan Su
>Priority: Major
>  Labels: pull-request-available
>
> I found that a job submission gets stuck if 
> StreamExecutionEnvironment#registerCachedFile is called with a local file. 
> After some digging, I found that it gets stuck when the RestClusterClient 
> sends the job-submission request to the JobManager. 
> Below is the unit test added to the `RestClusterClientTest` to reproduce the 
> problem on my local machine.
> {code:java}
> @Test
> public void testJobSubmissionWithUserArtifact() throws Exception {
> try (final TestRestServerEndpoint restServerEndpoint =
>  createRestServerEndpoint(new TestJobSubmitHandler())) {
> try (RestClusterClient restClusterClient =
>  
> createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
> TemporaryFolder temporaryFolder = new TemporaryFolder();
> temporaryFolder.create();
> File file = temporaryFolder.newFile();
> Files.write(file.toPath(), "hello 
> world".getBytes(ConfigConstants.DEFAULT_CHARSET));
> jobGraph.addUserArtifact("file",
> new 
> DistributedCache.DistributedCacheEntry(file.toURI().toString(),
> false));
> restClusterClient
> .submitJob(jobGraph)
> .get();
> }
> }
> }
> {code}
> The test can pass if the `jobGraph.addUserArtifact` is not called.  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25685) RestClusterClient gets stuck on submitting job with local user artifact

2022-01-20 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17479802#comment-17479802
 ] 

Xuannan Su commented on FLINK-25685:


[~chesnay], I created a PR with a unit test and the fix. Could you assign the 
ticket to me and help review the PR?

> RestClusterClient gets stuck on submitting job with local user artifact
> ---
>
> Key: FLINK-25685
> URL: https://issues.apache.org/jira/browse/FLINK-25685
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.14.3
>Reporter: Xuannan Su
>Priority: Major
>  Labels: pull-request-available
>
> I found that a job submission gets stuck if 
> StreamExecutionEnvironment#registerCachedFile is called with a local file. 
> After some digging, I found that it gets stuck when the RestClusterClient 
> sends the job-submission request to the JobManager. 
> Below is the unit test added to the `RestClusterClientTest` to reproduce the 
> problem on my local machine.
> {code:java}
> @Test
> public void testJobSubmissionWithUserArtifact() throws Exception {
> try (final TestRestServerEndpoint restServerEndpoint =
>  createRestServerEndpoint(new TestJobSubmitHandler())) {
> try (RestClusterClient restClusterClient =
>  
> createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
> TemporaryFolder temporaryFolder = new TemporaryFolder();
> temporaryFolder.create();
> File file = temporaryFolder.newFile();
> Files.write(file.toPath(), "hello 
> world".getBytes(ConfigConstants.DEFAULT_CHARSET));
> jobGraph.addUserArtifact("file",
> new 
> DistributedCache.DistributedCacheEntry(file.toURI().toString(),
> false));
> restClusterClient
> .submitJob(jobGraph)
> .get();
> }
> }
> }
> {code}
> The test can pass if the `jobGraph.addUserArtifact` is not called.  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25685) RestClusterClient gets stuck on submitting job with local user artifact

2022-01-18 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17478023#comment-17478023
 ] 

Xuannan Su commented on FLINK-25685:


I created a PR that adds unit tests with and without 
`jobGraph.addUserArtifact`. The test with `jobGraph.addUserArtifact` fails while 
the other passes.

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=29653=results

[~chesnay] Could you have a look at the CI run?

> RestClusterClient gets stuck on submitting job with local user artifact
> ---
>
> Key: FLINK-25685
> URL: https://issues.apache.org/jira/browse/FLINK-25685
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.14.3
>Reporter: Xuannan Su
>Priority: Major
>  Labels: pull-request-available
>
> I found that a job submission gets stuck if 
> StreamExecutionEnvironment#registerCachedFile is called with a local file. 
> After some digging, I found that it gets stuck when the RestClusterClient 
> sends the job-submission request to the JobManager. 
> Below is the unit test added to the `RestClusterClientTest` to reproduce the 
> problem on my local machine.
> {code:java}
> @Test
> public void testJobSubmissionWithUserArtifact() throws Exception {
> try (final TestRestServerEndpoint restServerEndpoint =
>  createRestServerEndpoint(new TestJobSubmitHandler())) {
> try (RestClusterClient restClusterClient =
>  
> createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
> TemporaryFolder temporaryFolder = new TemporaryFolder();
> temporaryFolder.create();
> File file = temporaryFolder.newFile();
> Files.write(file.toPath(), "hello 
> world".getBytes(ConfigConstants.DEFAULT_CHARSET));
> jobGraph.addUserArtifact("file",
> new 
> DistributedCache.DistributedCacheEntry(file.toURI().toString(),
> false));
> restClusterClient
> .submitJob(jobGraph)
> .get();
> }
> }
> }
> {code}
> The test can pass if the `jobGraph.addUserArtifact` is not called.  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25685) RestClusterClient gets stuck on submitting job with local user artifact

2022-01-17 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-25685:
--

 Summary: RestClusterClient gets stuck on submitting job with local 
user artifact
 Key: FLINK-25685
 URL: https://issues.apache.org/jira/browse/FLINK-25685
 Project: Flink
  Issue Type: Bug
  Components: Runtime / REST
Affects Versions: 1.14.3
Reporter: Xuannan Su


I found that a job submission gets stuck if 
StreamExecutionEnvironment#registerCachedFile is called with a local file. 
After some digging, I found that it gets stuck when the RestClusterClient sends 
the job-submission request to the JobManager. 

Below is the unit test added to the `RestClusterClientTest` to reproduce the 
problem on my local machine.


{code:java}
@Test
public void testJobSubmissionWithUserArtifact() throws Exception {
    try (final TestRestServerEndpoint restServerEndpoint =
            createRestServerEndpoint(new TestJobSubmitHandler())) {
        try (RestClusterClient<?> restClusterClient =
                createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {

            TemporaryFolder temporaryFolder = new TemporaryFolder();
            temporaryFolder.create();
            File file = temporaryFolder.newFile();
            Files.write(
                    file.toPath(),
                    "hello world".getBytes(ConfigConstants.DEFAULT_CHARSET));
            jobGraph.addUserArtifact(
                    "file",
                    new DistributedCache.DistributedCacheEntry(
                            file.toURI().toString(), false));

            restClusterClient.submitJob(jobGraph).get();
        }
    }
}
{code}

The test passes if `jobGraph.addUserArtifact` is not called.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-19343) FLIP-36: Support Interactive Programming in Flink

2020-12-01 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241382#comment-17241382
 ] 

Xuannan Su commented on FLINK-19343:


Hi, Jin Xing. Thanks for your comments.
1. I agree with you that it is indeed odd for the "interactive" use case to rely 
on the clusterPartitionTracker to manage the lifecycle of the cluster partition, 
which is also a form of shuffle data. To support caching across jobs, we need a 
component whose lifecycle outlives the job to manage the shuffle data.
2. To be honest, I don't like the idea of passing the ShuffleDescriptor back to 
the client side either. But at the time, as you say, we did not have a separate 
component in the runtime to manage the lifecycle of shuffle data across jobs, so 
that decision was made in order to support sharing shuffle data across jobs. To 
support caching in DataStream, it is a cleaner design to have a runtime-scoped 
component that manages shuffle data across jobs.
3. I don't think the remote shuffle data should be managed by the 
PartitionTracker either. Instead, I think the ClusterPartition is just a kind of 
shuffle data and should therefore be managed by the ShuffleService.
I am pulling in [~chesnay]. He may have more insight from the ClusterPartition 
perspective.

> FLIP-36: Support Interactive Programming in Flink
> -
>
> Key: FLINK-19343
> URL: https://issues.apache.org/jira/browse/FLINK-19343
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Xuannan Su
>Priority: Major
>
> Please refer to the FLIP for any details: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19253) SourceReaderTestBase.testAddSplitToExistingFetcher hangs

2020-11-12 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230756#comment-17230756
 ] 

Xuannan Su commented on FLINK-19253:


Thanks for letting me know. I will stay tuned until we can backport it.

> SourceReaderTestBase.testAddSplitToExistingFetcher hangs
> 
>
> Key: FLINK-19253
> URL: https://issues.apache.org/jira/browse/FLINK-19253
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Assignee: Xuannan Su
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.0, 1.11.3
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6521=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf
> {code}
> 2020-09-15T10:51:35.5236837Z "SourceFetcher" #39 prio=5 os_prio=0 
> tid=0x7f70d0a57000 nid=0x858 in Object.wait() [0x7f6fd81f]
> 2020-09-15T10:51:35.5237447Zjava.lang.Thread.State: WAITING (on object 
> monitor)
> 2020-09-15T10:51:35.5237962Z  at java.lang.Object.wait(Native Method)
> 2020-09-15T10:51:35.5238886Z  - waiting on <0xc27f5be8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5239380Z  at java.lang.Object.wait(Object.java:502)
> 2020-09-15T10:51:35.5240401Z  at 
> org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader.fetch(TestingSplitReader.java:52)
> 2020-09-15T10:51:35.5241471Z  - locked <0xc27f5be8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5242180Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> 2020-09-15T10:51:35.5243245Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:128)
> 2020-09-15T10:51:35.5244263Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:95)
> 2020-09-15T10:51:35.5245128Z  at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
> 2020-09-15T10:51:35.5245973Z  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 2020-09-15T10:51:35.5247081Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2020-09-15T10:51:35.5247816Z  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2020-09-15T10:51:35.5248809Z  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2020-09-15T10:51:35.5249463Z  at java.lang.Thread.run(Thread.java:748)
> 2020-09-15T10:51:35.5249827Z 
> 2020-09-15T10:51:35.5250383Z "SourceFetcher" #37 prio=5 os_prio=0 
> tid=0x7f70d0a4b000 nid=0x856 in Object.wait() [0x7f6f80cfa000]
> 2020-09-15T10:51:35.5251124Zjava.lang.Thread.State: WAITING (on object 
> monitor)
> 2020-09-15T10:51:35.5251636Z  at java.lang.Object.wait(Native Method)
> 2020-09-15T10:51:35.5252767Z  - waiting on <0xc298d0b8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5253336Z  at java.lang.Object.wait(Object.java:502)
> 2020-09-15T10:51:35.5254184Z  at 
> org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader.fetch(TestingSplitReader.java:52)
> 2020-09-15T10:51:35.5255220Z  - locked <0xc298d0b8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5255678Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> 2020-09-15T10:51:35.5256235Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:128)
> 2020-09-15T10:51:35.5256803Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:95)
> 2020-09-15T10:51:35.5257351Z  at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
> 2020-09-15T10:51:35.5257838Z  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 2020-09-15T10:51:35.5258284Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2020-09-15T10:51:35.5258856Z  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2020-09-15T10:51:35.5259350Z  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2020-09-15T10:51:35.5260011Z  at java.lang.Thread.run(Thread.java:748)
> 2020-09-15T10:51:35.5260211Z 
> 2020-09-15T10:51:35.5260574Z "process reaper" #24 daemon prio=10 os_prio=0 
> tid=0x7f6f70042000 nid=0x844 waiting on condition [0x7f6fd832a000]
> 2020-09-15T10:51:35.5261036Zjava.lang.Thread.State: TIMED_WAITING 
> (parking)
> 2020-09-15T10:51:35.5261342Z  at sun.misc.Unsafe.park(Native Method)
> 2020-09-15T10:51:35.5261972Z  - parking to wait for  <0x815d0810> (a 
> 

[jira] [Commented] (FLINK-19253) SourceReaderTestBase.testAddSplitToExistingFetcher hangs

2020-11-11 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230377#comment-17230377
 ] 

Xuannan Su commented on FLINK-19253:


[~becket_qin] I think https://issues.apache.org/jira/browse/FLINK-19448 has 
fixed the problem. Can you take a look?

> SourceReaderTestBase.testAddSplitToExistingFetcher hangs
> 
>
> Key: FLINK-19253
> URL: https://issues.apache.org/jira/browse/FLINK-19253
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Assignee: Xuannan Su
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6521=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf
> {code}
> 2020-09-15T10:51:35.5236837Z "SourceFetcher" #39 prio=5 os_prio=0 
> tid=0x7f70d0a57000 nid=0x858 in Object.wait() [0x7f6fd81f]
> 2020-09-15T10:51:35.5237447Zjava.lang.Thread.State: WAITING (on object 
> monitor)
> 2020-09-15T10:51:35.5237962Z  at java.lang.Object.wait(Native Method)
> 2020-09-15T10:51:35.5238886Z  - waiting on <0xc27f5be8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5239380Z  at java.lang.Object.wait(Object.java:502)
> 2020-09-15T10:51:35.5240401Z  at 
> org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader.fetch(TestingSplitReader.java:52)
> 2020-09-15T10:51:35.5241471Z  - locked <0xc27f5be8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5242180Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> 2020-09-15T10:51:35.5243245Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:128)
> 2020-09-15T10:51:35.5244263Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:95)
> 2020-09-15T10:51:35.5245128Z  at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
> 2020-09-15T10:51:35.5245973Z  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 2020-09-15T10:51:35.5247081Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2020-09-15T10:51:35.5247816Z  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2020-09-15T10:51:35.5248809Z  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2020-09-15T10:51:35.5249463Z  at java.lang.Thread.run(Thread.java:748)
> 2020-09-15T10:51:35.5249827Z 
> 2020-09-15T10:51:35.5250383Z "SourceFetcher" #37 prio=5 os_prio=0 
> tid=0x7f70d0a4b000 nid=0x856 in Object.wait() [0x7f6f80cfa000]
> 2020-09-15T10:51:35.5251124Zjava.lang.Thread.State: WAITING (on object 
> monitor)
> 2020-09-15T10:51:35.5251636Z  at java.lang.Object.wait(Native Method)
> 2020-09-15T10:51:35.5252767Z  - waiting on <0xc298d0b8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5253336Z  at java.lang.Object.wait(Object.java:502)
> 2020-09-15T10:51:35.5254184Z  at 
> org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader.fetch(TestingSplitReader.java:52)
> 2020-09-15T10:51:35.5255220Z  - locked <0xc298d0b8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5255678Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> 2020-09-15T10:51:35.5256235Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:128)
> 2020-09-15T10:51:35.5256803Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:95)
> 2020-09-15T10:51:35.5257351Z  at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
> 2020-09-15T10:51:35.5257838Z  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 2020-09-15T10:51:35.5258284Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2020-09-15T10:51:35.5258856Z  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2020-09-15T10:51:35.5259350Z  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2020-09-15T10:51:35.5260011Z  at java.lang.Thread.run(Thread.java:748)
> 2020-09-15T10:51:35.5260211Z 
> 2020-09-15T10:51:35.5260574Z "process reaper" #24 daemon prio=10 os_prio=0 
> tid=0x7f6f70042000 nid=0x844 waiting on condition [0x7f6fd832a000]
> 2020-09-15T10:51:35.5261036Zjava.lang.Thread.State: TIMED_WAITING 
> (parking)
> 2020-09-15T10:51:35.5261342Z  at sun.misc.Unsafe.park(Native Method)
> 2020-09-15T10:51:35.5261972Z  - parking to wait for  <0x815d0810> (a 
> 

[jira] [Commented] (FLINK-19253) SourceReaderTestBase.testAddSplitToExistingFetcher hangs

2020-11-11 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229980#comment-17229980
 ] 

Xuannan Su commented on FLINK-19253:


[~becket_qin] You are right. This can indeed cause an IllegalStateException to be 
thrown. After looking closely at the code, I think it makes more sense to return 
MORE_AVAILABLE instead of throwing the IllegalStateException at that point. The 
rationale is shown in the comments of the code below. We might also want to 
rename the method from `finishedOrAvailableLater` to something like 
`getInputStatus`. What do you think?

{code:java}
private InputStatus finishedOrAvailableLater() {
    // Assumption: elementsQueue is empty (might not be valid anymore).
    final boolean allFetchersHaveShutdown =
            splitFetcherManager.maybeShutdownFinishedFetchers();
    if (!(noMoreSplitsAssignment && allFetchersHaveShutdown)) {
        return InputStatus.NOTHING_AVAILABLE;
    }
    // At this point we are guaranteed that no more splits will be assigned,
    // and that all the SplitFetchers are idle and have been shut down.
    if (elementsQueue.isEmpty()) {
        splitFetcherManager.checkErrors();
        return InputStatus.END_OF_INPUT;
    } else {
        // If we see a non-empty elementsQueue here, our assumption that the
        // elementsQueue is empty has been violated. This means that after our
        // last check on the elementsQueue, the SplitFetchers fetched the
        // remaining elements, put them into the queue and closed. There are
        // therefore elements available before we can return END_OF_INPUT, so
        // we return MORE_AVAILABLE instead of throwing an exception.
        return InputStatus.MORE_AVAILABLE;
        // throw new IllegalStateException(
        //         "Called 'finishedOrAvailableLater()' with shut-down fetchers but non-empty queue");
    }
}
{code}



> SourceReaderTestBase.testAddSplitToExistingFetcher hangs
> 
>
> Key: FLINK-19253
> URL: https://issues.apache.org/jira/browse/FLINK-19253
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Assignee: Xuannan Su
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6521=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf
> {code}
> 2020-09-15T10:51:35.5236837Z "SourceFetcher" #39 prio=5 os_prio=0 
> tid=0x7f70d0a57000 nid=0x858 in Object.wait() [0x7f6fd81f]
> 2020-09-15T10:51:35.5237447Zjava.lang.Thread.State: WAITING (on object 
> monitor)
> 2020-09-15T10:51:35.5237962Z  at java.lang.Object.wait(Native Method)
> 2020-09-15T10:51:35.5238886Z  - waiting on <0xc27f5be8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5239380Z  at java.lang.Object.wait(Object.java:502)
> 2020-09-15T10:51:35.5240401Z  at 
> org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader.fetch(TestingSplitReader.java:52)
> 2020-09-15T10:51:35.5241471Z  - locked <0xc27f5be8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5242180Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> 2020-09-15T10:51:35.5243245Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:128)
> 2020-09-15T10:51:35.5244263Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:95)
> 2020-09-15T10:51:35.5245128Z  at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
> 2020-09-15T10:51:35.5245973Z  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 2020-09-15T10:51:35.5247081Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2020-09-15T10:51:35.5247816Z  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2020-09-15T10:51:35.5248809Z  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2020-09-15T10:51:35.5249463Z  at java.lang.Thread.run(Thread.java:748)
> 2020-09-15T10:51:35.5249827Z 
> 2020-09-15T10:51:35.5250383Z "SourceFetcher" #37 prio=5 os_prio=0 
> tid=0x7f70d0a4b000 nid=0x856 in Object.wait() [0x7f6f80cfa000]
> 2020-09-15T10:51:35.5251124Zjava.lang.Thread.State: WAITING (on object 
> monitor)
> 2020-09-15T10:51:35.5251636Z  at 

[jira] [Commented] (FLINK-20068) KafkaSubscriberTest.testTopicPatternSubscriber failed with unexpected results

2020-11-10 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229079#comment-17229079
 ] 

Xuannan Su commented on FLINK-20068:


Another instance
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9388=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5

> KafkaSubscriberTest.testTopicPatternSubscriber failed with unexpected results
> -
>
> Key: FLINK-20068
> URL: https://issues.apache.org/jira/browse/FLINK-20068
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9365=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5
> {code}
> 2020-11-10T00:14:22.7658242Z [ERROR] 
> testTopicPatternSubscriber(org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberTest)
>   Time elapsed: 0.012 s  <<< FAILURE!
> 2020-11-10T00:14:22.7659838Z java.lang.AssertionError: 
> expected:<[pattern-topic-5, pattern-topic-4, pattern-topic-7, 
> pattern-topic-6, pattern-topic-9, pattern-topic-8, pattern-topic-1, 
> pattern-topic-0, pattern-topic-3]> but was:<[]>
> 2020-11-10T00:14:22.7660740Z  at org.junit.Assert.fail(Assert.java:88)
> 2020-11-10T00:14:22.7661245Z  at 
> org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-11-10T00:14:22.7661788Z  at 
> org.junit.Assert.assertEquals(Assert.java:118)
> 2020-11-10T00:14:22.7662312Z  at 
> org.junit.Assert.assertEquals(Assert.java:144)
> 2020-11-10T00:14:22.7663051Z  at 
> org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberTest.testTopicPatternSubscriber(KafkaSubscriberTest.java:94)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19253) SourceReaderTestBase.testAddSplitToExistingFetcher hangs

2020-11-09 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229013#comment-17229013
 ] 

Xuannan Su commented on FLINK-19253:


[~becket_qin] I'd love to submit a fix. Could you assign the ticket to me?

> SourceReaderTestBase.testAddSplitToExistingFetcher hangs
> 
>
> Key: FLINK-19253
> URL: https://issues.apache.org/jira/browse/FLINK-19253
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6521=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf
> {code}
> 2020-09-15T10:51:35.5236837Z "SourceFetcher" #39 prio=5 os_prio=0 
> tid=0x7f70d0a57000 nid=0x858 in Object.wait() [0x7f6fd81f]
> 2020-09-15T10:51:35.5237447Zjava.lang.Thread.State: WAITING (on object 
> monitor)
> 2020-09-15T10:51:35.5237962Z  at java.lang.Object.wait(Native Method)
> 2020-09-15T10:51:35.5238886Z  - waiting on <0xc27f5be8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5239380Z  at java.lang.Object.wait(Object.java:502)
> 2020-09-15T10:51:35.5240401Z  at 
> org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader.fetch(TestingSplitReader.java:52)
> 2020-09-15T10:51:35.5241471Z  - locked <0xc27f5be8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5242180Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> 2020-09-15T10:51:35.5243245Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:128)
> 2020-09-15T10:51:35.5244263Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:95)
> 2020-09-15T10:51:35.5245128Z  at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
> 2020-09-15T10:51:35.5245973Z  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 2020-09-15T10:51:35.5247081Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2020-09-15T10:51:35.5247816Z  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2020-09-15T10:51:35.5248809Z  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2020-09-15T10:51:35.5249463Z  at java.lang.Thread.run(Thread.java:748)
> 2020-09-15T10:51:35.5249827Z 
> 2020-09-15T10:51:35.5250383Z "SourceFetcher" #37 prio=5 os_prio=0 
> tid=0x7f70d0a4b000 nid=0x856 in Object.wait() [0x7f6f80cfa000]
> 2020-09-15T10:51:35.5251124Zjava.lang.Thread.State: WAITING (on object 
> monitor)
> 2020-09-15T10:51:35.5251636Z  at java.lang.Object.wait(Native Method)
> 2020-09-15T10:51:35.5252767Z  - waiting on <0xc298d0b8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5253336Z  at java.lang.Object.wait(Object.java:502)
> 2020-09-15T10:51:35.5254184Z  at 
> org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader.fetch(TestingSplitReader.java:52)
> 2020-09-15T10:51:35.5255220Z  - locked <0xc298d0b8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5255678Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> 2020-09-15T10:51:35.5256235Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:128)
> 2020-09-15T10:51:35.5256803Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:95)
> 2020-09-15T10:51:35.5257351Z  at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
> 2020-09-15T10:51:35.5257838Z  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 2020-09-15T10:51:35.5258284Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2020-09-15T10:51:35.5258856Z  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2020-09-15T10:51:35.5259350Z  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2020-09-15T10:51:35.5260011Z  at java.lang.Thread.run(Thread.java:748)
> 2020-09-15T10:51:35.5260211Z 
> 2020-09-15T10:51:35.5260574Z "process reaper" #24 daemon prio=10 os_prio=0 
> tid=0x7f6f70042000 nid=0x844 waiting on condition [0x7f6fd832a000]
> 2020-09-15T10:51:35.5261036Zjava.lang.Thread.State: TIMED_WAITING 
> (parking)
> 2020-09-15T10:51:35.5261342Z  at sun.misc.Unsafe.park(Native Method)
> 2020-09-15T10:51:35.5261972Z  - parking to wait for  <0x815d0810> (a 
> java.util.concurrent.SynchronousQueue$TransferStack)
> 2020-09-15T10:51:35.5262456Z  at 
> 

[jira] [Commented] (FLINK-19253) SourceReaderTestBase.testAddSplitToExistingFetcher hangs

2020-11-09 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228958#comment-17228958
 ] 

Xuannan Su commented on FLINK-19253:


I think there is a potential race condition in SplitFetcher.

It occurs when one thread executes SplitFetcher#checkAndSetIdle while another 
thread adds a split to the SplitFetcher with SplitFetcher#addSplits. That is, 
checkAndSetIdle first checks whether the fetcher should go idle and then sets the 
isIdle flag. Between these two steps, another thread can call addSplits, which 
puts a new task into the taskQueue and sets the isIdle flag to false. The first 
thread then sets the isIdle flag to true, as shown in the sketch below.

We need to synchronize the threads that modify the isIdle flag.
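
A minimal sketch of the kind of guard that would close the race (simplified and 
hypothetical; the real SplitFetcher holds more state, and the names below only 
mirror the description above):

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Simplified sketch, not the actual SplitFetcher code.
class IdleGuardSketch<SplitT> {
    private final Deque<Runnable> taskQueue = new ArrayDeque<>();
    private volatile boolean isIdle;

    // Checking the queue and setting isIdle must happen atomically; otherwise
    // addSplits() can run between the check and the assignment, leaving the
    // fetcher marked idle despite having a pending task.
    synchronized void checkAndSetIdle() {
        if (taskQueue.isEmpty()) {
            isIdle = true;
        }
    }

    synchronized void addSplits(List<SplitT> splits) {
        taskQueue.add(() -> fetch(splits)); // enqueue a fetch task for the new splits
        isIdle = false;
    }

    private void fetch(List<SplitT> splits) {
        // fetching logic elided in this sketch
    }
}
{code}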

cc [~becket_qin]


> SourceReaderTestBase.testAddSplitToExistingFetcher hangs
> 
>
> Key: FLINK-19253
> URL: https://issues.apache.org/jira/browse/FLINK-19253
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6521=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf
> {code}
> 2020-09-15T10:51:35.5236837Z "SourceFetcher" #39 prio=5 os_prio=0 
> tid=0x7f70d0a57000 nid=0x858 in Object.wait() [0x7f6fd81f]
> 2020-09-15T10:51:35.5237447Zjava.lang.Thread.State: WAITING (on object 
> monitor)
> 2020-09-15T10:51:35.5237962Z  at java.lang.Object.wait(Native Method)
> 2020-09-15T10:51:35.5238886Z  - waiting on <0xc27f5be8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5239380Z  at java.lang.Object.wait(Object.java:502)
> 2020-09-15T10:51:35.5240401Z  at 
> org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader.fetch(TestingSplitReader.java:52)
> 2020-09-15T10:51:35.5241471Z  - locked <0xc27f5be8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5242180Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> 2020-09-15T10:51:35.5243245Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:128)
> 2020-09-15T10:51:35.5244263Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:95)
> 2020-09-15T10:51:35.5245128Z  at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
> 2020-09-15T10:51:35.5245973Z  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 2020-09-15T10:51:35.5247081Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2020-09-15T10:51:35.5247816Z  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2020-09-15T10:51:35.5248809Z  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2020-09-15T10:51:35.5249463Z  at java.lang.Thread.run(Thread.java:748)
> 2020-09-15T10:51:35.5249827Z 
> 2020-09-15T10:51:35.5250383Z "SourceFetcher" #37 prio=5 os_prio=0 
> tid=0x7f70d0a4b000 nid=0x856 in Object.wait() [0x7f6f80cfa000]
> 2020-09-15T10:51:35.5251124Zjava.lang.Thread.State: WAITING (on object 
> monitor)
> 2020-09-15T10:51:35.5251636Z  at java.lang.Object.wait(Native Method)
> 2020-09-15T10:51:35.5252767Z  - waiting on <0xc298d0b8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5253336Z  at java.lang.Object.wait(Object.java:502)
> 2020-09-15T10:51:35.5254184Z  at 
> org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader.fetch(TestingSplitReader.java:52)
> 2020-09-15T10:51:35.5255220Z  - locked <0xc298d0b8> (a 
> java.util.ArrayDeque)
> 2020-09-15T10:51:35.5255678Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> 2020-09-15T10:51:35.5256235Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:128)
> 2020-09-15T10:51:35.5256803Z  at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:95)
> 2020-09-15T10:51:35.5257351Z  at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
> 2020-09-15T10:51:35.5257838Z  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 2020-09-15T10:51:35.5258284Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2020-09-15T10:51:35.5258856Z  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2020-09-15T10:51:35.5259350Z  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2020-09-15T10:51:35.5260011Z  at java.lang.Thread.run(Thread.java:748)
> 2020-09-15T10:51:35.5260211Z 
> 

[jira] [Commented] (FLINK-20050) SourceCoordinatorProviderTest.testCheckpointAndReset failed with NullPointerException

2020-11-09 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228464#comment-17228464
 ] 

Xuannan Su commented on FLINK-20050:


I think it is introduced by this commit 
[d5b5103c|https://github.com/apache/flink/commit/d5b5103c1ed855b45fd08b1738f044b01864de58].
 cc [~becket_qin] 

> SourceCoordinatorProviderTest.testCheckpointAndReset failed with 
> NullPointerException
> -
>
> Key: FLINK-20050
> URL: https://issues.apache.org/jira/browse/FLINK-20050
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Dian Fu
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9322=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=7c61167f-30b3-5893-cc38-a9e3d057e392
> {code}
> 2020-11-08T22:24:39.5642544Z [ERROR] 
> testCheckpointAndReset(org.apache.flink.runtime.source.coordinator.SourceCoordinatorProviderTest)
>   Time elapsed: 0.954 s  <<< ERROR!
> 2020-11-08T22:24:39.5643055Z java.lang.NullPointerException
> 2020-11-08T22:24:39.5643578Z  at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorProviderTest.testCheckpointAndReset(SourceCoordinatorProviderTest.java:94)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20018) pipeline.cached-files option cannot escape ':' in path

2020-11-06 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-20018:
--

 Summary: pipeline.cached-files option cannot escape ':' in path
 Key: FLINK-20018
 URL: https://issues.apache.org/jira/browse/FLINK-20018
 Project: Flink
  Issue Type: Bug
Reporter: Xuannan Su


The pipeline.cached-files option cannot escape ':' in a path.
For example, when it is set to `name:file1,path:oss://bucket/file1`, the path of 
file1 is parsed as "oss", and there is no way to escape the ':' in the path.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19761) Add lookup method for registered ShuffleDescriptor in ShuffleMaster

2020-10-25 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17220452#comment-17220452
 ] 

Xuannan Su commented on FLINK-19761:


You are right that not returning all the information needed to consume the 
result partition would weaken the functionality when sharing cluster partitions 
across clusters. In that case, I agree to send the shuffle descriptor back to 
the client. However, I feel it is still missing something without the lookup 
method. For example, if the user is indeed using an external shuffle service and 
some partition is promoted so that it remains after the job finishes, the 
external shuffle service should not rely on the client to keep the information 
needed to consume the partition.

> Add lookup method for registered ShuffleDescriptor in ShuffleMaster
> ---
>
> Key: FLINK-19761
> URL: https://issues.apache.org/jira/browse/FLINK-19761
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Xuannan Su
>Priority: Major
>
> Currently, the ShuffleMaster can register a partition and get the shuffle 
> descriptor. However, it lacks the ability to look up the registered 
> ShuffleDescriptors belongs to an IntermediateResult by the 
> IntermediateDataSetID.
> Adding the lookup method to the ShuffleMaster can make reusing the cluster 
> partition more easily. For example, we don't have to return the 
> ShuffleDescriptor to the client just so that the other job can somehow encode 
> the ShuffleDescriptor in the JobGraph to consume the cluster partition. 
> Instead, we only need to return the IntermediateDatSetID and use it to lookup 
> the ShuffleDescriptor by another job.
> By adding the lookup method in ShuffleMaster, if we have an external shuffle 
> service and the lifecycle of the IntermediateResult is not bounded to the 
> cluster, we can look up the ShuffleDescriptor and reuse the 
> IntermediateResult by a job running on another cluster even if the cluster 
> that produced the IntermediateResult is shutdown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19761) Add lookup method for registered ShuffleDescriptor in ShuffleMaster

2020-10-22 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218891#comment-17218891
 ] 

Xuannan Su commented on FLINK-19761:


[~trohrmann] The assumption is that the ShuffleMaster should know about the 
cluster partition you are asking for, meaning that if the NettyShuffleService is 
used, the cluster partition is only accessible within the same cluster. If you 
want to share the intermediate result across different clusters, you need an 
external shuffle service whose lifecycle is not bound to the cluster.
If the client asks for a cluster partition that the shuffle master doesn't know 
of, the job will fail, and it is up to the client side to decide what to do. For 
the cache table in the Table API, it can re-execute the original graph that 
produces the intermediate result.

> Add lookup method for registered ShuffleDescriptor in ShuffleMaster
> ---
>
> Key: FLINK-19761
> URL: https://issues.apache.org/jira/browse/FLINK-19761
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Xuannan Su
>Priority: Major
>
> Currently, the ShuffleMaster can register a partition and get the shuffle 
> descriptor. However, it lacks the ability to look up the registered 
> ShuffleDescriptors belongs to an IntermediateResult by the 
> IntermediateDataSetID.
> Adding the lookup method to the ShuffleMaster can make reusing the cluster 
> partition more easily. For example, we don't have to return the 
> ShuffleDescriptor to the client just so that the other job can somehow encode 
> the ShuffleDescriptor in the JobGraph to consume the cluster partition. 
> Instead, we only need to return the IntermediateDatSetID and use it to lookup 
> the ShuffleDescriptor by another job.
> By adding the lookup method in ShuffleMaster, if we have an external shuffle 
> service and the lifecycle of the IntermediateResult is not bounded to the 
> cluster, we can look up the ShuffleDescriptor and reuse the 
> IntermediateResult by a job running on another cluster even if the cluster 
> that produced the IntermediateResult is shutdown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19761) Add lookup method for registered ShuffleDescriptor in ShuffleMaster

2020-10-22 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218826#comment-17218826
 ] 

Xuannan Su commented on FLINK-19761:


cc [~trohrmann] [~azagrebin] [~zjwang] 
I'd love to hear your thoughts on this.

> Add lookup method for registered ShuffleDescriptor in ShuffleMaster
> ---
>
> Key: FLINK-19761
> URL: https://issues.apache.org/jira/browse/FLINK-19761
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Xuannan Su
>Priority: Major
>
> Currently, the ShuffleMaster can register a partition and get the shuffle 
> descriptor. However, it lacks the ability to look up the registered 
> ShuffleDescriptors belongs to an IntermediateResult by the 
> IntermediateDataSetID.
> Adding the lookup method to the ShuffleMaster can make reusing the cluster 
> partition more easily. For example, we don't have to return the 
> ShuffleDescriptor to the client just so that the other job can somehow encode 
> the ShuffleDescriptor in the JobGraph to consume the cluster partition. 
> Instead, we only need to return the IntermediateDatSetID and use it to lookup 
> the ShuffleDescriptor by another job.
> By adding the lookup method in ShuffleMaster, if we have an external shuffle 
> service and the lifecycle of the IntermediateResult is not bounded to the 
> cluster, we can look up the ShuffleDescriptor and reuse the 
> IntermediateResult by a job running on another cluster even if the cluster 
> that produced the IntermediateResult is shutdown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19761) Add lookup method for registered ShuffleDescriptor in ShuffleMaster

2020-10-22 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-19761:
--

 Summary: Add lookup method for registered ShuffleDescriptor in 
ShuffleMaster
 Key: FLINK-19761
 URL: https://issues.apache.org/jira/browse/FLINK-19761
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Xuannan Su


Currently, the ShuffleMaster can register a partition and get the shuffle 
descriptor. However, it lacks the ability to look up the registered 
ShuffleDescriptors belonging to an IntermediateResult by the 
IntermediateDataSetID.
Adding the lookup method to the ShuffleMaster makes reusing the cluster 
partition easier. For example, we don't have to return the ShuffleDescriptor 
to the client just so that the other job can somehow encode the 
ShuffleDescriptor in the JobGraph to consume the cluster partition. Instead, 
we only need to return the IntermediateDataSetID, which another job can then 
use to look up the ShuffleDescriptor.
With the lookup method in ShuffleMaster, if we have an external shuffle 
service and the lifecycle of the IntermediateResult is not bound to the 
cluster, a job running on another cluster can look up the ShuffleDescriptor 
and reuse the IntermediateResult even if the cluster that produced it has 
been shut down.
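
For the cross-cluster scenario, a hypothetical consumer-side sketch, assuming 
the {{getShuffleDescriptors(...)}} method proposed above:

{code:java}
import java.util.Collection;

import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;

// Hypothetical consumer-side flow: the producing job hands back only the
// IntermediateDataSetID, and a job on the same or a different cluster
// resolves the descriptors at scheduling time via the proposed lookup.
public final class ClusterPartitionLookupExample {

    public static Collection<? extends ShuffleDescriptor> resolve(
            ShuffleMasterWithLookup<? extends ShuffleDescriptor> shuffleMaster,
            IntermediateDataSetID cachedResultId) {
        // With an external shuffle service whose data outlives the producing
        // cluster, this lookup can succeed even after that cluster shuts down.
        return shuffleMaster.getShuffleDescriptors(cachedResultId);
    }
}
{code}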



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19669) PipelinedRegionSchedulingStrategy#init ResultPartitionType blocking check should use isBlocking method

2020-10-15 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-19669:
--

 Summary: PipelinedRegionSchedulingStrategy#init 
ResultPartitionType blocking check should use isBlocking method
 Key: FLINK-19669
 URL: https://issues.apache.org/jira/browse/FLINK-19669
 Project: Flink
  Issue Type: Bug
Reporter: Xuannan Su


The ResultPartitionType blocking check in PipelinedRegionSchedulingStrategy#init 
should use the isBlocking method instead of comparing against the enum value 
directly.
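
A minimal illustration of the difference (simplified, not the actual scheduler 
code):

{code:java}
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;

class BlockingCheckExample {

    // Problematic: an equality check against a single enum value misses other
    // blocking variants such as BLOCKING_PERSISTENT.
    static boolean isBlockingByEnum(ResultPartitionType type) {
        return type == ResultPartitionType.BLOCKING;
    }

    // Preferred: the predicate covers every blocking partition type.
    static boolean isBlockingByPredicate(ResultPartitionType type) {
        return type.isBlocking();
    }
}
{code}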



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19508) Add collect() operation on DataStream

2020-10-14 Thread Xuannan Su (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17214408#comment-17214408
 ] 

Xuannan Su commented on FLINK-19508:


This is a very useful feature to have. Thanks [~sjwiesman] for picking up the 
ticket and working on this. Could you share the expected timeline for this 
work? I'd be happy to help if needed.
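
For reference, a sketch of the ergonomics being discussed. 
{{DataStreamUtils.collect()}} is the existing utility; the {{collect()}} method 
on {{DataStream}} itself is the proposal, so its name and signature here are 
assumptions:

{code:java}
import java.util.Iterator;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CollectExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> stream = env.fromElements(1L, 2L, 3L);

        // Today: fetch a small result back to the client for debugging.
        Iterator<Long> results = DataStreamUtils.collect(stream);
        results.forEachRemaining(System.out::println);

        // Proposed: the same operation directly on DataStream, e.g.
        // stream.collect() -- hypothetical, not a released API.
    }
}
{code}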

> Add collect() operation on DataStream
> -
>
> Key: FLINK-19508
> URL: https://issues.apache.org/jira/browse/FLINK-19508
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Aljoscha Krettek
>Assignee: Seth Wiesman
>Priority: Major
>
> With the recent changes/additions to {{DataStreamUtils.collect()}} that make 
> it more robust by using the regular REST client to fetch results from 
> operators it might make sense to add a {{collect()}} operation right on 
> {{DataStream}}.
> This operation is still not meant for big data volumes, but I think it can be 
> useful for debugging and for fetching a small number of messages to the client. 
> When we do this, we can also think about changing {{print()}} to print on the 
> client instead of to the {{TaskManager}} stdout. I think the current 
> behaviour of this operation is mostly confusing for users.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19346) Generate and put ClusterPartitionDescriptor of ClusterPartition in JobResult when job finishes

2020-10-12 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-19346:
---
Component/s: Runtime / Coordination

> Generate and put ClusterPartitionDescriptor of ClusterPartition in JobResult 
> when job finishes
> --
>
> Key: FLINK-19346
> URL: https://issues.apache.org/jira/browse/FLINK-19346
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Table SQL / API
>Reporter: Xuannan Su
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19343) FLIP-36: Support Interactive Programming in Flink

2020-10-12 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-19343:
---
Component/s: (was: Table SQL / API)

> FLIP-36: Support Interactive Programming in Flink
> -
>
> Key: FLINK-19343
> URL: https://issues.apache.org/jira/browse/FLINK-19343
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Reporter: Xuannan Su
>Priority: Major
>
> Please refer to the FLIP for any details: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19343) FLIP-36: Support Interactive Programming in Flink

2020-10-12 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-19343:
---
Component/s: (was: Runtime / Coordination)
 Table SQL / API

> FLIP-36: Support Interactive Programming in Flink
> -
>
> Key: FLINK-19343
> URL: https://issues.apache.org/jira/browse/FLINK-19343
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Xuannan Su
>Priority: Major
>
> Please refer to the FLIP for any details: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19343) FLIP-36: Support Interactive Programming in Flink

2020-10-12 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-19343:
---
Component/s: Runtime / Coordination

> FLIP-36: Support Interactive Programming in Flink
> -
>
> Key: FLINK-19343
> URL: https://issues.apache.org/jira/browse/FLINK-19343
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination, Table SQL / API
>Reporter: Xuannan Su
>Priority: Major
>
> Please refer to the FLIP for any details: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19476) Introduce CacheManager in CatalogManager to keep track of the ClusterPartitionDescriptor

2020-09-30 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-19476:
--

 Summary: Introduce CacheManager in CatalogManager to keep track of 
the ClusterPartitionDescriptor 
 Key: FLINK-19476
 URL: https://issues.apache.org/jira/browse/FLINK-19476
 Project: Flink
  Issue Type: Sub-task
Reporter: Xuannan Su






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19355) Add close() method to TableEnvironment

2020-09-22 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-19355:
--

 Summary: Add close() method to TableEnvironment
 Key: FLINK-19355
 URL: https://issues.apache.org/jira/browse/FLINK-19355
 Project: Flink
  Issue Type: Sub-task
Reporter: Xuannan Su






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19353) BlinkPlanner translate and optimize CacheOperation

2020-09-22 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-19353:
--

 Summary: BlinkPlanner translate and optimize CacheOperation
 Key: FLINK-19353
 URL: https://issues.apache.org/jira/browse/FLINK-19353
 Project: Flink
  Issue Type: Sub-task
Reporter: Xuannan Su






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19354) Add invalidateCache() method in CachedTable

2020-09-22 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-19354:
--

 Summary: Add invalidateCache() method in CachedTable 
 Key: FLINK-19354
 URL: https://issues.apache.org/jira/browse/FLINK-19354
 Project: Flink
  Issue Type: Sub-task
Reporter: Xuannan Su






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19352) Add cache() method to Table

2020-09-22 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-19352:
--

 Summary: Add cache() method to Table
 Key: FLINK-19352
 URL: https://issues.apache.org/jira/browse/FLINK-19352
 Project: Flink
  Issue Type: Sub-task
Reporter: Xuannan Su






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19350) StreamingJobGraphGenerator add the ClusterPartitionDescriptor of cached node to JobVertex

2020-09-22 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-19350:
---
Summary: StreamingJobGraphGenerator add the ClusterPartitionDescriptor of 
cached node to JobVertex   (was: StreamingJobGraphGenerator generate job graph 
with cached node)

> StreamingJobGraphGenerator add the ClusterPartitionDescriptor of cached node 
> to JobVertex 
> --
>
> Key: FLINK-19350
> URL: https://issues.apache.org/jira/browse/FLINK-19350
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Xuannan Su
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19349) StreamGraphGenerator handle CacheSource and CacheSink

2020-09-22 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-19349:
---
Summary: StreamGraphGenerator handle CacheSource and CacheSink  (was: 
StreamGraph handle CacheSource and CacheSink)

> StreamGraphGenerator handle CacheSource and CacheSink
> -
>
> Key: FLINK-19349
> URL: https://issues.apache.org/jira/browse/FLINK-19349
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Xuannan Su
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19351) StreamingJobGraphGenerator set the caching node to BoundedBlockingType

2020-09-22 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-19351:
--

 Summary: StreamingJobGraphGenerator set the caching node to 
BoundedBlockingType
 Key: FLINK-19351
 URL: https://issues.apache.org/jira/browse/FLINK-19351
 Project: Flink
  Issue Type: Sub-task
Reporter: Xuannan Su






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19350) StreamingJobGraphGenerator generate job graph with cached node

2020-09-22 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-19350:
--

 Summary: StreamingJobGraphGenerator generate job graph with cached 
node
 Key: FLINK-19350
 URL: https://issues.apache.org/jira/browse/FLINK-19350
 Project: Flink
  Issue Type: Sub-task
Reporter: Xuannan Su






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19348) Introduce CacheSource and CacheSink

2020-09-22 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-19348:
--

 Summary: Introduce CacheSource and CacheSink
 Key: FLINK-19348
 URL: https://issues.apache.org/jira/browse/FLINK-19348
 Project: Flink
  Issue Type: Sub-task
Reporter: Xuannan Su






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19349) StreamGraph handle CacheSource and CacheSink

2020-09-22 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-19349:
--

 Summary: StreamGraph handle CacheSource and CacheSink
 Key: FLINK-19349
 URL: https://issues.apache.org/jira/browse/FLINK-19349
 Project: Flink
  Issue Type: Sub-task
Reporter: Xuannan Su






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19346) Generate and put ClusterPartitionDescriptor of ClusterPartition in JobResult when job finishes

2020-09-22 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-19346:
--

 Summary: Generate and put ClusterPartitionDescriptor of 
ClusterPartition in JobResult when job finishes
 Key: FLINK-19346
 URL: https://issues.apache.org/jira/browse/FLINK-19346
 Project: Flink
  Issue Type: Sub-task
Reporter: Xuannan Su






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19347) Generate InputGateDeploymentDescriptor from a JobVertex with ClusterPartitionDescriptor

2020-09-22 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-19347:
--

 Summary: Generate InputGateDeploymentDescriptor from a JobVertex 
with ClusterPartitionDescriptor
 Key: FLINK-19347
 URL: https://issues.apache.org/jira/browse/FLINK-19347
 Project: Flink
  Issue Type: Sub-task
Reporter: Xuannan Su






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-19343) FLIP-36: Support Interactive Programming in Flink

2020-09-22 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-19343:
---
Component/s: Table SQL / API

> FLIP-36: Support Interactive Programming in Flink
> -
>
> Key: FLINK-19343
> URL: https://issues.apache.org/jira/browse/FLINK-19343
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Xuannan Su
>Priority: Major
>
> Please refer to the FLIP for any details: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19343) FLIP-36: Support Interactive Programming in Flink

2020-09-22 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-19343:
--

 Summary: FLIP-36: Support Interactive Programming in Flink
 Key: FLINK-19343
 URL: https://issues.apache.org/jira/browse/FLINK-19343
 Project: Flink
  Issue Type: New Feature
Reporter: Xuannan Su


Please refer to the FLIP for any details: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18820) SourceOperator should send MAX_WATERMARK to downstream operator when closed

2020-08-05 Thread Xuannan Su (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuannan Su updated FLINK-18820:
---
Description: SourceOperator should send MAX_WATERMARK to the downstream 
operator when closed. Otherwise, the watermark of the downstream operator may 
not advance.   (was: SourceOperator should send MAX_WATERMARK to the downstream 
operator when closed. Otherwise, the watermark of the downstream operator will 
not advance. )

> SourceOperator should send MAX_WATERMARK to downstream operator when closed
> ---
>
> Key: FLINK-18820
> URL: https://issues.apache.org/jira/browse/FLINK-18820
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Xuannan Su
>Priority: Major
>  Labels: pull-request-available
>
> SourceOperator should send MAX_WATERMARK to the downstream operator when 
> closed. Otherwise, the watermark of the downstream operator may not advance. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18820) SourceOperator should send MAX_WATERMARK to downstream operator when closed

2020-08-04 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-18820:
--

 Summary: SourceOperator should send MAX_WATERMARK to downstream 
operator when closed
 Key: FLINK-18820
 URL: https://issues.apache.org/jira/browse/FLINK-18820
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Reporter: Xuannan Su


SourceOperator should send MAX_WATERMARK to the downstream operator when 
closed. Otherwise, the watermark of the downstream operator will not advance. 
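
A simplified sketch of the intended behavior. The WatermarkEmitter interface 
here is a stand-in for the operator's actual output, not a Flink class:

{code:java}
import org.apache.flink.streaming.api.watermark.Watermark;

class ClosingSourceSketch {

    // Stand-in for the operator's real output type; illustrative only.
    interface WatermarkEmitter {
        void emitWatermark(Watermark watermark);
    }

    private final WatermarkEmitter output;

    ClosingSourceSketch(WatermarkEmitter output) {
        this.output = output;
    }

    void close() {
        // Flushing a final MAX_WATERMARK advances downstream watermarks to
        // Long.MAX_VALUE, so pending event-time windows and timers still fire.
        output.emitWatermark(Watermark.MAX_WATERMARK);
    }
}
{code}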



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


  1   2   >