[jira] [Updated] (FLINK-35461) Improve Runtime Configuration for Flink 2.0
[ https://issues.apache.org/jira/browse/FLINK-35461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xuannan Su updated FLINK-35461:
---
Release Note:
The following configurations have been deprecated as we are phasing out the hash-based blocking shuffle:
- `taskmanager.network.sort-shuffle.min-parallelism`
- `taskmanager.network.blocking-shuffle.type`

The following configurations have been deprecated as we are phasing out the legacy hybrid shuffle:
- `taskmanager.network.hybrid-shuffle.spill-index-region-group-size`
- `taskmanager.network.hybrid-shuffle.num-retained-in-memory-regions-max`
- `taskmanager.network.hybrid-shuffle.enable-new-mode`

The following configurations have been deprecated to simplify the configuration of network buffers:
- `taskmanager.network.memory.buffers-per-channel`
- `taskmanager.network.memory.floating-buffers-per-gate`
- `taskmanager.network.memory.max-buffers-per-channel`
- `taskmanager.network.memory.max-overdraft-buffers-per-gate`
- `taskmanager.network.memory.exclusive-buffers-request-timeout-ms` (Please use `taskmanager.network.memory.buffers-request-timeout` instead.)

The configuration `taskmanager.network.batch-shuffle.compression.enabled` has been deprecated. Please set `taskmanager.network.compression.codec` to "NONE" to disable compression.

The following Netty-related configurations are no longer recommended for use and have been deprecated:
- `taskmanager.network.netty.num-arenas`
- `taskmanager.network.netty.server.numThreads`
- `taskmanager.network.netty.client.numThreads`
- `taskmanager.network.netty.server.backlog`
- `taskmanager.network.netty.sendReceiveBufferSize`
- `taskmanager.network.netty.transport`

The following configurations are unnecessary and have been deprecated:
- `taskmanager.network.max-num-tcp-connections`
- `fine-grained.shuffle-mode.all-blocking`

was:
The following configurations have been deprecated as we are phasing out the hash-based blocking shuffle:
- `taskmanager.network.sort-shuffle.min-parallelism`
- `taskmanager.network.blocking-shuffle.type`

The following configurations have been deprecated as we are phasing out the legacy hybrid shuffle:
- `taskmanager.network.hybrid-shuffle.spill-index-region-group-size`
- `taskmanager.network.hybrid-shuffle.num-retained-in-memory-regions-max`
- `taskmanager.network.hybrid-shuffle.enable-new-mode`

The following configurations have been deprecated to simplify the configuration of network buffers:
- `taskmanager.network.memory.buffers-per-channel`
- `taskmanager.network.memory.floating-buffers-per-gate`
- `taskmanager.network.memory.max-buffers-per-channel`
- `taskmanager.network.memory.max-overdraft-buffers-per-gate`
- `taskmanager.network.memory.exclusive-buffers-request-timeout-ms` (Please use `taskmanager.network.memory.exclusive-buffers-request-timeout` instead.)

The configuration `taskmanager.network.batch-shuffle.compression.enabled` has been deprecated. Please set `taskmanager.network.compression.codec` to "NONE" to disable compression.

The following Netty-related configurations are no longer recommended for use and have been deprecated:
- `taskmanager.network.netty.num-arenas`
- `taskmanager.network.netty.server.numThreads`
- `taskmanager.network.netty.client.numThreads`
- `taskmanager.network.netty.server.backlog`
- `taskmanager.network.netty.sendReceiveBufferSize`
- `taskmanager.network.netty.transport`

The following configurations are unnecessary and have been deprecated:
- `taskmanager.network.max-num-tcp-connections`
- `fine-grained.shuffle-mode.all-blocking`

> Improve Runtime Configuration for Flink 2.0
> ---
>
> Key: FLINK-35461
> URL: https://issues.apache.org/jira/browse/FLINK-35461
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Configuration
> Reporter: Xuannan Su
> Assignee: Xuannan Su
> Priority: Major
> Labels: pull-request-available
>
> As Flink moves toward 2.0, we have revisited all runtime configurations and identified several improvements to enhance user-friendliness and maintainability. We want to refine the runtime configuration.
>
> This ticket implements all the changes discussed in [FLIP-450|https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0].

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
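For users updating their configuration, the two deprecations above that have direct replacements can be expressed in a Flink configuration file roughly as follows. This is a sketch based only on the release note above; the option names are taken verbatim from it, while the values (`NONE`, `30 s`) are illustrative placeholders:

```yaml
# Deprecated (still accepted while the options remain deprecated):
# taskmanager.network.batch-shuffle.compression.enabled: false
# taskmanager.network.memory.exclusive-buffers-request-timeout-ms: 30000

# Replacement 1: disable compression by selecting the NONE codec.
taskmanager.network.compression.codec: NONE

# Replacement 2: use the Duration-based request timeout instead of the
# millisecond-suffixed key ("30 s" is a placeholder value).
taskmanager.network.memory.buffers-request-timeout: 30 s
```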
[jira] [Updated] (FLINK-35461) Improve Runtime Configuration for Flink 2.0
[ https://issues.apache.org/jira/browse/FLINK-35461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xuannan Su updated FLINK-35461:
---
Release Note:
The following configurations have been deprecated as we are phasing out the hash-based blocking shuffle:
- `taskmanager.network.sort-shuffle.min-parallelism`
- `taskmanager.network.blocking-shuffle.type`

The following configurations have been deprecated as we are phasing out the legacy hybrid shuffle:
- `taskmanager.network.hybrid-shuffle.spill-index-region-group-size`
- `taskmanager.network.hybrid-shuffle.num-retained-in-memory-regions-max`
- `taskmanager.network.hybrid-shuffle.enable-new-mode`

The following configurations have been deprecated to simplify the configuration of network buffers:
- `taskmanager.network.memory.buffers-per-channel`
- `taskmanager.network.memory.floating-buffers-per-gate`
- `taskmanager.network.memory.max-buffers-per-channel`
- `taskmanager.network.memory.max-overdraft-buffers-per-gate`
- `taskmanager.network.memory.exclusive-buffers-request-timeout-ms` (Please use `taskmanager.network.memory.exclusive-buffers-request-timeout` instead.)

The configuration `taskmanager.network.batch-shuffle.compression.enabled` has been deprecated. Please set `taskmanager.network.compression.codec` to "NONE" to disable compression.

The following Netty-related configurations are no longer recommended for use and have been deprecated:
- `taskmanager.network.netty.num-arenas`
- `taskmanager.network.netty.server.numThreads`
- `taskmanager.network.netty.client.numThreads`
- `taskmanager.network.netty.server.backlog`
- `taskmanager.network.netty.sendReceiveBufferSize`
- `taskmanager.network.netty.transport`

The following configurations are unnecessary and have been deprecated:
- `taskmanager.network.max-num-tcp-connections`
- `fine-grained.shuffle-mode.all-blocking`

> Improve Runtime Configuration for Flink 2.0
> ---
>
> Key: FLINK-35461
> URL: https://issues.apache.org/jira/browse/FLINK-35461
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Configuration
> Reporter: Xuannan Su
> Assignee: Xuannan Su
> Priority: Major
> Labels: pull-request-available
>
> As Flink moves toward 2.0, we have revisited all runtime configurations and identified several improvements to enhance user-friendliness and maintainability. We want to refine the runtime configuration.
>
> This ticket implements all the changes discussed in [FLIP-450|https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0].

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-35475) Introduce isInternalSorterSupport to OperatorAttributes
Xuannan Su created FLINK-35475: -- Summary: Introduce isInternalSorterSupport to OperatorAttributes Key: FLINK-35475 URL: https://issues.apache.org/jira/browse/FLINK-35475 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: Xuannan Su Introduce isInternalSorterSupport to OperatorAttributes to notify Flink whether the operator will sort the data internally in batch mode or during backlog. -- This message was sent by Atlassian Jira (v8.20.10#820010)
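The ticket above contains no code, but the intended shape of the attribute can be sketched with a small, self-contained stand-in. All names below are hypothetical, modeled on the ticket title and on the builder pattern Flink already uses for operator attributes; this is not the actual Flink API:

```java
// Hypothetical, self-contained stand-in for the proposed attribute.
// NOT the real Flink classes; names are inferred from the ticket title.
public class OperatorAttributesSketch {

    /** Immutable bundle of attributes an operator reports to the runtime. */
    public static final class OperatorAttributes {
        private final boolean internalSorterSupported;

        private OperatorAttributes(boolean internalSorterSupported) {
            this.internalSorterSupported = internalSorterSupported;
        }

        /** True if the operator sorts data internally in batch mode or during backlog. */
        public boolean isInternalSorterSupported() {
            return internalSorterSupported;
        }
    }

    /** Builder with a conservative default: no internal sorter unless declared. */
    public static final class Builder {
        private boolean internalSorterSupported = false;

        public Builder setInternalSorterSupported(boolean supported) {
            this.internalSorterSupported = supported;
            return this;
        }

        public OperatorAttributes build() {
            return new OperatorAttributes(internalSorterSupported);
        }
    }

    public static void main(String[] args) {
        OperatorAttributes attrs =
                new Builder().setInternalSorterSupported(true).build();
        System.out.println(attrs.isInternalSorterSupported()); // prints "true"
    }
}
```

The defensive default (`false`) matters: the runtime can only rely on the flag to skip redundant sorting when an operator explicitly declares it.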
[jira] [Updated] (FLINK-35461) Improve Runtime Configuration for Flink 2.0
[ https://issues.apache.org/jira/browse/FLINK-35461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xuannan Su updated FLINK-35461:
---
Description:
As Flink moves toward 2.0, we have revisited all runtime configurations and identified several improvements to enhance user-friendliness and maintainability. We want to refine the runtime configuration.

This ticket implements all the changes discussed in [FLIP-450|https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0].

was:
As Flink moves toward 2.0, we have revisited all runtime configurations and identified several improvements to enhance user-friendliness and maintainability. In this FLIP, we aim to refine the runtime configuration.

This ticket implements all the changes discussed in [FLIP-450|https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0].

> Improve Runtime Configuration for Flink 2.0
> ---
>
> Key: FLINK-35461
> URL: https://issues.apache.org/jira/browse/FLINK-35461
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Configuration
> Reporter: Xuannan Su
> Priority: Major
> Labels: pull-request-available
>
> As Flink moves toward 2.0, we have revisited all runtime configurations and identified several improvements to enhance user-friendliness and maintainability. We want to refine the runtime configuration.
>
> This ticket implements all the changes discussed in [FLIP-450|https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0].

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-35461) Improve Runtime Configuration for Flink 2.0
Xuannan Su created FLINK-35461: -- Summary: Improve Runtime Configuration for Flink 2.0 Key: FLINK-35461 URL: https://issues.apache.org/jira/browse/FLINK-35461 Project: Flink Issue Type: Improvement Components: Runtime / Configuration Reporter: Xuannan Su As Flink moves toward 2.0, we have revisited all runtime configurations and identified several improvements to enhance user-friendliness and maintainability. In this FLIP, we aim to refine the runtime configuration. This ticket implements all the changes discussed in [FLIP-450|https://cwiki.apache.org/confluence/display/FLINK/FLIP-450%3A+Improve+Runtime+Configuration+for+Flink+2.0]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35359) General Improvement to Configuration for Flink 2.0
[ https://issues.apache.org/jira/browse/FLINK-35359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xuannan Su updated FLINK-35359:
---
Release Note:
- The following configurations have been updated to the Duration type in a backward-compatible manner: `client.heartbeat.interval`, `client.heartbeat.timeout`, `cluster.registration.error-delay`, `cluster.registration.initial-timeout`, `cluster.registration.max-timeout`, `cluster.registration.refused-registration-delay`, `cluster.services.shutdown-timeout`, `heartbeat.interval`, `heartbeat.timeout`, `high-availability.zookeeper.client.connection-timeout`, `high-availability.zookeeper.client.retry-wait`, `high-availability.zookeeper.client.session-timeout`, `historyserver.archive.fs.refresh-interval`, `historyserver.web.refresh-interval`, `metrics.fetcher.update-interval`, `metrics.latency.interval`, `metrics.reporter.influxdb.connectTimeout`, `metrics.reporter.influxdb.writeTimeout`, `metrics.system-resource-probing-interval`, `pekko.startup-timeout`, `pekko.tcp.timeout`, `resourcemanager.job.timeout`, `resourcemanager.standalone.start-up-time`, `resourcemanager.taskmanager-timeout`, `rest.await-leader-timeout`, `rest.connection-timeout`, `rest.idleness-timeout`, `rest.retry.delay`, `slot.idle.timeout`, `slot.request.timeout`, `task.cancellation.interval`, `task.cancellation.timeout`, `task.cancellation.timers.timeout`, `taskmanager.debug.memory.log-interval`, `web.refresh-interval`, `web.timeout`, `yarn.heartbeat.container-request-interval`.
- The `taskmanager.network.compression.codec` and `table.optimizer.agg-phase-strategy` configurations have been updated to the Enum type in a backward-compatible manner.
- The `yarn.application-attempts` configuration has been updated to the Int type in a backward-compatible manner.

was:
- The following configurations have been updated to the Duration type in a backward-compatible manner: `client.heartbeat.interval`, `client.heartbeat.timeout`, `cluster.registration.error-delay`, `cluster.registration.initial-timeout`, `cluster.registration.max-timeout`, `cluster.registration.refused-registration-delay`, `cluster.services.shutdown-timeout`, `heartbeat.interval`, `heartbeat.timeout`, `high-availability.zookeeper.client.connection-timeout`, `high-availability.zookeeper.client.retry-wait`, `high-availability.zookeeper.client.session-timeout`, `historyserver.archive.fs.refresh-interval`, `historyserver.web.refresh-interval`, `metrics.fetcher.update-interval`, `metrics.latency.interval`, `metrics.reporter.influxdb.connectTimeout`, `metrics.reporter.influxdb.writeTimeout`, `metrics.system-resource-probing-interval`, `pekko.startup-timeout`, `pekko.tcp.timeout`, `resourcemanager.job.timeout`, `resourcemanager.standalone.start-up-time`, `resourcemanager.taskmanager-timeout`, `rest.await-leader-timeout`, `rest.connection-timeout`, `rest.idleness-timeout`, `rest.retry.delay`, `slot.idle.timeout`, `slot.request.timeout`, `task.cancellation.interval`, `task.cancellation.timeout`, `task.cancellation.timers.timeout`, `taskmanager.debug.memory.log-interval`, `web.refresh-interval`, `web.timeout`, `yarn.heartbeat.container-request-interval`
- The `taskmanager.network.compression.codec` and `table.optimizer.agg-phase-strategy` configurations have been updated to the Enum type in a backward-compatible manner.
- The `yarn.application-attempts` configuration has been updated to the Int type in a backward-compatible manner.

> General Improvement to Configuration for Flink 2.0
> --
>
> Key: FLINK-35359
> URL: https://issues.apache.org/jira/browse/FLINK-35359
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Configuration
> Reporter: Xuannan Su
> Assignee: Xuannan Su
> Priority: Major
> Labels: pull-request-available
>
> As Flink moves toward version 2.0, we want to provide users with a better experience with the existing configuration. In this FLIP, we outline several general improvements to the current configuration:
> * Ensure all the ConfigOptions are properly annotated
> * Ensure all user-facing configurations are included in the documentation generation process
> * Make the existing ConfigOptions use the proper type
> * Mark all internally used ConfigOptions with the @Internal annotation
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
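To illustrate what the Duration-type migration means for users, a configuration file can now use explicit Duration values for the options listed above. This is a sketch; since the release note says the change is backward compatible, a bare number should still be accepted and interpreted in the option's legacy unit (presumably milliseconds for the heartbeat options), while the explicit-unit form is the new style:

```yaml
# Legacy style: unit-less number, interpreted in the option's historical unit.
heartbeat.interval: 10000

# New Duration style: value with an explicit unit.
heartbeat.timeout: 50 s
```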
[jira] [Updated] (FLINK-35359) General Improvement to Configuration for Flink 2.0
[ https://issues.apache.org/jira/browse/FLINK-35359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xuannan Su updated FLINK-35359:
---
Release Note:
- The following configurations have been updated to the Duration type in a backward-compatible manner: `client.heartbeat.interval`, `client.heartbeat.timeout`, `cluster.registration.error-delay`, `cluster.registration.initial-timeout`, `cluster.registration.max-timeout`, `cluster.registration.refused-registration-delay`, `cluster.services.shutdown-timeout`, `heartbeat.interval`, `heartbeat.timeout`, `high-availability.zookeeper.client.connection-timeout`, `high-availability.zookeeper.client.retry-wait`, `high-availability.zookeeper.client.session-timeout`, `historyserver.archive.fs.refresh-interval`, `historyserver.web.refresh-interval`, `metrics.fetcher.update-interval`, `metrics.latency.interval`, `metrics.reporter.influxdb.connectTimeout`, `metrics.reporter.influxdb.writeTimeout`, `metrics.system-resource-probing-interval`, `pekko.startup-timeout`, `pekko.tcp.timeout`, `resourcemanager.job.timeout`, `resourcemanager.standalone.start-up-time`, `resourcemanager.taskmanager-timeout`, `rest.await-leader-timeout`, `rest.connection-timeout`, `rest.idleness-timeout`, `rest.retry.delay`, `slot.idle.timeout`, `slot.request.timeout`, `task.cancellation.interval`, `task.cancellation.timeout`, `task.cancellation.timers.timeout`, `taskmanager.debug.memory.log-interval`, `web.refresh-interval`, `web.timeout`, `yarn.heartbeat.container-request-interval`
- The `taskmanager.network.compression.codec` and `table.optimizer.agg-phase-strategy` configurations have been updated to the Enum type in a backward-compatible manner.
- The `yarn.application-attempts` configuration has been updated to the Int type in a backward-compatible manner.

was:
- The following configurations have been updated to the Duration type in a backward-compatible manner:: `client.heartbeat.interval`, `client.heartbeat.timeout`, `cluster.registration.error-delay`, `cluster.registration.initial-timeout`, `cluster.registration.max-timeout`, `cluster.registration.refused-registration-delay`, `cluster.services.shutdown-timeout`, `heartbeat.interval`, `heartbeat.timeout`, `high-availability.zookeeper.client.connection-timeout`, `high-availability.zookeeper.client.retry-wait`, `high-availability.zookeeper.client.session-timeout`, `historyserver.archive.fs.refresh-interval`, `historyserver.web.refresh-interval`, `metrics.fetcher.update-interval`, `metrics.latency.interval`, `metrics.reporter.influxdb.connectTimeout`, `metrics.reporter.influxdb.writeTimeout`, `metrics.system-resource-probing-interval`, `pekko.startup-timeout`, `pekko.tcp.timeout`, `resourcemanager.job.timeout`, `resourcemanager.standalone.start-up-time`, `resourcemanager.taskmanager-timeout`, `rest.await-leader-timeout`, `rest.connection-timeout`, `rest.idleness-timeout`, `rest.retry.delay`, `slot.idle.timeout`, `slot.request.timeout`, `task.cancellation.interval`, `task.cancellation.timeout`, `task.cancellation.timers.timeout`, `taskmanager.debug.memory.log-interval`, `web.refresh-interval`, `web.timeout`, `yarn.heartbeat.container-request-interval`,
- The `taskmanager.network.compression.codec` and `table.optimizer.agg-phase-strategy` configurations have been updated to the Enum type in a backward-compatible manner.
- The `yarn.application-attempts` configuration has been updated to the Int type in a backward-compatible manner.

> General Improvement to Configuration for Flink 2.0
> --
>
> Key: FLINK-35359
> URL: https://issues.apache.org/jira/browse/FLINK-35359
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Configuration
> Reporter: Xuannan Su
> Assignee: Xuannan Su
> Priority: Major
> Labels: pull-request-available
>
> As Flink moves toward version 2.0, we want to provide users with a better experience with the existing configuration. In this FLIP, we outline several general improvements to the current configuration:
> * Ensure all the ConfigOptions are properly annotated
> * Ensure all user-facing configurations are included in the documentation generation process
> * Make the existing ConfigOptions use the proper type
> * Mark all internally used ConfigOptions with the @Internal annotation
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-35359) General Improvement to Configuration for Flink 2.0
[ https://issues.apache.org/jira/browse/FLINK-35359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xuannan Su updated FLINK-35359:
---
Release Note:
- The following configurations have been updated to the Duration type in a backward-compatible manner:: `client.heartbeat.interval`, `client.heartbeat.timeout`, `cluster.registration.error-delay`, `cluster.registration.initial-timeout`, `cluster.registration.max-timeout`, `cluster.registration.refused-registration-delay`, `cluster.services.shutdown-timeout`, `heartbeat.interval`, `heartbeat.timeout`, `high-availability.zookeeper.client.connection-timeout`, `high-availability.zookeeper.client.retry-wait`, `high-availability.zookeeper.client.session-timeout`, `historyserver.archive.fs.refresh-interval`, `historyserver.web.refresh-interval`, `metrics.fetcher.update-interval`, `metrics.latency.interval`, `metrics.reporter.influxdb.connectTimeout`, `metrics.reporter.influxdb.writeTimeout`, `metrics.system-resource-probing-interval`, `pekko.startup-timeout`, `pekko.tcp.timeout`, `resourcemanager.job.timeout`, `resourcemanager.standalone.start-up-time`, `resourcemanager.taskmanager-timeout`, `rest.await-leader-timeout`, `rest.connection-timeout`, `rest.idleness-timeout`, `rest.retry.delay`, `slot.idle.timeout`, `slot.request.timeout`, `task.cancellation.interval`, `task.cancellation.timeout`, `task.cancellation.timers.timeout`, `taskmanager.debug.memory.log-interval`, `web.refresh-interval`, `web.timeout`, `yarn.heartbeat.container-request-interval`,
- The `taskmanager.network.compression.codec` and `table.optimizer.agg-phase-strategy` configurations have been updated to the Enum type in a backward-compatible manner.
- The `yarn.application-attempts` configuration has been updated to the Int type in a backward-compatible manner.

> General Improvement to Configuration for Flink 2.0
> --
>
> Key: FLINK-35359
> URL: https://issues.apache.org/jira/browse/FLINK-35359
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Configuration
> Reporter: Xuannan Su
> Assignee: Xuannan Su
> Priority: Major
> Labels: pull-request-available
>
> As Flink moves toward version 2.0, we want to provide users with a better experience with the existing configuration. In this FLIP, we outline several general improvements to the current configuration:
> * Ensure all the ConfigOptions are properly annotated
> * Ensure all user-facing configurations are included in the documentation generation process
> * Make the existing ConfigOptions use the proper type
> * Mark all internally used ConfigOptions with the @Internal annotation
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-35359) General Improvement to Configuration for Flink 2.0
Xuannan Su created FLINK-35359: -- Summary: General Improvement to Configuration for Flink 2.0 Key: FLINK-35359 URL: https://issues.apache.org/jira/browse/FLINK-35359 Project: Flink Issue Type: Improvement Components: Runtime / Configuration Reporter: Xuannan Su As Flink moves toward version 2.0, we want to provide users with a better experience with the existing configuration. In this FLIP, we outline several general improvements to the current configuration: * Ensure all the ConfigOptions are properly annotated * Ensure all user-facing configurations are included in the documentation generation process * Make the existing ConfigOptions use the proper type * Mark all internally used ConfigOptions with the @Internal annotation https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35089) Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes
[ https://issues.apache.org/jira/browse/FLINK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xuannan Su updated FLINK-35089:
---
Description:
Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the `AbstractStreamOperator` are transient. The two fields will be null when it is deserialized in TaskManager, which may cause an NPE.

To fix it, we will initialize the two fields in the {{setup}} method.

was:
Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the `AbstractStreamOperator` are transient. The two fields will be null when it is deserialized in TaskManager, which may cause an NPE.

To fix it, we propose to make the RecordAttributes serializable and these fields non-transient.

> Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes
> --
>
> Key: FLINK-35089
> URL: https://issues.apache.org/jira/browse/FLINK-35089
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.19.0
> Reporter: Xuannan Su
> Assignee: Xuannan Su
> Priority: Major
> Labels: pull-request-available
>
> Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the `AbstractStreamOperator` are transient. The two fields will be null when it is deserialized in TaskManager, which may cause an NPE.
> To fix it, we will initialize the two fields in the {{setup}} method.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
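The root cause described above is general Java serialization behavior: transient fields are skipped when an object is serialized, so they come back as null on the receiving side. The following self-contained demonstration (not Flink code; the class and field names only echo the ticket) shows the null after a round trip and how a setup-style method restores a safe default, mirroring the chosen fix:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Demonstrates why transient fields are null after deserialization and how
// re-initializing them in a setup-style method avoids the NPE.
public class TransientFieldDemo {

    public static class Operator implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: skipped by Java serialization, so it is null after the
        // object is deserialized on the receiving side (e.g. a TaskManager).
        public transient String lastRecordAttributes = "initial";

        // Setup-style method: establish a safe default instead of relying on
        // serialization to carry the field.
        public void setup() {
            if (lastRecordAttributes == null) {
                lastRecordAttributes = "default";
            }
        }
    }

    // Serialize and deserialize the operator in memory.
    public static Operator roundTrip(Operator op)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(op);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (Operator) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Operator deserialized = roundTrip(new Operator());
        System.out.println(deserialized.lastRecordAttributes); // prints "null"
        deserialized.setup();
        System.out.println(deserialized.lastRecordAttributes); // prints "default"
    }
}
```

The alternative the ticket originally proposed (making the field non-transient and its type serializable) would also work, but initializing in setup keeps the serialized operator smaller and avoids coupling the attribute type to Java serialization.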
[jira] [Updated] (FLINK-35089) Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes
[ https://issues.apache.org/jira/browse/FLINK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xuannan Su updated FLINK-35089:
---
Description:
Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the `AbstractStreamOperator` are transient. The two fields will be null when it is deserialized in TaskManager, which may cause an NPE.

To fix it, we propose to make the RecordAttributes serializable and these fields non-transient.

was:
Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the `AbstractStreamOperator` are transient. The two fields will be null when it is deserialized in TaskManager, which may cause an NPE.

To fix it, we propose to make the RecordAttributes serialization and these fields non-transient.

> Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes
> --
>
> Key: FLINK-35089
> URL: https://issues.apache.org/jira/browse/FLINK-35089
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.19.0
> Reporter: Xuannan Su
> Priority: Major
>
> Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the `AbstractStreamOperator` are transient. The two fields will be null when it is deserialized in TaskManager, which may cause an NPE.
> To fix it, we propose to make the RecordAttributes serializable and these fields non-transient.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-35089) Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes
Xuannan Su created FLINK-35089: -- Summary: Two input AbstractStreamOperator may throw NPE when receiving RecordAttributes Key: FLINK-35089 URL: https://issues.apache.org/jira/browse/FLINK-35089 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.19.0 Reporter: Xuannan Su Currently the `lastRecordAttributes1` and `lastRecordAttributes2` in the `AbstractStreamOperator` are transient. The two fields will be null when it is deserialized in TaskManager, which may cause an NPE. To fix it, we propose to make the RecordAttributes serialization and these fields non-transient. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34297) Release Testing Instructions: Verify FLINK-34079 Migrate string configuration key to ConfigOption
[ https://issues.apache.org/jira/browse/FLINK-34297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812649#comment-17812649 ]

Xuannan Su commented on FLINK-34297:
[~lincoln.86xy] This change doesn't need cross-team testing. I think we can close the ticket.

> Release Testing Instructions: Verify FLINK-34079 Migrate string configuration key to ConfigOption
> -
>
> Key: FLINK-34297
> URL: https://issues.apache.org/jira/browse/FLINK-34297
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Configuration
> Affects Versions: 1.19.0
> Reporter: lincoln lee
> Assignee: Xuannan Su
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.19.0
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-34085) Remove deprecated string configuration keys in Flink 2.0
Xuannan Su created FLINK-34085: -- Summary: Remove deprecated string configuration keys in Flink 2.0 Key: FLINK-34085 URL: https://issues.apache.org/jira/browse/FLINK-34085 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Reporter: Xuannan Su Fix For: 2.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34084) Deprecate unused configuration in BinaryInput/OutputFormat and FileInput/OutputFormat
Xuannan Su created FLINK-34084: -- Summary: Deprecate unused configuration in BinaryInput/OutputFormat and FileInput/OutputFormat Key: FLINK-34084 URL: https://issues.apache.org/jira/browse/FLINK-34084 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Reporter: Xuannan Su Fix For: 1.19.0 Update FileInputFormat.java, FileOutputFormat.java, BinaryInputFormat.java, and BinaryOutputFormat.java to deprecate unused string configuration keys. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34083) Deprecate string configuration keys and unused constants in ConfigConstants
Xuannan Su created FLINK-34083: -- Summary: Deprecate string configuration keys and unused constants in ConfigConstants Key: FLINK-34083 URL: https://issues.apache.org/jira/browse/FLINK-34083 Project: Flink Issue Type: Sub-task Components: Runtime / Configuration Reporter: Xuannan Su Fix For: 1.19.0 * Update ConfigConstants.java to deprecate and replace string configuration keys * Mark unused constants in ConfigConstants.java as deprecated -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33890) Determine the initial status before receiving the first RecordAttributes
Xuannan Su created FLINK-33890: -- Summary: Determine the initial status before receiving the first RecordAttributes Key: FLINK-33890 URL: https://issues.apache.org/jira/browse/FLINK-33890 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: Xuannan Su Currently, all the operators are initialized with non-backlog mode. Ideally, we should determine the initial status before receiving the first {{RecordAttributes}} so that we don't have to initialize the operator in non-backlog mode and immediately switch to backlog mode before processing any records. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33810) Propagate RecordAttributes that contains isProcessingBacklog status
Xuannan Su created FLINK-33810: -- Summary: Propagate RecordAttributes that contains isProcessingBacklog status Key: FLINK-33810 URL: https://issues.apache.org/jira/browse/FLINK-33810 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: Xuannan Su -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33399) Support switching from batch to stream mode for KeyedCoProcessOperator and IntervalJoinOperator
Xuannan Su created FLINK-33399: -- Summary: Support switching from batch to stream mode for KeyedCoProcessOperator and IntervalJoinOperator Key: FLINK-33399 URL: https://issues.apache.org/jira/browse/FLINK-33399 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: Xuannan Su Support switching from batch to stream mode for KeyedCoProcessOperator and IntervalJoinOperator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33398) Support switching from batch to stream mode for one input stream operator
Xuannan Su created FLINK-33398: -- Summary: Support switching from batch to stream mode for one input stream operator Key: FLINK-33398 URL: https://issues.apache.org/jira/browse/FLINK-33398 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: Xuannan Su Introduce the infra to support switching from batch to stream mode for one input stream operator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33184) HybridShuffleITCase fails with exception in resource cleanup of task Map on AZP
[ https://issues.apache.org/jira/browse/FLINK-33184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775983#comment-17775983 ]

Xuannan Su commented on FLINK-33184:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53752=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba=9031

> HybridShuffleITCase fails with exception in resource cleanup of task Map on AZP
> ---
>
> Key: FLINK-33184
> URL: https://issues.apache.org/jira/browse/FLINK-33184
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.19.0
> Reporter: Sergey Nuyanzin
> Priority: Critical
> Labels: test-stability
>
> This build fails
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53548=logs=baf26b34-3c6a-54e8-f93f-cf269b32f802=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9=8710
> {noformat}
> Map (5/10)#0] ERROR org.apache.flink.runtime.taskmanager.Task [] - FATAL - exception in resource cleanup of task Map (5/10)#0 (159f887fbd200ea7cfa4aaeb1127c4ab_0a448493b4782967b150582570326227_4_0).
> java.lang.IllegalStateException: Leaking buffers.
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManagerImpl.release(TieredStorageMemoryManagerImpl.java:236) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_292]
> at org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry.clearResourceFor(TieredStorageResourceRegistry.java:59) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition.releaseInternal(TieredResultPartition.java:195) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:262) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartition(ResultPartitionManager.java:88) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.io.network.partition.ResultPartition.fail(ResultPartition.java:284) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.failAllResultPartitions(Task.java:1004) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.releaseResources(Task.java:990) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:838) [flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
> 01:17:22,375 [flink-pekko.actor.default-dispatcher-5] INFO org.apache.flink.runtime.taskmanager.Task [] - Task Sink: Unnamed (3/10)#0 is already in state CANCELING
> 01:17:22,375 [Map (5/10)#0] ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - FATAL - exception in resource cleanup of task Map (5/10)#0 (159f887fbd200ea7cfa4aaeb1127c4ab_0a448493b4782967b150582570326227_4_0).
> java.lang.IllegalStateException: Leaking buffers.
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManagerImpl.release(TieredStorageMemoryManagerImpl.java:236) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_292]
> at org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry.clearResourceFor(TieredStorageResourceRegistry.java:59) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition.releaseInternal(TieredResultPartition.java:195) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:262) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
> at
[jira] [Created] (FLINK-33202) FLIP-327: Support switching from batch to stream mode to improve throughput when processing backlog data
Xuannan Su created FLINK-33202: -- Summary: FLIP-327: Support switching from batch to stream mode to improve throughput when processing backlog data Key: FLINK-33202 URL: https://issues.apache.org/jira/browse/FLINK-33202 Project: Flink Issue Type: New Feature Components: Runtime / Task Reporter: Xuannan Su Fix For: 1.19.0 Umbrella issue for https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data
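The FLIP's core idea — run in batch style while consuming backlog data, then switch to per-record stream processing — can be sketched as follows. This is a hypothetical pure-Python illustration, not Flink's actual operator API; the class and method names are invented for the sketch.

```python
# Hypothetical sketch of the FLIP-327 idea: an operator that works in batch
# style while backlog data is being processed (buffer and group by key, emit
# when the backlog ends) and then switches to record-at-a-time streaming.
from collections import defaultdict

class BacklogSwitchingOperator:
    def __init__(self):
        self.backlog = True             # start in batch (backlog) mode
        self.buffer = defaultdict(list)
        self.output = []

    def process(self, key, value):
        if self.backlog:
            # Batch mode: buffer records grouped by key; no output yet.
            self.buffer[key].append(value)
        else:
            # Stream mode: emit immediately, one record at a time.
            self.output.append((key, value))

    def end_of_backlog(self):
        # Flush buffered records key by key, then switch to stream mode.
        for key in sorted(self.buffer):
            for value in self.buffer[key]:
                self.output.append((key, value))
        self.buffer.clear()
        self.backlog = False

op = BacklogSwitchingOperator()
op.process("b", 1)
op.process("a", 2)
op.end_of_backlog()      # flushes ("a", 2) then ("b", 1)
op.process("c", 3)       # now emitted immediately
print(op.output)         # [('a', 2), ('b', 1), ('c', 3)]
```

Processing the backlog in key-grouped batches is what improves throughput; the switch point is the end-of-backlog signal from the source.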
[jira] [Created] (FLINK-32476) Support configuring object-reuse for internal operators
Xuannan Su created FLINK-32476: -- Summary: Support configuring object-reuse for internal operators Key: FLINK-32476 URL: https://issues.apache.org/jira/browse/FLINK-32476 Project: Flink Issue Type: New Feature Components: Runtime / Task Reporter: Xuannan Su Currently, object reuse is disabled by default for streaming jobs in order to prevent unexpected behavior. Object reuse becomes problematic when the upstream operator stores its output while the downstream operator modifies the input. However, many operators implemented by Flink, such as the Flink SQL operators, do not modify their input, so it is safe to reuse the input object in such cases. Therefore, we intend to enable object reuse specifically for operators that do not modify the input. As a first step, we will focus on the operators implemented within Flink; a FLIP will follow to introduce an API that allows user-defined operators to enable object reuse.
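The hazard described above can be made concrete with a small sketch (pure-Python analogy; Flink's actual record passing is in Java, controlled by `ExecutionConfig#enableObjectReuse`): when the runtime passes the same object instance down the chain instead of a copy, a downstream in-place mutation corrupts any reference the upstream still holds.

```python
# Illustration of why object reuse is unsafe when a downstream operator
# mutates its input while an upstream operator still holds a reference to
# the same object (hypothetical sketch, not Flink code).
class StatefulUpstream:
    """Stores the record it emitted, e.g. for windowing or deduplication."""
    def __init__(self):
        self.last_emitted = None

    def emit(self, record):
        self.last_emitted = record   # keeps a reference, not a copy
        return record

class MutatingDownstream:
    """Modifies its input in place -- only safe when objects are not reused."""
    def process(self, record):
        record["value"] *= 2
        return record

up, down = StatefulUpstream(), MutatingDownstream()

# With object reuse, both operators see the same dict instance:
shared = up.emit({"value": 21})
down.process(shared)

# The upstream's stored state was silently corrupted:
print(up.last_emitted["value"])  # 42, not the 21 that was emitted
```

Operators that never mutate their input (like the SQL operators mentioned above) cannot trigger this corruption, which is why enabling reuse for them is safe.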
[jira] [Created] (FLINK-32008) Protobuf format throws exception with Map datatype
Xuannan Su created FLINK-32008: -- Summary: Protobuf format throws exception with Map datatype Key: FLINK-32008 URL: https://issues.apache.org/jira/browse/FLINK-32008 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.17.0 Reporter: Xuannan Su Attachments: flink-protobuf-example.zip The Protobuf format throws an exception when working with the Map data type. I uploaded an example project to reproduce the problem. {code:java} Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261) at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131) at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417) at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected 
exception while polling the records at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more Caused by: java.io.IOException: Failed to deserialize PB object. at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75) at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42) at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) at org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197) at org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210) at org.apache.flink.connector.file.table.DeserializationSchemaAdapter$Reader.readBatch(DeserializationSchemaAdapter.java:124) at org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator$1.readBatch(RecordMapperWrapperRecordIterator.java:82) at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67) at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) ... 
6 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.convertProtoBinaryToRow(ProtoToRowConverter.java:129) at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:70) ... 15 more Caused by: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length. {code}
[jira] [Created] (FLINK-31944) Protobuf format throw com.google.protobuf.InvalidProtocolBufferException
Xuannan Su created FLINK-31944: -- Summary: Protobuf format throw com.google.protobuf.InvalidProtocolBufferException Key: FLINK-31944 URL: https://issues.apache.org/jira/browse/FLINK-31944 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.17.0 Reporter: Xuannan Su Attachments: flink-protobuf-example.zip It seems that the Protobuf format throws the following exception when the first field of the message is of string type. This may also occur for other types. I uploaded a Maven project to reproduce the problem. {code:java} Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more Caused by: java.io.IOException: Failed to deserialize PB object. 
at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75) at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42) at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) at org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197) at org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210) at org.apache.flink.connector.file.table.DeserializationSchemaAdapter$Reader.readBatch(DeserializationSchemaAdapter.java:124) at org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator$1.readBatch(RecordMapperWrapperRecordIterator.java:82) at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67) at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) ... 6 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.convertProtoBinaryToRow(ProtoToRowConverter.java:129) at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:70) ... 15 more Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero). 
at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:133) at com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:633) at com.example.proto.Message.(Message.java:47) at com.example.proto.Message.(Message.java:9) at com.example.proto.Message$1.parsePartialFrom(Message.java:540) at com.example.proto.Message$1.parsePartialFrom(Message.java:534) at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158) at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191) at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:203) at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:208) at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48) at com.example.proto.Message.parseFrom(Message.java:218) ... 21 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
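For background on the "invalid tag (zero)" error above: a Protobuf tag encodes `(field_number << 3) | wire_type`, and field number 0 is reserved, so a zero tag means the parser is reading bytes that do not start a valid message field (for example a truncated, padded, or misaligned payload). A minimal sketch of the check the parser performs:

```python
def parse_tag(byte):
    """Decode a single-byte protobuf tag into (field_number, wire_type).

    A tag is encoded as (field_number << 3) | wire_type; field number 0 is
    reserved, so a zero tag byte is always invalid -- this is what protobuf's
    "Protocol message contained an invalid tag (zero)" error reports.
    """
    field_number = byte >> 3
    wire_type = byte & 0x7
    if field_number == 0:
        raise ValueError("invalid tag (zero)")
    return field_number, wire_type

print(parse_tag(0x0A))  # (1, 2): field 1, length-delimited (e.g. a string)
```

This is why the error typically points at a serialization/framing mismatch between writer and reader rather than at the schema itself.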
[jira] [Created] (FLINK-31943) Multiple t_env cause class loading problem
Xuannan Su created FLINK-31943: -- Summary: Multiple t_env cause class loading problem Key: FLINK-31943 URL: https://issues.apache.org/jira/browse/FLINK-31943 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.17.0 Reporter: Xuannan Su Attachments: flink-sql-connector-kafka-1.17.0.jar, pyflink_classloader.py When a PyFlink process creates multiple StreamTableEnvironments with different EnvironmentSettings and sets "pipeline.jars" on the first created t_env, the jar is not added to the classloader of that first t_env. After digging a little, the reason may be that when the second table environment is created with a new EnvironmentSettings, the context classloader is overwritten by a new classloader; see the `EnvironmentSettings.Builder.build` method. I uploaded a script to reproduce the problem.
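The suspected mechanism can be illustrated with a process-wide "context classloader" that each builder call silently replaces. This is a pure-Python analogy of the bug, not PyFlink's real implementation; the names are invented for the sketch.

```python
# Pure-Python analogy of the suspected bug: a process-wide "context
# classloader" that EnvironmentSettings.Builder.build() replaces each time,
# so jars registered on the first environment are looked up through a new,
# empty classloader created for the second environment.
context_classloader = {"jars": set()}

def build_environment():
    """Analogy of EnvironmentSettings.Builder.build(): installs a fresh
    classloader as the context classloader instead of reusing it."""
    global context_classloader
    context_classloader = {"jars": set()}   # overwrites the previous one
    return context_classloader

env1 = build_environment()
env1["jars"].add("flink-sql-connector-kafka.jar")   # pipeline.jars on env1

env2 = build_environment()                           # second t_env created

# The jar added through env1 is no longer visible via the context classloader:
print("flink-sql-connector-kafka.jar" in context_classloader["jars"])  # False
```

If this analysis holds, a fix would need the builder to reuse or layer on top of the existing context classloader rather than replace it.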
[jira] [Commented] (FLINK-30607) Table.to_pandas doesn't support Map type
[ https://issues.apache.org/jira/browse/FLINK-30607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17706208#comment-17706208 ] Xuannan Su commented on FLINK-30607: [~dianfu] Thanks for the patch! This is very useful in our use case. May I ask if we have a plan to backport this to version 1.16? > Table.to_pandas doesn't support Map type > > > Key: FLINK-30607 > URL: https://issues.apache.org/jira/browse/FLINK-30607 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.15.3 >Reporter: Xuannan Su >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.17.0 > > > It seems that the Table#to_pandas method in PyFlink doesn't support Map type. > It throws the following exception. > {code:java} > py4j.protocol.Py4JJavaError: An error occurred while calling > z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame. > : java.lang.UnsupportedOperationException: Python vectorized UDF doesn't > support logical type MAP currently. 
> at > org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:743) > at > org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:617) > at > org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:167) > at org.apache.flink.table.types.logical.MapType.accept(MapType.java:115) > at > org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowField(ArrowUtils.java:189) > at > org.apache.flink.table.runtime.arrow.ArrowUtils.lambda$toArrowSchema$0(ArrowUtils.java:180) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowSchema(ArrowUtils.java:181) > at > org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:483) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > at > 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.lang.Thread.run(Thread.java:748) {code} > This can be reproduced with the following code. > {code:java} > env = StreamExecutionEnvironment.get_execution_environment() > t_env = StreamTableEnvironment.create(env) > table = t_env.from_descriptor( > TableDescriptor.for_connector("datagen") > .schema( > Schema.new_builder() > .column("val", DataTypes.MAP(DataTypes.INT(), DataTypes.INT())) > .build() > ) > .option("number-of-rows", "10") > .build() > ) > df = table.to_pandas() > print(df) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-24456) Support bounded offset in the Kafka table connector
[ https://issues.apache.org/jira/browse/FLINK-24456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17706184#comment-17706184 ] Xuannan Su commented on FLINK-24456: Thanks for the effort! This is a particularly useful feature in our use case. May I ask if we plan to backport this patch to 1.16? > Support bounded offset in the Kafka table connector > --- > > Key: FLINK-24456 > URL: https://issues.apache.org/jira/browse/FLINK-24456 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka, Table SQL / Ecosystem >Reporter: Haohui Mai >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.17.0 > > > The {{setBounded}} API in the DataStream connector of Kafka is particularly > useful when writing tests. Unfortunately the table connector of Kafka lacks > the same API. > It would be good to have this API added. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30607) Table.to_pandas doesn't support Map type
Xuannan Su created FLINK-30607: -- Summary: Table.to_pandas doesn't support Map type Key: FLINK-30607 URL: https://issues.apache.org/jira/browse/FLINK-30607 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.15.3 Reporter: Xuannan Su It seems that the Table#to_pandas method in PyFlink doesn't support Map type. It throws the following exception. {code:java} py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame. : java.lang.UnsupportedOperationException: Python vectorized UDF doesn't support logical type MAP currently. at org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:743) at org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:617) at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:167) at org.apache.flink.table.types.logical.MapType.accept(MapType.java:115) at org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowField(ArrowUtils.java:189) at org.apache.flink.table.runtime.arrow.ArrowUtils.lambda$toArrowSchema$0(ArrowUtils.java:180) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) at org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowSchema(ArrowUtils.java:181) at org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:483) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) {code}
This can be reproduced with the following code.
{code:java}
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
table = t_env.from_descriptor(
    TableDescriptor.for_connector("datagen")
    .schema(
        Schema.new_builder()
        .column("val", DataTypes.MAP(DataTypes.INT(), DataTypes.INT()))
        .build()
    )
    .option("number-of-rows", "10")
    .build()
)
df = table.to_pandas()
print(df)
{code}
[jira] [Updated] (FLINK-30258) PyFlink supports closing loopback server
[ https://issues.apache.org/jira/browse/FLINK-30258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuannan Su updated FLINK-30258: --- Summary: PyFlink supports closing loopback server (was: PyFlink supports closing loop back server) > PyFlink supports closing loopback server > > > Key: FLINK-30258 > URL: https://issues.apache.org/jira/browse/FLINK-30258 > Project: Flink > Issue Type: Improvement > Components: API / Python >Affects Versions: 1.16.0 >Reporter: Xuannan Su >Priority: Major > > Currently, a loopback server will be started whenever a > StreamExecutionEnvironment or StreamTableEnvironment is created. The loopback > server can only be closed after the process exit. This might not be a problem > for regular uses where only one environment object is used. > However, when running tests, such as the unit tests for PyFlink itself, as > the environment objects are created, the process starts more and more > loopback servers and takes more and more resources. > Therefore, we want to support closing the loopback server. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30258) PyFlink supports closing loop back server
[ https://issues.apache.org/jira/browse/FLINK-30258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuannan Su updated FLINK-30258: --- Description: Currently, a loopback server will be started whenever a StreamExecutionEnvironment or StreamTableEnvironment is created. The loopback server can only be closed after the process exit. This might not be a problem for regular uses where only one environment object is used. However, when running tests, such as the unit tests for PyFlink itself, as the environment objects are created, the process starts more and more loopback servers and takes more and more resources. Therefore, we want to support closing the loopback server. was: Currently, a loopback server will be started whenever a StreamExecutionEnvironment or StreamTableEnvironment is created. The loopback server can only be closed after the process exit. This might not be a problem for regular uses where only one environment object is used. However, when running tests, such as the unit tests for PyFlink itself, as the environment objects are created, the process starts more and more loopback servers and takes more and more resources. > PyFlink supports closing loop back server > - > > Key: FLINK-30258 > URL: https://issues.apache.org/jira/browse/FLINK-30258 > Project: Flink > Issue Type: Improvement > Components: API / Python >Affects Versions: 1.16.0 >Reporter: Xuannan Su >Priority: Major > > Currently, a loopback server will be started whenever a > StreamExecutionEnvironment or StreamTableEnvironment is created. The loopback > server can only be closed after the process exit. This might not be a problem > for regular uses where only one environment object is used. > However, when running tests, such as the unit tests for PyFlink itself, as > the environment objects are created, the process starts more and more > loopback servers and takes more and more resources. > Therefore, we want to support closing the loopback server. 
[jira] [Created] (FLINK-30258) PyFlink supports closing loop back server
Xuannan Su created FLINK-30258: -- Summary: PyFlink supports closing loop back server Key: FLINK-30258 URL: https://issues.apache.org/jira/browse/FLINK-30258 Project: Flink Issue Type: Improvement Components: API / Python Affects Versions: 1.16.0 Reporter: Xuannan Su Currently, a loopback server is started whenever a StreamExecutionEnvironment or StreamTableEnvironment is created, and it can only be closed after the process exits. This might not be a problem for regular use, where only one environment object is created. However, when running tests, such as the unit tests for PyFlink itself, more and more loopback servers are started as environment objects are created, consuming more and more resources.
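The resource-leak pattern and the proposed fix can be sketched with plain sockets: each environment binds a loopback server socket, and without an explicit `close()` the listening sockets only go away when the process exits. This is a pure-Python illustration, not PyFlink's actual loopback server.

```python
# Sketch: each environment starts a TCP loopback server; the proposed
# improvement is an explicit close() so tests can release servers eagerly
# instead of waiting for process exit. (Illustration only.)
import socket

class Environment:
    def __init__(self):
        # Bind a loopback server socket on an ephemeral port.
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.bind(("127.0.0.1", 0))
        self.server.listen(1)

    def close(self):
        # Release the listening socket explicitly.
        self.server.close()

# Every test that creates an environment would otherwise leak one socket:
envs = [Environment() for _ in range(3)]
ports = {env.server.getsockname()[1] for env in envs}
print(len(ports))  # 3 distinct listening sockets

for env in envs:
    env.close()    # with close() support, tests release them eagerly
```

In a test suite that builds hundreds of environments, the difference between "freed at process exit" and "freed per test" is exactly the accumulation the issue describes.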
[jira] [Created] (FLINK-30078) Temporal join should finish when the left table is bounded and finished
Xuannan Su created FLINK-30078: -- Summary: Temporal join should finish when the left table is bounded and finished Key: FLINK-30078 URL: https://issues.apache.org/jira/browse/FLINK-30078 Project: Flink Issue Type: Improvement Components: Table SQL / Runtime Affects Versions: 1.16.0 Reporter: Xuannan Su Currently, a temporal join with a bounded left table and an unbounded right table keeps running even when the left table is finished. From the user's perspective, when the left table is finished, the result of the temporal join is finalized, so the temporal join should finish and the job may also finish, depending on the job topology.
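The expected semantics can be sketched as: each left record joins the latest right-side version valid at its time, and once the bounded left input signals end-of-input no further output is possible, so the operator can finish regardless of the unbounded right side. This is a hypothetical toy model, not Flink's operator code.

```python
class TemporalJoin:
    """Toy event-time temporal join: each left record joins the latest
    right-side version with version time <= the left record's time."""
    def __init__(self):
        self.right_versions = []   # (time, value), appended in time order
        self.output = []
        self.finished = False

    def on_right(self, time, value):
        if not self.finished:      # after the left side ends, input is moot
            self.right_versions.append((time, value))

    def on_left(self, time, key):
        latest = None
        for t, v in self.right_versions:
            if t <= time:
                latest = v
        if latest is not None:
            self.output.append((key, latest))

    def on_left_end_of_input(self):
        # Left table is bounded and finished: the join result is final,
        # so the operator should finish instead of waiting on the right side.
        self.finished = True

join = TemporalJoin()
join.on_right(1, "rate=7.0")
join.on_left(2, "order-1")     # joins rate=7.0
join.on_left_end_of_input()
join.on_right(3, "rate=7.2")   # ignored: the result is already final
print(join.output)  # [('order-1', 'rate=7.0')]
```

The improvement requested by the issue is exactly the `on_left_end_of_input` behavior: propagate the left side's end-of-input instead of keeping the operator alive for right-side updates that can no longer affect the result.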
[jira] [Commented] (FLINK-19200) UNIX_TIMESTAMP function support return in millisecond
[ https://issues.apache.org/jira/browse/FLINK-19200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17632108#comment-17632108 ] Xuannan Su commented on FLINK-19200: Any update on this issue? I am also looking for a way to convert a timestamp to a millisecond epoch. > UNIX_TIMESTAMP function support return in millisecond > - > > Key: FLINK-19200 > URL: https://issues.apache.org/jira/browse/FLINK-19200 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0 >Reporter: leslieyuan >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor > > Now I use Flink 1.10.0, and I found that: > time = "2020-09-11 13:14:29.153" > UNIX_TIMESTAMP(time) returns 1599801269 > UNIX_TIMESTAMP(time, 'yyyy-MM-dd HH:mm:ss.SSS') also returns 1599801269 > Yes, I see in the official website description that this function returns seconds, but since I gave a format with milliseconds as above, I need the millisecond part as well. >
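Until the function supports milliseconds, the conversion can be done outside SQL. A minimal Python sketch (note: the issue's value 1599801269 corresponds to a UTC+8 session time zone; the sketch pins the zone to UTC for determinism):

```python
from datetime import datetime, timezone

def unix_timestamp_millis(text, fmt="%Y-%m-%d %H:%M:%S.%f", tz=timezone.utc):
    """Parse a timestamp string and return the epoch in milliseconds --
    the precision that UNIX_TIMESTAMP (seconds only) drops."""
    dt = datetime.strptime(text, fmt).replace(tzinfo=tz)
    return round(dt.timestamp() * 1000)

ms = unix_timestamp_millis("2020-09-11 13:14:29.153")
print(ms)             # 1599830069153 (interpreting the string as UTC)
print(ms // 1000)     # 1599830069 -- what a seconds-only function returns
print(ms % 1000)      # 153 -- the millisecond part that gets dropped
```

The key point the issue makes is visible in the last line: a seconds-granularity return value cannot represent the `.153` even when the parse format explicitly includes it.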
[jira] [Created] (FLINK-29951) Kafka SQL connector supports setting end offset/timestamp
Xuannan Su created FLINK-29951: -- Summary: Kafka SQL connector supports setting end offset/timestamp Key: FLINK-29951 URL: https://issues.apache.org/jira/browse/FLINK-29951 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.17.0 Reporter: Xuannan Su Currently, the KafkaSource in the DataStream API allows specifying an end offset/timestamp via `KafkaSourceBuilder#setBounded`, while the Kafka SQL connector has no way to do that. To better align the functionality with DataStream and to support bounded streams, we want to support setting an end offset/timestamp in the Kafka SQL connector.
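The DataStream-side semantics being requested — consume each partition only up to a configured end offset, then finish instead of polling forever — can be modeled in a few lines. This is a pure-Python toy model, not the connector's API; the real connector resolves end offsets via `OffsetsInitializer` in `KafkaSourceBuilder#setBounded`.

```python
# Toy model of "bounded" Kafka consumption: each partition is read only up
# to its configured end offset, after which the source is finished.
def read_bounded(partitions, end_offsets):
    """partitions: {partition: [records...]} in offset order.
    end_offsets: {partition: exclusive end offset}.
    Returns (partition, offset, record) tuples below each end offset."""
    out = []
    for partition, records in sorted(partitions.items()):
        stop = end_offsets.get(partition, len(records))
        for offset, record in enumerate(records[:stop]):
            out.append((partition, offset, record))
    return out   # once every partition hit its end offset, the source is done

records = read_bounded(
    {0: ["a", "b", "c"], 1: ["x", "y"]},
    {0: 2, 1: 2},          # stop partition 0 at offset 2, partition 1 at 2
)
print(records)  # [(0, 0, 'a'), (0, 1, 'b'), (1, 0, 'x'), (1, 1, 'y')]
```

Exposing the same stopping condition as SQL connector options is what the issue asks for, so that bounded jobs and tests can be written in pure SQL.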
[jira] [Created] (FLINK-29753) FileSource throws exception reading file with name that ends with xz
Xuannan Su created FLINK-29753: -- Summary: FileSource throws exception reading file with name that ends with xz Key: FLINK-29753 URL: https://issues.apache.org/jira/browse/FLINK-29753 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Affects Versions: 1.15.2 Reporter: Xuannan Su FileSource throws the following exception when reading a file whose name ends with "xz" {code:java} Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225) at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130) at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NoClassDefFoundError: org/tukaani/xz/XZInputStream at 
org.apache.flink.api.common.io.compression.XZInputStreamFactory.create(XZInputStreamFactory.java:42) at org.apache.flink.api.common.io.compression.XZInputStreamFactory.create(XZInputStreamFactory.java:31) at org.apache.flink.connector.file.src.impl.StreamFormatAdapter.lambda$openStream$3(StreamFormatAdapter.java:178) at org.apache.flink.connector.file.src.util.Utils.doWithCleanupOnException(Utils.java:45) at org.apache.flink.connector.file.src.impl.StreamFormatAdapter.openStream(StreamFormatAdapter.java:172) at org.apache.flink.connector.file.src.impl.StreamFormatAdapter.createReader(StreamFormatAdapter.java:70) at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112) at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65) at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more Caused by: java.lang.ClassNotFoundException: org.tukaani.xz.XZInputStream at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 
16 more {code}
The code to reproduce the error:
{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final FileSource<String> source =
            FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/tmp/abcxz"))
                    .build();
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "source").print();
    env.execute();
}
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
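The root cause in the trace above is a `NoClassDefFoundError` for `org.tukaani.xz.XZInputStream`: the XZ input-stream factory relies on the optional `org.tukaani:xz` library being present on the user classpath. As a hedged illustration (plain JDK only; the helper class below is not part of Flink), one can probe whether an optional class is loadable before relying on it:

```java
public class ClasspathProbe {
    // Returns true if the named class can be loaded; `false` for the
    // initialize flag so static initializers are not run by the probe.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // java.util.List ships with the JDK, so this prints true.
        System.out.println(isOnClasspath("java.util.List"));
        // The XZ stream class is a third-party optional dependency;
        // on a bare classpath this prints false.
        System.out.println(isOnClasspath("org.tukaani.xz.XZInputStream"));
    }
}
```

With such a probe, a format factory could fail fast with a clear message ("add org.tukaani:xz to your job's dependencies") instead of surfacing a `NoClassDefFoundError` deep inside the split fetcher.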
[jira] [Commented] (FLINK-29339) JobMasterPartitionTrackerImpl#requestShuffleDescriptorsFromResourceManager blocks main thread
[ https://issues.apache.org/jira/browse/FLINK-29339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17606950#comment-17606950 ] Xuannan Su commented on FLINK-29339: [~gaoyunhaii] I will take a look. Could you assign the ticket to me? > JobMasterPartitionTrackerImpl#requestShuffleDescriptorsFromResourceManager > blocks main thread > - > > Key: FLINK-29339 > URL: https://issues.apache.org/jira/browse/FLINK-29339 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.0 >Reporter: Chesnay Schepler >Priority: Critical > > {code:java} > private List requestShuffleDescriptorsFromResourceManager( > IntermediateDataSetID intermediateDataSetID) { > Preconditions.checkNotNull( > resourceManagerGateway, "JobMaster is not connected to > ResourceManager"); > try { > return this.resourceManagerGateway > .getClusterPartitionsShuffleDescriptors(intermediateDataSetID) > .get(); // <-- there's your problem > } catch (Throwable e) { > throw new RuntimeException( > String.format( > "Failed to get shuffle descriptors of intermediate > dataset %s from ResourceManager", > intermediateDataSetID), > e); > } > } > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
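The problem in the quoted snippet is the synchronous `.get()` on the gateway future, which stalls the JobMaster main thread until the ResourceManager responds. As a hedged sketch of the usual remedy (plain JDK `CompletableFuture`; the method and type names below are illustrative stand-ins, not Flink's actual RPC API), the response can be composed asynchronously instead of blocked on:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class NonBlockingRequestSketch {
    // Hypothetical stand-in for the ResourceManager gateway call,
    // which already returns a future rather than a value.
    static CompletableFuture<List<String>> requestShuffleDescriptors(String dataSetId) {
        return CompletableFuture.supplyAsync(() -> List.of("descriptor-for-" + dataSetId));
    }

    public static void main(String[] args) {
        // Instead of calling .get() on the main thread, attach a
        // continuation that runs once the response arrives.
        CompletableFuture<String> handled =
                requestShuffleDescriptors("ds-1")
                        .thenApply(descriptors ->
                                "handled " + descriptors.size() + " descriptor(s)");
        // join() here only so this demo prints before the JVM exits;
        // the real fix would keep the pipeline fully asynchronous.
        System.out.println(handled.join());
    }
}
```

The design point is that the main thread only schedules the continuation and returns immediately; the blocking wait disappears from the coordination thread entirely.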
[jira] [Created] (FLINK-29231) PyFlink udtaf produces different results in the same sliding window
Xuannan Su created FLINK-29231: -- Summary: PyFlink udtaf produces different results in the same sliding window Key: FLINK-29231 URL: https://issues.apache.org/jira/browse/FLINK-29231 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.15.2 Reporter: Xuannan Su Attachments: image-2022-09-08-17-20-06-296.png, input, test_agg.py It seems that a PyFlink udtaf can produce different results within the same sliding window. It can be reproduced with the attached code and input. The issue does not always occur, but the probability is relatively high. The incorrect output is the following: !image-2022-09-08-17-20-06-296.png! We can see that the output contains different `val_sum` values at `window_time` 2022-01-01 00:01:59.999. -- This message was sent by Atlassian Jira (v8.20.10#820010)
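For context on why this is a bug rather than expected behavior: sliding-window assignment is a pure function of the record timestamp, the window size, and the slide, so all records assigned to the same window must contribute to one deterministic aggregate per `window_time`. A small sketch of that assignment arithmetic (plain Java; the 2-minute size and 1-minute slide are illustrative, chosen to match the `window_time` boundary shown above):

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssignment {
    // Start timestamps (ms) of all sliding windows containing `ts`,
    // assuming a zero window offset.
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Start of the last window that contains ts.
        long lastStart = ts - ((ts % slide) + slide) % slide;
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 2-minute windows sliding every minute: a record at t = 90s
        // belongs to exactly the windows starting at 60s and 0s.
        System.out.println(windowStarts(90_000L, 120_000L, 60_000L));
    }
}
```

Because this mapping is deterministic, seeing two different `val_sum` values for one `window_time` points at the aggregation/emission path (here, the Python udtaf harness), not at window assignment.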
[jira] [Resolved] (FLINK-28964) Release Testing: Verify FLIP-205 Cache in DataStream for Batch Processing
[ https://issues.apache.org/jira/browse/FLINK-28964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuannan Su resolved FLINK-28964. Resolution: Fixed > Release Testing: Verify FLIP-205 Cache in DataStream for Batch Processing > - > > Key: FLINK-28964 > URL: https://issues.apache.org/jira/browse/FLINK-28964 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Reporter: Xuannan Su >Assignee: jiaxiong >Priority: Blocker > Labels: release-testing > Fix For: 1.16.0 > > > DataStream API provides the `cache` method to cache the result of a > DataStream and reuse it in later jobs with batch execution mode. > I think we should verify: > # Follow the doc to write a Flink job that produces cache and a job that > consumes cache and submit it to a session cluster(standalone or yarn). > # You can remove the source physically after the cache-producing job is > finished to verify that the cache-consuming job is not reading from the > source. For example, delete the file in the filesystem if you are using a > file source. > # You can restart the TaskManager after the cache-producing job is finished > to verify that the cache-consuming job will re-compute the result. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28964) Release Testing: Verify FLIP-205 Cache in DataStream for Batch Processing
[ https://issues.apache.org/jira/browse/FLINK-28964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17597462#comment-17597462 ] Xuannan Su commented on FLINK-28964: Thank [~jessionX] for your testing. I think you have run the test successfully. If there are no more problems till tomorrow, I'd like to close the ticket. > Release Testing: Verify FLIP-205 Cache in DataStream for Batch Processing > - > > Key: FLINK-28964 > URL: https://issues.apache.org/jira/browse/FLINK-28964 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Reporter: Xuannan Su >Assignee: jiaxiong >Priority: Blocker > Labels: release-testing > Fix For: 1.16.0 > > > DataStream API provides the `cache` method to cache the result of a > DataStream and reuse it in later jobs with batch execution mode. > I think we should verify: > # Follow the doc to write a Flink job that produces cache and a job that > consumes cache and submit it to a session cluster(standalone or yarn). > # You can remove the source physically after the cache-producing job is > finished to verify that the cache-consuming job is not reading from the > source. For example, delete the file in the filesystem if you are using a > file source. > # You can restart the TaskManager after the cache-producing job is finished > to verify that the cache-consuming job will re-compute the result. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28988) Incorrect result for filter after temporal join
[ https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuannan Su updated FLINK-28988: --- Description: The following code can reproduce the case
{code:java}
public class TemporalJoinSQLExample1 {

    public static void main(String[] args) throws Exception {
        // set up the Java DataStream API
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set up the Java Table API
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
                env.fromElements(
                        new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
                        new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
                        new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
        final Table table =
                tableEnv.fromDataStream(
                                ds,
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT())
                                        .column("f1", DataTypes.STRING())
                                        .column("f2", DataTypes.TIMESTAMP_LTZ(3))
                                        .watermark("f2", "f2 - INTERVAL '2' SECONDS")
                                        .build())
                        .as("id", "state", "ts");
        tableEnv.createTemporaryView("source_table", table);

        final Table dedupeTable =
                tableEnv.sqlQuery(
                        "SELECT * FROM ("
                                + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table"
                                + ") WHERE row_num = 1");
        tableEnv.createTemporaryView("versioned_table", dedupeTable);

        DataStreamSource<Tuple2<Integer, Instant>> event =
                env.fromElements(
                        new Tuple2<>(0, Instant.ofEpochMilli(0)),
                        new Tuple2<>(0, Instant.ofEpochMilli(5)),
                        new Tuple2<>(0, Instant.ofEpochMilli(10)),
                        new Tuple2<>(0, Instant.ofEpochMilli(15)),
                        new Tuple2<>(0, Instant.ofEpochMilli(20)),
                        new Tuple2<>(0, Instant.ofEpochMilli(25)));
        final Table eventTable =
                tableEnv.fromDataStream(
                                event,
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT())
                                        .column("f1", DataTypes.TIMESTAMP_LTZ(3))
                                        .watermark("f1", "f1 - INTERVAL '2' SECONDS")
                                        .build())
                        .as("id", "ts");
        tableEnv.createTemporaryView("event_table", eventTable);

        final Table result =
                tableEnv.sqlQuery(
                        "SELECT * FROM event_table"
                                + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
                                + " ON event_table.id = versioned_table.id");
        result.execute().print();
        result.filter($("state").isEqual("online")).execute().print();
    }
}
{code}
The result of the temporal join is the following:
|op|id|ts|id0|state|ts0|row_num|
|+I|0|1970-01-01 08:00:00.000|0|online|1970-01-01 08:00:00.000|1|
|+I|0|1970-01-01 08:00:00.005|0|online|1970-01-01 08:00:00.000|1|
|+I|0|1970-01-01 08:00:00.010|0|offline|1970-01-01 08:00:00.010|1|
|+I|0|1970-01-01 08:00:00.015|0|offline|1970-01-01 08:00:00.010|1|
|+I|0|1970-01-01 08:00:00.020|0|online|1970-01-01 08:00:00.020|1|
|+I|0|1970-01-01 08:00:00.025|0|online|1970-01-01 08:00:00.020|1|
After filtering with the predicate state = 'online', I expect only the two rows with state 'offline' to be filtered out. But I got the following result:
|op|id|ts|id0|state|ts0|row_num|
|+I|0|1970-01-01 08:00:00.020|0|online|1970-01-01 08:00:00.020|1|
|+I|0|1970-01-01 08:00:00.025|0|online|1970-01-01 08:00:00.020|1|

was: The following code can reproduce the case {code:java} public class
[jira] [Created] (FLINK-28988) Incorrect result for filter after temporal join
Xuannan Su created FLINK-28988: -- Summary: Incorrect result for filter after temporal join Key: FLINK-28988 URL: https://issues.apache.org/jira/browse/FLINK-28988 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.15.1 Reporter: Xuannan Su The following code can reproduce the case
{code:java}
public class TemporalJoinSQLExample1 {

    public static void main(String[] args) throws Exception {
        // set up the Java DataStream API
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set up the Java Table API
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
                env.fromElements(
                        new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
                        new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
                        new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
        final Table table =
                tableEnv.fromDataStream(
                                ds,
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT())
                                        .column("f1", DataTypes.STRING())
                                        .column("f2", DataTypes.TIMESTAMP_LTZ(3))
                                        .watermark("f2", "f2 - INTERVAL '2' SECONDS")
                                        .build())
                        .as("id", "state", "ts");
        tableEnv.createTemporaryView("source_table", table);

        final Table dedupeTable =
                tableEnv.sqlQuery(
                        "SELECT * FROM ("
                                + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table"
                                + ") WHERE row_num = 1");
        tableEnv.createTemporaryView("versioned_table", dedupeTable);

        DataStreamSource<Tuple2<Integer, Instant>> event =
                env.fromElements(
                        new Tuple2<>(0, Instant.ofEpochMilli(0)),
                        new Tuple2<>(0, Instant.ofEpochMilli(5)),
                        new Tuple2<>(0, Instant.ofEpochMilli(10)),
                        new Tuple2<>(0, Instant.ofEpochMilli(15)),
                        new Tuple2<>(0, Instant.ofEpochMilli(20)),
                        new Tuple2<>(0, Instant.ofEpochMilli(25)));
        final Table eventTable =
                tableEnv.fromDataStream(
                                event,
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT())
                                        .column("f1", DataTypes.TIMESTAMP_LTZ(3))
                                        .watermark("f1", "f1 - INTERVAL '2' SECONDS")
                                        .build())
                        .as("id", "ts");
        tableEnv.createTemporaryView("event_table", eventTable);

        final Table result =
                tableEnv.sqlQuery(
                        "SELECT * FROM event_table"
                                + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
                                + " ON event_table.id = versioned_table.id");
        result.execute().print();
        result.filter($("state").isEqual("online")).execute().print();
    }
}
{code}
The result of the temporal join is the following:
|op|id|ts|id0|state|ts0|row_num|
|+I|0|1970-01-01 08:00:00.000|0|online|1970-01-01 08:00:00.000|1|
|+I|0|1970-01-01 08:00:00.005|0|online|1970-01-01 08:00:00.000|1|
|+I|0|1970-01-01 08:00:00.010|0|offline|1970-01-01 08:00:00.010|1|
|+I|0|1970-01-01 08:00:00.015|0|offline|1970-01-01 08:00:00.010|1|
|+I|0|1970-01-01 08:00:00.020|0|online|1970-01-01 08:00:00.020|1|
|+I|0|1970-01-01 08:00:00.025|0|online|1970-01-01 08:00:00.020|1|
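To make the expected semantics concrete, here is a small plain-Java simulation (illustrative only; no Flink APIs involved): a temporal join resolves, for each probe row, the latest version whose timestamp is not greater than the probe timestamp, and the state = 'online' filter should then apply to the joined rows. Under those semantics the probe rows at 0, 5, 20 and 25 ms survive the filter, i.e. only the two 'offline' rows are dropped:

```java
import java.util.ArrayList;
import java.util.List;

public class TemporalJoinSemanticsSketch {
    // Latest state whose version timestamp is <= probeTs
    // (the lookup that "FOR SYSTEM_TIME AS OF" performs).
    static String versionAsOf(long[] versionTs, String[] versionState, long probeTs) {
        String state = null;
        for (int i = 0; i < versionTs.length; i++) {
            if (versionTs[i] <= probeTs) {
                state = versionState[i];
            }
        }
        return state;
    }

    public static void main(String[] args) {
        // Versioned table for id 0, as in the ticket's source_table.
        long[] versionTs = {0, 10, 20};
        String[] versionState = {"online", "offline", "online"};
        long[] probes = {0, 5, 10, 15, 20, 25};

        // Join first, then filter on state = 'online' (the expected plan).
        List<Long> kept = new ArrayList<>();
        for (long ts : probes) {
            if ("online".equals(versionAsOf(versionTs, versionState, ts))) {
                kept.add(ts);
            }
        }
        System.out.println(kept);
    }
}
```

The reported output keeps only the 20 ms and 25 ms rows, so two rows that should survive the filter are missing.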
[jira] [Created] (FLINK-28964) Release Testing: Verify FLIP-205 Cache in DataStream for Batch Processing
Xuannan Su created FLINK-28964: -- Summary: Release Testing: Verify FLIP-205 Cache in DataStream for Batch Processing Key: FLINK-28964 URL: https://issues.apache.org/jira/browse/FLINK-28964 Project: Flink Issue Type: Sub-task Components: API / DataStream Reporter: Xuannan Su Fix For: 1.16.0 DataStream API provides the `cache` method to cache the result of a DataStream and reuse it in later jobs with batch execution mode. I think we should verify: # Follow the doc to write a Flink job that produces cache and a job that consumes cache and submit it to a session cluster(standalone or yarn). # You can remove the source physically after the cache-producing job is finished to verify that the cache-consuming job is not reading from the source. For example, delete the file in the filesystem if you are using a file source. # You can restart the TaskManager after the cache-producing job is finished to verify that the cache-consuming job will re-compute the result. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28860) CacheITCase.testBatchProduceCacheStreamConsume failed
[ https://issues.apache.org/jira/browse/FLINK-28860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17577084#comment-17577084 ] Xuannan Su commented on FLINK-28860: [~hxbks2ks]I will take a look at it. Could you assign the ticket to me? > CacheITCase.testBatchProduceCacheStreamConsume failed > - > > Key: FLINK-28860 > URL: https://issues.apache.org/jira/browse/FLINK-28860 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.16.0 >Reporter: Huang Xingbo >Priority: Critical > Labels: test-stability > Fix For: 1.16.0 > > > {code:java} > 2022-08-08T03:27:22.1988575Z Aug 08 03:27:22 [ERROR] > org.apache.flink.test.streaming.runtime.CacheITCase.testBatchProduceCacheStreamConsume(Path) > Time elapsed: 0.593 s <<< ERROR! > 2022-08-08T03:27:22.1989338Z Aug 08 03:27:22 java.lang.RuntimeException: > Producing cache IntermediateResult is not supported in streaming mode > 2022-08-08T03:27:22.1990401Z Aug 08 03:27:22 at > org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator.translateForStreamingInternal(CacheTransformationTranslator.java:75) > 2022-08-08T03:27:22.1991511Z Aug 08 03:27:22 at > org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator.translateForStreamingInternal(CacheTransformationTranslator.java:42) > 2022-08-08T03:27:22.1993671Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62) > 2022-08-08T03:27:22.1994900Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:830) > 2022-08-08T03:27:22.1995748Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:560) > 2022-08-08T03:27:22.1996932Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:851) > 2022-08-08T03:27:22.1998562Z Aug 
08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:809) > 2022-08-08T03:27:22.1999581Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:560) > 2022-08-08T03:27:22.2000376Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319) > 2022-08-08T03:27:22.2001359Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2250) > 2022-08-08T03:27:22.2002767Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2241) > 2022-08-08T03:27:22.2004121Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2227) > 2022-08-08T03:27:22.2005059Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2178) > 2022-08-08T03:27:22.2005939Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1469) > 2022-08-08T03:27:22.2006735Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1334) > 2022-08-08T03:27:22.2007500Z Aug 08 03:27:22 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1320) > 2022-08-08T03:27:22.2008315Z Aug 08 03:27:22 at > org.apache.flink.test.streaming.runtime.CacheITCase.testBatchProduceCacheStreamConsume(CacheITCase.java:190) > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39518=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28857) Add Document for DataStream Cache API
Xuannan Su created FLINK-28857: -- Summary: Add Document for DataStream Cache API Key: FLINK-28857 URL: https://issues.apache.org/jira/browse/FLINK-28857 Project: Flink Issue Type: Sub-task Reporter: Xuannan Su -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28742) Table.to_pandas fails with lit("xxx")
Xuannan Su created FLINK-28742: -- Summary: Table.to_pandas fails with lit("xxx") Key: FLINK-28742 URL: https://issues.apache.org/jira/browse/FLINK-28742 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.15.1 Reporter: Xuannan Su Table.to_pandas method throws the following exception when the table contains lit("anyString"). {code:none} py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame. : java.lang.UnsupportedOperationException: Python vectorized UDF doesn't support logical type CHAR(3) NOT NULL currently. at org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:743) at org.apache.flink.table.runtime.arrow.ArrowUtils$LogicalTypeToArrowTypeConverter.defaultMethod(ArrowUtils.java:617) at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:62) at org.apache.flink.table.types.logical.CharType.accept(CharType.java:148) at org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowField(ArrowUtils.java:189) at org.apache.flink.table.runtime.arrow.ArrowUtils.lambda$toArrowSchema$0(ArrowUtils.java:180) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) at org.apache.flink.table.runtime.arrow.ArrowUtils.toArrowSchema(ArrowUtils.java:181) at org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:483) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748)
{code}
The code to reproduce the problem:
{code:python}
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
src_table = t_env.from_data_stream(
    env.from_collection([1, 2], type_info=BasicTypeInfo.INT_TYPE_INFO())
)
table = src_table.select(expr.lit("123"))
# table.execute().print()
print(table.to_pandas())
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28528) Table.getSchema fails on table with watermark
[ https://issues.apache.org/jira/browse/FLINK-28528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuannan Su updated FLINK-28528: --- Description: The bug can be reproduced with the following test. The test can pass if we use the commented way to define the watermark.
{code:python}
def test_flink_2(self):
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    table = t_env.from_descriptor(
        TableDescriptor.for_connector("filesystem")
        .schema(
            Schema.new_builder()
            .column("name", DataTypes.STRING())
            .column("cost", DataTypes.INT())
            .column("distance", DataTypes.INT())
            .column("time", DataTypes.TIMESTAMP(3))
            .watermark("time", expr.col("time") - expr.lit(60).seconds)
            # .watermark("time", "`time` - INTERVAL '60' SECOND")
            .build()
        )
        .format("csv")
        .option("path", "./input.csv")
        .build()
    )
    print(table.get_schema())
{code}
It causes the following exception
{code:none}
E pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Expression 'minus(time, 6)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
E at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51) E at org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455) E at java.util.Collections$SingletonList.forEach(Collections.java:4824) E at org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451) E at org.apache.flink.table.api.Table.getSchema(Table.java:101) E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) E at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) E at java.lang.reflect.Method.invoke(Method.java:498) E at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) E at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) E at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) E at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) E at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) E at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) E at java.lang.Thread.run(Thread.java:748) {code} was: The bug can be reproduced with the following test. The test can pass if we use the commented way to define the watermark. 
{code:python} def test_flink_2(self): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env) table = t_env.from_descriptor( TableDescriptor.for_connector("filesystem") .schema( Schema.new_builder() .column("name", DataTypes.STRING()) .column("cost", DataTypes.INT()) .column("distance", DataTypes.INT()) .column("time", DataTypes.TIMESTAMP(3)) .watermark("time", expr.col("time") - expr.lit(60).seconds) # .watermark("time", "`time` - INTERVAL '60' SECOND") .build() ) .format("csv") .option("path", "./input.csv") .build() ) print(table.get_schema()) {code} It causes the following exception {code:none} // Some comments here E pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Expression 'minus(time, 6)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation. E at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51) E at org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455) E at java.util.Collections$SingletonList.forEach(Collections.java:4824) E at org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451) E at org.apache.flink.table.api.Table.getSchema(Table.java:101) E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) E at
[jira] [Created] (FLINK-28528) Table.getSchema fails on table with watermark
Xuannan Su created FLINK-28528: -- Summary: Table.getSchema fails on table with watermark Key: FLINK-28528 URL: https://issues.apache.org/jira/browse/FLINK-28528 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.15.1 Reporter: Xuannan Su The bug can be reproduced with the following test. The test can pass if we use the commented way to define the watermark.
{code:python}
def test_flink_2(self):
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    table = t_env.from_descriptor(
        TableDescriptor.for_connector("filesystem")
        .schema(
            Schema.new_builder()
            .column("name", DataTypes.STRING())
            .column("cost", DataTypes.INT())
            .column("distance", DataTypes.INT())
            .column("time", DataTypes.TIMESTAMP(3))
            .watermark("time", expr.col("time") - expr.lit(60).seconds)
            # .watermark("time", "`time` - INTERVAL '60' SECOND")
            .build()
        )
        .format("csv")
        .option("path", "./input.csv")
        .build()
    )
    print(table.get_schema())
{code}
It causes the following exception
{code:none}
// Some comments here
E pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Expression 'minus(time, 6)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
E at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51) E at org.apache.flink.table.api.TableSchema.lambda$fromResolvedSchema$13(TableSchema.java:455) E at java.util.Collections$SingletonList.forEach(Collections.java:4824) E at org.apache.flink.table.api.TableSchema.fromResolvedSchema(TableSchema.java:451) E at org.apache.flink.table.api.Table.getSchema(Table.java:101) E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) E at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) E at java.lang.reflect.Method.invoke(Method.java:498) E at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) E at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) E at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) E at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) E at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) E at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) E at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28527) Fail to lateral join with UDTF from Table with timestamp column
Xuannan Su created FLINK-28527: -- Summary: Fail to lateral join with UDTF from Table with timestamp column Key: FLINK-28527 URL: https://issues.apache.org/jira/browse/FLINK-28527 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.15.1 Reporter: Xuannan Su The bug can be reproduced with the following test
{code:python}
def test_flink(self):
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    table = t_env.from_descriptor(
        TableDescriptor.for_connector("filesystem")
        .schema(
            Schema.new_builder()
            .column("name", DataTypes.STRING())
            .column("cost", DataTypes.INT())
            .column("distance", DataTypes.INT())
            .column("time", DataTypes.TIMESTAMP(3))
            .watermark("time", "`time` - INTERVAL '60' SECOND")
            .build()
        )
        .format("csv")
        .option("path", "./input.csv")
        .build()
    )

    @udtf(result_types=DataTypes.INT())
    def table_func(row: Row):
        return row.cost + row.distance

    table = table.join_lateral(table_func.alias("cost_times_distance"))
    table.execute().print()
{code}
It causes the following exception
{code:none}
E pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Unsupported Python SqlFunction CAST.
E  at org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:146)
E  at org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:429)
E  at org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil.createPythonFunctionInfo(CommonPythonUtil.java:135)
E  at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.extractPythonTableFunctionInfo(CommonExecPythonCorrelate.java:133)
E  at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.createPythonOneInputTransformation(CommonExecPythonCorrelate.java:106)
E  at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate.translateToPlanInternal(CommonExecPythonCorrelate.java:95)
E  at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
E  at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249)
E  at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:136)
E  at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:148)
E  at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:79)
E  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
E  at scala.collection.Iterator.foreach(Iterator.scala:937)
E  at scala.collection.Iterator.foreach$(Iterator.scala:937)
E  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
E  at scala.collection.IterableLike.foreach(IterableLike.scala:70)
E  at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
E  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
E  at scala.collection.TraversableLike.map(TraversableLike.scala:233)
E  at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
E  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
E  at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:78)
E  at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:181)
E  at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
E  at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828)
E  at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317)
E  at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:605)
E  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E  at
[jira] [Created] (FLINK-28461) PyFlink Table should add get_resolved_schema method
Xuannan Su created FLINK-28461: -- Summary: PyFlink Table should add get_resolved_schema method Key: FLINK-28461 URL: https://issues.apache.org/jira/browse/FLINK-28461 Project: Flink Issue Type: Improvement Components: API / Python Affects Versions: 1.15.1 Reporter: Xuannan Su The Table#getSchema method is deprecated and replaced by Table#getResolvedSchema. We should add a corresponding get_resolved_schema method to the PyFlink Table. -- This message was sent by Atlassian Jira (v8.20.10#820010)
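Since PyFlink's Table is a thin wrapper that forwards calls to a Java-side Table object, the proposed method would presumably follow the same delegation pattern as the existing ones. A minimal, self-contained sketch of that pattern (the `JavaTableMock` class and its return values are made up for illustration; the real method would forward through the py4j gateway and wrap the result):

```python
# Sketch of the py4j-style delegation pattern used by PyFlink's Table.
# JavaTableMock stands in for the Java-side Table object; in PyFlink the
# real object is reached through a gateway reference such as `_j_table`.
class JavaTableMock:
    def getSchema(self):           # deprecated Java API
        return "legacy TableSchema"

    def getResolvedSchema(self):   # replacement Java API
        return "ResolvedSchema"


class Table:
    """Simplified Python wrapper delegating to the Java-side table."""

    def __init__(self, j_table):
        self._j_table = j_table

    def get_schema(self):
        # Existing method; wraps the deprecated Table#getSchema.
        return self._j_table.getSchema()

    def get_resolved_schema(self):
        # Proposed addition; wraps Table#getResolvedSchema.
        return self._j_table.getResolvedSchema()


table = Table(JavaTableMock())
print(table.get_resolved_schema())  # prints "ResolvedSchema"
```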
[jira] [Created] (FLINK-28443) Provide official Flink image for PyFlink
Xuannan Su created FLINK-28443: -- Summary: Provide official Flink image for PyFlink Key: FLINK-28443 URL: https://issues.apache.org/jira/browse/FLINK-28443 Project: Flink Issue Type: Improvement Components: API / Python Affects Versions: 1.15.0 Reporter: Xuannan Su Users currently must build a custom Flink image that includes Python and PyFlink in order to run a PyFlink job in Native Kubernetes application mode. We can improve the user experience by providing an official image that pre-installs Python and PyFlink. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28294) Some metric reporters don't allow registering the same metric with different names
Xuannan Su created an issue Flink / FLINK-28294 Some metric reporters don't allow registering the same metric with different names Issue Type: Bug Affects Versions: 1.15.0 Assignee: Unassigned Components: Runtime / Metrics Created: 29/Jun/22 06:50 Priority: Major Reporter: Xuannan Su Currently, some metric reporters keep an internal Map keyed by the metric object to track registered metrics. The problem with keying the map by the metric object is that when the same metric object is registered under different names or metric groups, only the last registration is reported. If I understand correctly, we do not forbid registering the same metric under different names or metric groups. For example, in `SchedulerBase#registerJobMetrics`, we register `numberOfRestarts` under two names, "numRestarts" and "fullRestarts". Unfortunately, in this case, the metric reporter will only report the "fullRestarts" metric, which is deprecated. I found that the following metric reporters have this problem: the Influxdb, Datadog, and Dropwizard metric reporters. One possible fix is to swap the key and value of the internal map in the metric reporter.
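The flawed bookkeeping can be reproduced outside Flink with two plain dictionaries standing in for the reporter's internal registry (this is an illustration, not Flink code):

```python
# Illustrates why keying the reporter's registry by the metric *object*
# silently drops duplicate registrations of the same metric.
class Counter:
    """Stand-in for a Flink metric object (e.g. numberOfRestarts)."""
    def __init__(self):
        self.count = 7


restarts = Counter()

# Buggy scheme: metric object -> name. Re-registering the same object
# under a second name overwrites the first entry.
by_metric = {}
by_metric[restarts] = "numRestarts"
by_metric[restarts] = "fullRestarts"
assert list(by_metric.values()) == ["fullRestarts"]  # "numRestarts" is lost

# Proposed fix: swap key and value, i.e. name -> metric object.
by_name = {}
by_name["numRestarts"] = restarts
by_name["fullRestarts"] = restarts
assert len(by_name) == 2  # both names are now reported
```

With the swapped map, both registrations survive because each distinct name is its own key, while the metric object may appear as a value any number of times.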
[jira] [Created] (FLINK-28126) Iteration gets stuck when replayable datastream and its downstream operator have different parallelism
Xuannan Su created FLINK-28126: -- Summary: Iteration gets stuck when replayable datastream and its downstream operator have different parallelism Key: FLINK-28126 URL: https://issues.apache.org/jira/browse/FLINK-28126 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.0.0 Reporter: Xuannan Su Iteration gets stuck when replayable datastream and its downstream operator have different parallelism. It can be reproduced with the following code snippet.
{code:java}
@Test
public void testIteration() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    final SingleOutputStreamOperator<Integer> variable = env.fromElements(0).name("i");
    final SingleOutputStreamOperator<Integer> data =
            env.fromElements(1, 2).name("inc").map(x -> x).setParallelism(1);
    // test can pass if parallelism is 2.
    final IterationConfig config = IterationConfig.newBuilder().build();
    Iterations.iterateBoundedStreamsUntilTermination(
            DataStreamList.of(variable),
            ReplayableDataStreamList.replay(data),
            config,
            (IterationBody) (variableStreams, dataStreams) -> {
                final DataStream<Integer> sample = dataStreams.get(0);
                final SingleOutputStreamOperator<Integer> trainOutput =
                        sample.transform(
                                        "iter",
                                        TypeInformation.of(Integer.class),
                                        new IterTransform())
                                .setParallelism(2)
                                .map((MapFunction<Integer, Integer>) integer -> integer)
                                .setParallelism(1);
                return new IterationBodyResult(
                        DataStreamList.of(trainOutput), DataStreamList.of(trainOutput));
            });
    env.execute();
}

public static class IterTransform extends AbstractStreamOperator<Integer>
        implements OneInputStreamOperator<Integer, Integer>, IterationListener<Integer> {

    @Override
    public void processElement(StreamRecord<Integer> element) throws Exception {
        LOG.info("Processing element: {}", element);
    }

    @Override
    public void onEpochWatermarkIncremented(
            int epochWatermark, Context context, Collector<Integer> collector) throws Exception {
        LOG.info("onEpochWatermarkIncremented: {}", epochWatermark);
        if (epochWatermark >= 10) {
            return;
        }
        collector.collect(0);
    }

    @Override
    public void onIterationTerminated(Context context, Collector<Integer> collector)
            throws Exception {
        LOG.info("onIterationTerminated");
    }
}
{code}
After digging into the code, I found that the `ReplayOperator` doesn't emit the epoch watermark with a broadcast output. [~gaoyunhaii], could you take a look to see if this is the case? -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27524) Introduce cache API to DataStream
Xuannan Su created FLINK-27524: -- Summary: Introduce cache API to DataStream Key: FLINK-27524 URL: https://issues.apache.org/jira/browse/FLINK-27524 Project: Flink Issue Type: Sub-task Components: API / DataStream Affects Versions: 1.16.0 Reporter: Xuannan Su -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27523) Runtime supports producing and consuming cached intermediate result
Xuannan Su created FLINK-27523: -- Summary: Runtime supports producing and consuming cached intermediate result Key: FLINK-27523 URL: https://issues.apache.org/jira/browse/FLINK-27523 Project: Flink Issue Type: Sub-task Components: API / DataStream Affects Versions: 1.16.0 Reporter: Xuannan Su -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27521) FLIP-205: Support Cache in DataStream for Batch Processing
Xuannan Su created FLINK-27521: -- Summary: FLIP-205: Support Cache in DataStream for Batch Processing Key: FLINK-27521 URL: https://issues.apache.org/jira/browse/FLINK-27521 Project: Flink Issue Type: New Feature Components: API / DataStream Affects Versions: 1.16.0 Reporter: Xuannan Su Now that the DataStream API supports batch execution mode, users are adopting it to run batch jobs. Interactive programming is an important use case of Flink batch processing, and the ability to cache the intermediate results of a DataStream is crucial to the interactive programming experience. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-26171) pyFlink -py options has issue with path that starts with //
[ https://issues.apache.org/jira/browse/FLINK-26171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17493008#comment-17493008 ] Xuannan Su commented on FLINK-26171: After some research, I found that the leading two slashes are not necessarily equal to a single slash, according to [here|https://unix.stackexchange.com/a/1919]. > pyFlink -py options has issue with path that starts with // > --- > > Key: FLINK-26171 > URL: https://issues.apache.org/jira/browse/FLINK-26171 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.14.3 >Reporter: Xuannan Su >Assignee: Huang Xingbo >Priority: Major > > It seems that when submitting a pyFlink job with the `flink run` command, the > -py option has issues with a path that start with double slash "//". > You can reproduce the problem with the wordcount python example. > {code:bash} > # Job successfully submitted > ./bin/flink run -py "${PWD}"/examples/python/datastream/word_count.py > Job has been submitted with JobID 54ab79a32b4e441ee80bda92be7cb547 > # Fail to submit the job if the path start with double slash > ./bin/flink run -py /"${PWD}"/examples/python/datastream/word_count.py > org.apache.flink.client.program.ProgramAbortException: > java.nio.file.NoSuchFileException: > /var/folders/j4/td7bvghd1tg4tb7ty_7lk6z0gp/T/pyflink/1bba3878-9ed4-4d4e-a5ed-1208c1ee9425/c9ece3de-8ff2-40dc-8436-e3377e540baf/word_count.py > at > org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) > at > 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) > at > org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) > at > org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) > Caused by: java.nio.file.NoSuchFileException: > /var/folders/j4/td7bvghd1tg4tb7ty_7lk6z0gp/T/pyflink/1bba3878-9ed4-4d4e-a5ed-1208c1ee9425/c9ece3de-8ff2-40dc-8436-e3377e540baf/word_count.py > at > sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) > at sun.nio.fs.UnixPath.toRealPath(UnixPath.java:837) > at > org.apache.flink.client.python.PythonEnvUtils.addToPythonPath(PythonEnvUtils.java:278) > at > org.apache.flink.client.python.PythonEnvUtils.preparePythonEnvironment(PythonEnvUtils.java:195) > at > org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient(PythonEnvUtils.java:456) > at > org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:92) > ... 13 more > # Job successfully submitted if path starts with three slash > ./bin/flink run -py //"${PWD}"/examples/python/datastream/word_count.py > Job has been submitted with JobID 270d07407b297b566c6903c451a834cb > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26171) pyFlink -py options has issue with path that starts with //
Xuannan Su created FLINK-26171: -- Summary: pyFlink -py options has issue with path that starts with // Key: FLINK-26171 URL: https://issues.apache.org/jira/browse/FLINK-26171 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.14.3 Reporter: Xuannan Su It seems that when submitting a PyFlink job with the `flink run` command, the -py option has issues with paths that start with a double slash "//". You can reproduce the problem with the word count Python example.
{code:bash}
# Job successfully submitted
./bin/flink run -py "${PWD}"/examples/python/datastream/word_count.py
Job has been submitted with JobID 54ab79a32b4e441ee80bda92be7cb547

# Fail to submit the job if the path starts with a double slash
./bin/flink run -py /"${PWD}"/examples/python/datastream/word_count.py
org.apache.flink.client.program.ProgramAbortException: java.nio.file.NoSuchFileException: /var/folders/j4/td7bvghd1tg4tb7ty_7lk6z0gp/T/pyflink/1bba3878-9ed4-4d4e-a5ed-1208c1ee9425/c9ece3de-8ff2-40dc-8436-e3377e540baf/word_count.py
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.nio.file.NoSuchFileException: /var/folders/j4/td7bvghd1tg4tb7ty_7lk6z0gp/T/pyflink/1bba3878-9ed4-4d4e-a5ed-1208c1ee9425/c9ece3de-8ff2-40dc-8436-e3377e540baf/word_count.py
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixPath.toRealPath(UnixPath.java:837)
    at org.apache.flink.client.python.PythonEnvUtils.addToPythonPath(PythonEnvUtils.java:278)
    at org.apache.flink.client.python.PythonEnvUtils.preparePythonEnvironment(PythonEnvUtils.java:195)
    at org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient(PythonEnvUtils.java:456)
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:92)
    ... 13 more

# Job successfully submitted if the path starts with three slashes
./bin/flink run -py //"${PWD}"/examples/python/datastream/word_count.py
Job has been submitted with JobID 270d07407b297b566c6903c451a834cb
{code}
-- This message was sent by Atlassian Jira (v8.20.1#820001)
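The behavior in the reproduction matches POSIX pathname semantics: a path beginning with exactly two slashes is implementation-defined and may denote a distinct root, while three or more leading slashes are equivalent to one. Python's `posixpath.normpath` implements exactly this rule, which is why prepending one extra slash to the already-absolute "${PWD}" yields a problematic `//...` path, while prepending two extra slashes collapses back to a normal absolute path (the `/opt/flink/...` path below is made up for illustration):

```python
# posixpath.normpath keeps exactly two leading slashes (implementation-
# defined per POSIX) but collapses three or more down to a single one.
import posixpath

double = posixpath.normpath("//opt/flink/word_count.py")
triple = posixpath.normpath("///opt/flink/word_count.py")

print(double)  # //opt/flink/word_count.py  (preserved: possibly a special root)
print(triple)  # /opt/flink/word_count.py   (collapsed to one leading slash)
```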
[jira] [Commented] (FLINK-25685) RestClusterClient gets stuck on submitting job with local user artifact
[ https://issues.apache.org/jira/browse/FLINK-25685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17479869#comment-17479869 ] Xuannan Su commented on FLINK-25685: [~wangyang0918] You are right. (y) The fix is to use artifactFilePath.getPath() > RestClusterClient gets stuck on submitting job with local user artifact > --- > > Key: FLINK-25685 > URL: https://issues.apache.org/jira/browse/FLINK-25685 > Project: Flink > Issue Type: Bug > Components: Runtime / REST >Affects Versions: 1.15.0, 1.14.3 >Reporter: Xuannan Su >Priority: Major > Labels: pull-request-available > > I found that a job submission gets stuck if > StreamExecutionEnvironment#registerCachedFile is called with a local file. > After some digging, I found that it gets stuck when the RestClusterClient > sends the job-submission request to the JobManager. > Below is the unit test added to the `RestClusterClientTest` to reproduce the > problem on my local machine. > {code:java} > @Test > public void testJobSubmissionWithUserArtifact() throws Exception { > try (final TestRestServerEndpoint restServerEndpoint = > createRestServerEndpoint(new TestJobSubmitHandler())) { > try (RestClusterClient restClusterClient = > > createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) { > TemporaryFolder temporaryFolder = new TemporaryFolder(); > temporaryFolder.create(); > File file = temporaryFolder.newFile(); > Files.write(file.toPath(), "hello > world".getBytes(ConfigConstants.DEFAULT_CHARSET)); > jobGraph.addUserArtifact("file", > new > DistributedCache.DistributedCacheEntry(file.toURI().toString(), > false)); > restClusterClient > .submitJob(jobGraph) > .get(); > } > } > } > {code} > The test can pass if the `jobGraph.addUserArtifact` is not called. -- This message was sent by Atlassian Jira (v8.20.1#820001)
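The one-line fix mentioned above (take `artifactFilePath.getPath()` rather than the full URI string) can be demonstrated with an analogous Python sketch: a `file://` URI string is not itself a usable filesystem path, but its path component is (illustration only, not Flink code):

```python
# A file URI string fails ordinary filesystem calls; extracting its path
# component (roughly what Flink's Path#getPath returns) succeeds.
import os
import tempfile
from urllib.parse import urlparse

with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"hello world")
    local_path = f.name

uri = "file://" + local_path  # mirrors file.toURI().toString() in the test

assert not os.path.exists(uri)             # the URI string is not a path
assert os.path.exists(urlparse(uri).path)  # the path component works

os.unlink(local_path)
```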
[jira] [Updated] (FLINK-25685) RestClusterClient gets stuck on submitting job with local user artifact
[ https://issues.apache.org/jira/browse/FLINK-25685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuannan Su updated FLINK-25685: --- Affects Version/s: 1.15.0 > RestClusterClient gets stuck on submitting job with local user artifact > --- > > Key: FLINK-25685 > URL: https://issues.apache.org/jira/browse/FLINK-25685 > Project: Flink > Issue Type: Bug > Components: Runtime / REST >Affects Versions: 1.15.0, 1.14.3 >Reporter: Xuannan Su >Priority: Major > Labels: pull-request-available > > I found that a job submission gets stuck if > StreamExecutionEnvironment#registerCachedFile is called with a local file. > After some digging, I found that it gets stuck when the RestClusterClient > sends the job-submission request to the JobManager. > Below is the unit test added to the `RestClusterClientTest` to reproduce the > problem on my local machine. > {code:java} > @Test > public void testJobSubmissionWithUserArtifact() throws Exception { > try (final TestRestServerEndpoint restServerEndpoint = > createRestServerEndpoint(new TestJobSubmitHandler())) { > try (RestClusterClient restClusterClient = > > createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) { > TemporaryFolder temporaryFolder = new TemporaryFolder(); > temporaryFolder.create(); > File file = temporaryFolder.newFile(); > Files.write(file.toPath(), "hello > world".getBytes(ConfigConstants.DEFAULT_CHARSET)); > jobGraph.addUserArtifact("file", > new > DistributedCache.DistributedCacheEntry(file.toURI().toString(), > false)); > restClusterClient > .submitJob(jobGraph) > .get(); > } > } > } > {code} > The test can pass if the `jobGraph.addUserArtifact` is not called. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25685) RestClusterClient gets stuck on submitting job with local user artifact
[ https://issues.apache.org/jira/browse/FLINK-25685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17479802#comment-17479802 ] Xuannan Su commented on FLINK-25685: [~chesnay], I created a PR with unit test and the fix. May you assign the ticket to me and help review the PR? > RestClusterClient gets stuck on submitting job with local user artifact > --- > > Key: FLINK-25685 > URL: https://issues.apache.org/jira/browse/FLINK-25685 > Project: Flink > Issue Type: Bug > Components: Runtime / REST >Affects Versions: 1.14.3 >Reporter: Xuannan Su >Priority: Major > Labels: pull-request-available > > I found that a job submission gets stuck if > StreamExecutionEnvironment#registerCachedFile is called with a local file. > After some digging, I found that it gets stuck when the RestClusterClient > sends the job-submission request to the JobManager. > Below is the unit test added to the `RestClusterClientTest` to reproduce the > problem on my local machine. > {code:java} > @Test > public void testJobSubmissionWithUserArtifact() throws Exception { > try (final TestRestServerEndpoint restServerEndpoint = > createRestServerEndpoint(new TestJobSubmitHandler())) { > try (RestClusterClient restClusterClient = > > createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) { > TemporaryFolder temporaryFolder = new TemporaryFolder(); > temporaryFolder.create(); > File file = temporaryFolder.newFile(); > Files.write(file.toPath(), "hello > world".getBytes(ConfigConstants.DEFAULT_CHARSET)); > jobGraph.addUserArtifact("file", > new > DistributedCache.DistributedCacheEntry(file.toURI().toString(), > false)); > restClusterClient > .submitJob(jobGraph) > .get(); > } > } > } > {code} > The test can pass if the `jobGraph.addUserArtifact` is not called. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-25685) RestClusterClient gets stuck on submitting job with local user artifact
[ https://issues.apache.org/jira/browse/FLINK-25685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17478023#comment-17478023 ] Xuannan Su commented on FLINK-25685: I created a PR that adds unit tests with and without `jobGraph.addUserArtifact`. The test with `jobGraph.addUserArtifact` fails while the other passes. https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=29653=results [~chesnay] Could you have a look at the CI run? > RestClusterClient gets stuck on submitting job with local user artifact > --- > > Key: FLINK-25685 > URL: https://issues.apache.org/jira/browse/FLINK-25685 > Project: Flink > Issue Type: Bug > Components: Runtime / REST >Affects Versions: 1.14.3 >Reporter: Xuannan Su >Priority: Major > Labels: pull-request-available > > I found that a job submission gets stuck if > StreamExecutionEnvironment#registerCachedFile is called with a local file. > After some digging, I found that it gets stuck when the RestClusterClient > sends the job-submission request to the JobManager. > Below is the unit test added to the `RestClusterClientTest` to reproduce the > problem on my local machine. > {code:java} > @Test > public void testJobSubmissionWithUserArtifact() throws Exception { > try (final TestRestServerEndpoint restServerEndpoint = > createRestServerEndpoint(new TestJobSubmitHandler())) { > try (RestClusterClient restClusterClient = > > createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) { > TemporaryFolder temporaryFolder = new TemporaryFolder(); > temporaryFolder.create(); > File file = temporaryFolder.newFile(); > Files.write(file.toPath(), "hello > world".getBytes(ConfigConstants.DEFAULT_CHARSET)); > jobGraph.addUserArtifact("file", > new > DistributedCache.DistributedCacheEntry(file.toURI().toString(), > false)); > restClusterClient > .submitJob(jobGraph) > .get(); > } > } > } > {code} > The test can pass if the `jobGraph.addUserArtifact` is not called. 
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25685) RestClusterClient gets stuck on submitting job with local user artifact
Xuannan Su created FLINK-25685: -- Summary: RestClusterClient gets stuck on submitting job with local user artifact Key: FLINK-25685 URL: https://issues.apache.org/jira/browse/FLINK-25685 Project: Flink Issue Type: Bug Components: Runtime / REST Affects Versions: 1.14.3 Reporter: Xuannan Su I found that a job submission gets stuck if StreamExecutionEnvironment#registerCachedFile is called with a local file. After some digging, I found that it gets stuck when the RestClusterClient sends the job-submission request to the JobManager. Below is the unit test added to the `RestClusterClientTest` to reproduce the problem on my local machine.
{code:java}
@Test
public void testJobSubmissionWithUserArtifact() throws Exception {
    try (final TestRestServerEndpoint restServerEndpoint =
            createRestServerEndpoint(new TestJobSubmitHandler())) {
        try (RestClusterClient<?> restClusterClient =
                createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
            TemporaryFolder temporaryFolder = new TemporaryFolder();
            temporaryFolder.create();
            File file = temporaryFolder.newFile();
            Files.write(file.toPath(), "hello world".getBytes(ConfigConstants.DEFAULT_CHARSET));
            jobGraph.addUserArtifact(
                    "file",
                    new DistributedCache.DistributedCacheEntry(file.toURI().toString(), false));
            restClusterClient.submitJob(jobGraph).get();
        }
    }
}
{code}
The test can pass if `jobGraph.addUserArtifact` is not called. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-19343) FLIP-36: Support Interactive Programming in Flink
[ https://issues.apache.org/jira/browse/FLINK-19343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241382#comment-17241382 ] Xuannan Su commented on FLINK-19343: Hi, Jin Xing. Thanks for your comments. 1. I agree with you that it is indeed weird for "Interactive" to rely on the clusterPartitionTracker to manage the lifecycle of the cluster partition, which is also a form of shuffle data. To support cache across jobs, we need a component whose lifecycle outlives the job to manage the shuffle data. 2. To be honest, I don't like the idea of passing the ShuffleDescriptor back to the client side either. But at the time, as you say, we did not have a separate runtime component to manage the lifecycle of shuffle data across jobs, so that decision was made in order to support sharing shuffle data across jobs. To support caching in DataStream, it is a cleaner design to have a runtime-scoped component that manages shuffle data across jobs. 3. I don't think the remote shuffle data should be managed by the PartitionTracker either. Instead, I think the ClusterPartition is just a kind of shuffle data and therefore should be managed by the ShuffleService. I am pulling in [~chesnay]; he may have more insight from the ClusterPartition perspective. > FLIP-36: Support Interactive Programming in Flink > - > > Key: FLINK-19343 > URL: https://issues.apache.org/jira/browse/FLINK-19343 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Xuannan Su >Priority: Major > > Please refer to the FLIP for any details: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19253) SourceReaderTestBase.testAddSplitToExistingFetcher hangs
[jira] [Commented] (FLINK-19253) SourceReaderTestBase.testAddSplitToExistingFetcher hangs
[ https://issues.apache.org/jira/browse/FLINK-19253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230756#comment-17230756 ] Xuannan Su commented on FLINK-19253: Thanks for letting me know. I will stay tuned until we can backport.
> SourceReaderTestBase.testAddSplitToExistingFetcher hangs
>
> Key: FLINK-19253
> URL: https://issues.apache.org/jira/browse/FLINK-19253
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.12.0
> Reporter: Dian Fu
> Assignee: Xuannan Su
> Priority: Major
> Labels: pull-request-available, test-stability
> Fix For: 1.12.0, 1.11.3
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6521=logs=fc5181b0-e452-5c8f-68de-1097947f6483=62110053-334f-5295-a0ab-80dd7e2babbf
> {code}
> "SourceFetcher" #39 prio=5 os_prio=0 tid=0x7f70d0a57000 nid=0x858 in Object.wait() [0x7f6fd81f]
>    java.lang.Thread.State: WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         - waiting on <0xc27f5be8> (a java.util.ArrayDeque)
>         at java.lang.Object.wait(Object.java:502)
>         at org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader.fetch(TestingSplitReader.java:52)
>         - locked <0xc27f5be8> (a java.util.ArrayDeque)
>         at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>         at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:128)
>         at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:95)
>         at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
>
> "SourceFetcher" #37 prio=5 os_prio=0 tid=0x7f70d0a4b000 nid=0x856 in Object.wait() [0x7f6f80cfa000]
>    java.lang.Thread.State: WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         - waiting on <0xc298d0b8> (a java.util.ArrayDeque)
>         at java.lang.Object.wait(Object.java:502)
>         at org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader.fetch(TestingSplitReader.java:52)
>         - locked <0xc298d0b8> (a java.util.ArrayDeque)
>         at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>         at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:128)
>         at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:95)
>         at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
>
> "process reaper" #24 daemon prio=10 os_prio=0 tid=0x7f6f70042000 nid=0x844 waiting on condition [0x7f6fd832a000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x815d0810> (a
[jira] [Commented] (FLINK-19253) SourceReaderTestBase.testAddSplitToExistingFetcher hangs
[ https://issues.apache.org/jira/browse/FLINK-19253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230377#comment-17230377 ] Xuannan Su commented on FLINK-19253: [~becket_qin] I think https://issues.apache.org/jira/browse/FLINK-19448 has fixed the problem. Can you take a look?
[jira] [Commented] (FLINK-19253) SourceReaderTestBase.testAddSplitToExistingFetcher hangs
[ https://issues.apache.org/jira/browse/FLINK-19253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229980#comment-17229980 ] Xuannan Su commented on FLINK-19253: [~becket_qin] You are right. This can indeed cause an IllegalStateException to be thrown. After looking closely at the code, I think it makes more sense to return MORE_AVAILABLE instead of throwing the IllegalStateException at that point. The rationale is explained in the comments of the code below. We might also want to rename the method from `finishedOrAvailableLater` to something like `getInputStatus`. What do you think?
{code:java}
private InputStatus finishedOrAvailableLater() {
    // assumption: elementsQueue is empty (might no longer be valid)
    final boolean allFetchersHaveShutdown =
            splitFetcherManager.maybeShutdownFinishedFetchers();
    if (!(noMoreSplitsAssignment && allFetchersHaveShutdown)) {
        return InputStatus.NOTHING_AVAILABLE;
    }
    // at this point we are guaranteed that no more splits will be assigned
    // and all the SplitFetchers are idle and have been shut down
    if (elementsQueue.isEmpty()) {
        splitFetcherManager.checkErrors();
        return InputStatus.END_OF_INPUT;
    } else {
        // A non-empty elementsQueue here means our assumption that the
        // elementsQueue is empty has been violated: after our last check on
        // the elementsQueue, the SplitFetchers fetched the remaining
        // elements, put them in the queue, and shut down. There are
        // therefore elements available before we can return END_OF_INPUT,
        // so we return MORE_AVAILABLE instead of throwing an exception.
        return InputStatus.MORE_AVAILABLE;
        // throw new IllegalStateException(
        //     "Called 'finishedOrAvailableLater()' with shut-down fetchers but non-empty queue");
    }
}
{code}
[jira] [Commented] (FLINK-20068) KafkaSubscriberTest.testTopicPatternSubscriber failed with unexpected results
[ https://issues.apache.org/jira/browse/FLINK-20068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229079#comment-17229079 ] Xuannan Su commented on FLINK-20068: Another instance https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9388=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5 > KafkaSubscriberTest.testTopicPatternSubscriber failed with unexpected results > - > > Key: FLINK-20068 > URL: https://issues.apache.org/jira/browse/FLINK-20068 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.12.0 >Reporter: Dian Fu >Priority: Critical > Labels: test-stability > Fix For: 1.12.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9365=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5 > {code} > 2020-11-10T00:14:22.7658242Z [ERROR] > testTopicPatternSubscriber(org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberTest) > Time elapsed: 0.012 s <<< FAILURE! > 2020-11-10T00:14:22.7659838Z java.lang.AssertionError: > expected:<[pattern-topic-5, pattern-topic-4, pattern-topic-7, > pattern-topic-6, pattern-topic-9, pattern-topic-8, pattern-topic-1, > pattern-topic-0, pattern-topic-3]> but was:<[]> > 2020-11-10T00:14:22.7660740Z at org.junit.Assert.fail(Assert.java:88) > 2020-11-10T00:14:22.7661245Z at > org.junit.Assert.failNotEquals(Assert.java:834) > 2020-11-10T00:14:22.7661788Z at > org.junit.Assert.assertEquals(Assert.java:118) > 2020-11-10T00:14:22.7662312Z at > org.junit.Assert.assertEquals(Assert.java:144) > 2020-11-10T00:14:22.7663051Z at > org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberTest.testTopicPatternSubscriber(KafkaSubscriberTest.java:94) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19253) SourceReaderTestBase.testAddSplitToExistingFetcher hangs
[ https://issues.apache.org/jira/browse/FLINK-19253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229013#comment-17229013 ] Xuannan Su commented on FLINK-19253: [~becket_qin] I'd love to submit a fix. Could you assign the ticket to me?
[jira] [Commented] (FLINK-19253) SourceReaderTestBase.testAddSplitToExistingFetcher hangs
[ https://issues.apache.org/jira/browse/FLINK-19253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228958#comment-17228958 ] Xuannan Su commented on FLINK-19253: I think there is a potential race condition in SplitFetcher. It occurs when one thread executes SplitFetcher#checkAndSetIdle while another thread adds splits to the SplitFetcher via SplitFetcher#addSplits. That is, checkAndSetIdle first checks whether the fetcher should go idle and then sets the isIdle flag. Between these two steps, another thread could call addSplits, which puts a new task into the taskQueue and sets the isIdle flag to false. The first thread then sets the isIdle flag to true. We need to synchronize the threads that modify the isIdle flag. cc [~becket_qin]
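The interleaving described in the comment above can be sketched with a small, hypothetical fetcher class (FetcherSketch and its members are illustrative, not Flink's actual code). A minimal fix under that assumption is to guard the check-then-set and addSplits under the same monitor:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch of the race: the racy variant checks the queue and sets the idle
// flag in two separate steps, so addSplits() can interleave between them
// and its "idle = false" gets overwritten. The synchronized variants make
// check-then-set atomic with respect to addSplits().
class FetcherSketch {
    private final Queue<String> taskQueue = new ArrayDeque<>();
    private volatile boolean idle = false;

    // Racy version: check and set are not atomic.
    void checkAndSetIdleRacy() {
        boolean shouldIdle = taskQueue.isEmpty();
        // <-- another thread may run addSplits() here
        if (shouldIdle) {
            idle = true; // clobbers the "false" just written by addSplits()
        }
    }

    // Fixed version: check-then-set shares one monitor with addSplits().
    synchronized void checkAndSetIdle() {
        if (taskQueue.isEmpty()) {
            idle = true;
        }
    }

    synchronized void addSplits(String split) {
        taskQueue.add(split);
        idle = false;
    }

    synchronized boolean isIdle() {
        return idle;
    }
}
```

With the synchronized variant, a fetcher that received a split can no longer be marked idle by a concurrent checkAndSetIdle call.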
[jira] [Commented] (FLINK-20050) SourceCoordinatorProviderTest.testCheckpointAndReset failed with NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228464#comment-17228464 ] Xuannan Su commented on FLINK-20050: I think it is introduced by this commit [d5b5103c|https://github.com/apache/flink/commit/d5b5103c1ed855b45fd08b1738f044b01864de58]. cc [~becket_qin] > SourceCoordinatorProviderTest.testCheckpointAndReset failed with > NullPointerException > - > > Key: FLINK-20050 > URL: https://issues.apache.org/jira/browse/FLINK-20050 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.12.0 >Reporter: Dian Fu >Priority: Critical > Labels: test-stability > Fix For: 1.12.0 > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9322=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=7c61167f-30b3-5893-cc38-a9e3d057e392 > {code} > 2020-11-08T22:24:39.5642544Z [ERROR] > testCheckpointAndReset(org.apache.flink.runtime.source.coordinator.SourceCoordinatorProviderTest) > Time elapsed: 0.954 s <<< ERROR! > 2020-11-08T22:24:39.5643055Z java.lang.NullPointerException > 2020-11-08T22:24:39.5643578Z at > org.apache.flink.runtime.source.coordinator.SourceCoordinatorProviderTest.testCheckpointAndReset(SourceCoordinatorProviderTest.java:94) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20018) pipeline.cached-files option cannot escape ':' in path
Xuannan Su created FLINK-20018: -- Summary: pipeline.cached-files option cannot escape ':' in path Key: FLINK-20018 URL: https://issues.apache.org/jira/browse/FLINK-20018 Project: Flink Issue Type: Bug Reporter: Xuannan Su The pipeline.cached-files option cannot escape ':' in a path. For example, when setting it to `name:file1,path:oss://bucket/file1`, the path of file1 is parsed as "oss", and there is no way to escape the ':' in the path. -- This message was sent by Atlassian Jira (v8.3.4#803005)
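The parsing problem can be reproduced with a minimal sketch (this is not Flink's actual option parser; class and method names are made up for illustration). Splitting each `key:value` field on every ':' truncates values that themselves contain ':', which is exactly what happens to `oss://bucket/file1`; splitting only on the first ':' is one possible mitigation, though it still offers no general escape mechanism:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical parser sketch demonstrating why "path:oss://bucket/file1"
// loses everything after the scheme when every ':' is a separator.
public class CachedFilesParseSketch {

    // Naive parsing: every ':' is treated as a key/value separator.
    static Map<String, String> parseNaive(String entry) {
        Map<String, String> fields = new HashMap<>();
        for (String field : entry.split(",")) {
            String[] kv = field.split(":"); // "path:oss://bucket/file1" -> ["path", "oss", "//bucket/file1"]
            fields.put(kv[0], kv[1]);       // value stops at the next ':'
        }
        return fields;
    }

    // Mitigation sketch: split on the first ':' only, keeping the rest intact.
    static Map<String, String> parseLimited(String entry) {
        Map<String, String> fields = new HashMap<>();
        for (String field : entry.split(",")) {
            String[] kv = field.split(":", 2); // limit 2: at most one split
            fields.put(kv[0], kv[1]);
        }
        return fields;
    }
}
```

With `parseNaive`, the value of `path` comes out as just "oss"; with the limit-2 split it survives as the full `oss://bucket/file1`.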
[jira] [Commented] (FLINK-19761) Add lookup method for registered ShuffleDescriptor in ShuffleMaster
[ https://issues.apache.org/jira/browse/FLINK-19761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17220452#comment-17220452 ] Xuannan Su commented on FLINK-19761: You are right that not returning all the information needed to consume the result partition would weaken the ability to share cluster partitions across clusters. In that case, I agree with sending the shuffle descriptor back to the client. However, I still feel something is missing without the lookup method. For example, if the user is indeed using an external shuffle service and some partitions are promoted so that they remain after the job finishes, the external shuffle service should not rely on the client to keep the information needed to consume those partitions.
> Add lookup method for registered ShuffleDescriptor in ShuffleMaster
> ---
>
> Key: FLINK-19761
> URL: https://issues.apache.org/jira/browse/FLINK-19761
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Network
> Reporter: Xuannan Su
> Priority: Major
>
> Currently, the ShuffleMaster can register a partition and get the shuffle descriptor. However, it lacks the ability to look up the registered ShuffleDescriptors that belong to an IntermediateResult by the IntermediateDataSetID.
> Adding the lookup method to the ShuffleMaster makes reusing a cluster partition easier. For example, we don't have to return the ShuffleDescriptor to the client just so that another job can somehow encode the ShuffleDescriptor in its JobGraph to consume the cluster partition. Instead, we only need to return the IntermediateDataSetID, which the other job can use to look up the ShuffleDescriptor.
> By adding the lookup method in ShuffleMaster, if we have an external shuffle service and the lifecycle of the IntermediateResult is not bound to the cluster, a job running on another cluster can look up the ShuffleDescriptor and reuse the IntermediateResult even after the cluster that produced it has shut down. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19761) Add lookup method for registered ShuffleDescriptor in ShuffleMaster
[ https://issues.apache.org/jira/browse/FLINK-19761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218891#comment-17218891 ] Xuannan Su commented on FLINK-19761: [~trohrmann] The assumption is that the ShuffleMaster should know about the cluster partition you are asking for, meaning that if the NettyShuffleService is used, the cluster partition is only accessible within the same cluster. If you want to share the intermediate result across different clusters, you need an external shuffle service whose lifecycle is not bound to the cluster. If the client asks for a cluster partition that the shuffle master doesn't know of, the job will fail, and it is up to the client side to decide what to do. For the cache table in the Table API, it can re-execute the original graph that produced the intermediate result.
[jira] [Commented] (FLINK-19761) Add lookup method for registered ShuffleDescriptor in ShuffleMaster
[ https://issues.apache.org/jira/browse/FLINK-19761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218826#comment-17218826 ] Xuannan Su commented on FLINK-19761: cc [~trohrmann] [~azagrebin] [~zjwang] I'd love to hear your thoughts on this.
[jira] [Created] (FLINK-19761) Add lookup method for registered ShuffleDescriptor in ShuffleMaster
Xuannan Su created FLINK-19761: -- Summary: Add lookup method for registered ShuffleDescriptor in ShuffleMaster Key: FLINK-19761 URL: https://issues.apache.org/jira/browse/FLINK-19761 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Xuannan Su Currently, the ShuffleMaster can register a partition and get the shuffle descriptor. However, it lacks the ability to look up the registered ShuffleDescriptors that belong to an IntermediateResult by the IntermediateDataSetID. Adding the lookup method to the ShuffleMaster makes reusing a cluster partition easier. For example, we don't have to return the ShuffleDescriptor to the client just so that another job can somehow encode the ShuffleDescriptor in its JobGraph to consume the cluster partition. Instead, we only need to return the IntermediateDataSetID, which the other job can use to look up the ShuffleDescriptor. By adding the lookup method in ShuffleMaster, if we have an external shuffle service and the lifecycle of the IntermediateResult is not bound to the cluster, a job running on another cluster can look up the ShuffleDescriptor and reuse the IntermediateResult even after the cluster that produced it has shut down. -- This message was sent by Atlassian Jira (v8.3.4#803005)
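The proposed addition can be sketched as follows. This is a hypothetical simplification, not the actual ShuffleMaster interface: the real API works with typed descriptors and IDs rather than strings, and the method names here (registerPartition, lookupDescriptor) are assumptions for illustration only. The point is that a consumer holding only an IntermediateDataSetID can resolve the descriptor through the shuffle master instead of having it shipped through the client:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a ShuffleMaster with the proposed lookup method.
interface ShuffleMasterSketch {
    // Registration already exists today: producer registers and gets a descriptor.
    void registerPartition(String intermediateDataSetId, String shuffleDescriptor);

    // The proposed addition: resolve a descriptor by IntermediateDataSetID,
    // so a consuming job only needs to carry the ID, not the descriptor itself.
    Optional<String> lookupDescriptor(String intermediateDataSetId);
}

class InMemoryShuffleMaster implements ShuffleMasterSketch {
    private final Map<String, String> descriptorsByDataSet = new ConcurrentHashMap<>();

    @Override
    public void registerPartition(String id, String descriptor) {
        descriptorsByDataSet.put(id, descriptor);
    }

    @Override
    public Optional<String> lookupDescriptor(String id) {
        // Empty result models a partition the shuffle master doesn't know of;
        // per the discussion above, the consuming job would then fail and the
        // client decides how to recover (e.g. re-execute the producing graph).
        return Optional.ofNullable(descriptorsByDataSet.get(id));
    }
}
```

An external shuffle service implementing such an interface could answer lookups even after the producing cluster is gone, since its state outlives the cluster.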
[jira] [Created] (FLINK-19669) PipelinedRegionSchedulingStrategy#init ResultPartitionType blocking check should use isBlocking method
Xuannan Su created FLINK-19669: -- Summary: PipelinedRegionSchedulingStrategy#init ResultPartitionType blocking check should use isBlocking method Key: FLINK-19669 URL: https://issues.apache.org/jira/browse/FLINK-19669 Project: Flink Issue Type: Bug Reporter: Xuannan Su The ResultPartitionType blocking check in PipelinedRegionSchedulingStrategy#init should use the isBlocking method instead of comparing against the enum constants directly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
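The bug pattern above can be sketched with a simplified stand-in enum. The real `ResultPartitionType` in Flink has more values and attributes; the constants and class names here are illustrative only.

```java
// Simplified stand-in for Flink's ResultPartitionType (not the real enum).
enum ResultPartitionType {
    PIPELINED(false),
    BLOCKING(true),
    BLOCKING_PERSISTENT(true);

    private final boolean isBlocking;

    ResultPartitionType(boolean isBlocking) {
        this.isBlocking = isBlocking;
    }

    boolean isBlocking() {
        return isBlocking;
    }
}

public class BlockingCheckSketch {
    // Fragile: comparing against a single enum constant silently misses
    // other blocking variants such as BLOCKING_PERSISTENT.
    static boolean isBlockingByEnum(ResultPartitionType type) {
        return type == ResultPartitionType.BLOCKING;
    }

    // Robust: delegate to the type's own predicate, as the ticket proposes,
    // so new blocking variants are covered automatically.
    static boolean isBlockingByPredicate(ResultPartitionType type) {
        return type.isBlocking();
    }
}
```

The predicate form keeps the scheduling strategy correct even when new blocking partition types are added later.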
[jira] [Commented] (FLINK-19508) Add collect() operation on DataStream
[ https://issues.apache.org/jira/browse/FLINK-19508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17214408#comment-17214408 ] Xuannan Su commented on FLINK-19508: This is a very useful feature to have. Thanks [~sjwiesman] for picking up the ticket and working on this. Could you let me know the timeline for when the work will be done? I'd also like to help work on this if needed. > Add collect() operation on DataStream > - > > Key: FLINK-19508 > URL: https://issues.apache.org/jira/browse/FLINK-19508 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: Aljoscha Krettek >Assignee: Seth Wiesman >Priority: Major > > With the recent changes/additions to {{DataStreamUtils.collect()}} that make > it more robust by using the regular REST client to fetch results from > operators, it might make sense to add a {{collect()}} operation right on > {{DataStream}}. > This operation is still not meant for big data volumes, but I think it can be > useful for debugging and fetching small amounts of messages to the client. > When we do this, we can also think about changing {{print()}} to print on the > client instead of to the {{TaskManager}} stdout. I think the current > behaviour of this operation is mostly confusing for users. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19346) Generate and put ClusterPartitionDescriptor of ClusterPartition in JobResult when job finishes
[ https://issues.apache.org/jira/browse/FLINK-19346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuannan Su updated FLINK-19346: --- Component/s: Runtime / Coordination > Generate and put ClusterPartitionDescriptor of ClusterPartition in JobResult > when job finishes > -- > > Key: FLINK-19346 > URL: https://issues.apache.org/jira/browse/FLINK-19346 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination, Table SQL / API >Reporter: Xuannan Su >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19343) FLIP-36: Support Interactive Programming in Flink
[ https://issues.apache.org/jira/browse/FLINK-19343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuannan Su updated FLINK-19343: --- Component/s: (was: Table SQL / API) > FLIP-36: Support Interactive Programming in Flink > - > > Key: FLINK-19343 > URL: https://issues.apache.org/jira/browse/FLINK-19343 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination >Reporter: Xuannan Su >Priority: Major > > Please refer to the FLIP for any details: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19343) FLIP-36: Support Interactive Programming in Flink
[ https://issues.apache.org/jira/browse/FLINK-19343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuannan Su updated FLINK-19343: --- Component/s: (was: Runtime / Coordination) Table SQL / API > FLIP-36: Support Interactive Programming in Flink > - > > Key: FLINK-19343 > URL: https://issues.apache.org/jira/browse/FLINK-19343 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Xuannan Su >Priority: Major > > Please refer to the FLIP for any details: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19343) FLIP-36: Support Interactive Programming in Flink
[ https://issues.apache.org/jira/browse/FLINK-19343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuannan Su updated FLINK-19343: --- Component/s: Runtime / Coordination > FLIP-36: Support Interactive Programming in Flink > - > > Key: FLINK-19343 > URL: https://issues.apache.org/jira/browse/FLINK-19343 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination, Table SQL / API >Reporter: Xuannan Su >Priority: Major > > Please refer to the FLIP for any details: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19476) Introduce CacheManager in CatalogManager to keep track of the ClusterPartitionDescriptor
Xuannan Su created FLINK-19476: -- Summary: Introduce CacheManager in CatalogManager to keep track of the ClusterPartitionDescriptor Key: FLINK-19476 URL: https://issues.apache.org/jira/browse/FLINK-19476 Project: Flink Issue Type: Sub-task Reporter: Xuannan Su -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19355) Add close() method to TableEnvironment
Xuannan Su created FLINK-19355: -- Summary: Add close() method to TableEnvironment Key: FLINK-19355 URL: https://issues.apache.org/jira/browse/FLINK-19355 Project: Flink Issue Type: Sub-task Reporter: Xuannan Su -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19353) BlinkPlanner translate and optimize CacheOperation
Xuannan Su created FLINK-19353: -- Summary: BlinkPlanner translate and optimize CacheOperation Key: FLINK-19353 URL: https://issues.apache.org/jira/browse/FLINK-19353 Project: Flink Issue Type: Sub-task Reporter: Xuannan Su -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19354) Add invalidateCache() method in CachedTable
Xuannan Su created FLINK-19354: -- Summary: Add invalidateCache() method in CachedTable Key: FLINK-19354 URL: https://issues.apache.org/jira/browse/FLINK-19354 Project: Flink Issue Type: Sub-task Reporter: Xuannan Su -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19352) Add cache() method to Table
Xuannan Su created FLINK-19352: -- Summary: Add cache() method to Table Key: FLINK-19352 URL: https://issues.apache.org/jira/browse/FLINK-19352 Project: Flink Issue Type: Sub-task Reporter: Xuannan Su -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19350) StreamingJobGraphGenerator add the ClusterPartitionDescriptor of cached node to JobVertex
[ https://issues.apache.org/jira/browse/FLINK-19350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuannan Su updated FLINK-19350: --- Summary: StreamingJobGraphGenerator add the ClusterPartitionDescriptor of cached node to JobVertex (was: StreamingJobGraphGenerator generate job graph with cached node) > StreamingJobGraphGenerator add the ClusterPartitionDescriptor of cached node > to JobVertex > -- > > Key: FLINK-19350 > URL: https://issues.apache.org/jira/browse/FLINK-19350 > Project: Flink > Issue Type: Sub-task >Reporter: Xuannan Su >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19349) StreamGraphGenerator handle CacheSource and CacheSink
[ https://issues.apache.org/jira/browse/FLINK-19349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuannan Su updated FLINK-19349: --- Summary: StreamGraphGenerator handle CacheSource and CacheSink (was: StreamGraph handle CacheSource and CacheSink) > StreamGraphGenerator handle CacheSource and CacheSink > - > > Key: FLINK-19349 > URL: https://issues.apache.org/jira/browse/FLINK-19349 > Project: Flink > Issue Type: Sub-task >Reporter: Xuannan Su >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19351) StreamingJobGraphGenerator set the caching node to BoundedBlockingType
Xuannan Su created FLINK-19351: -- Summary: StreamingJobGraphGenerator set the caching node to BoundedBlockingType Key: FLINK-19351 URL: https://issues.apache.org/jira/browse/FLINK-19351 Project: Flink Issue Type: Sub-task Reporter: Xuannan Su -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19350) StreamingJobGraphGenerator generate job graph with cached node
Xuannan Su created FLINK-19350: -- Summary: StreamingJobGraphGenerator generate job graph with cached node Key: FLINK-19350 URL: https://issues.apache.org/jira/browse/FLINK-19350 Project: Flink Issue Type: Sub-task Reporter: Xuannan Su -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19348) Introduce CacheSource and CacheSink
Xuannan Su created FLINK-19348: -- Summary: Introduce CacheSource and CacheSink Key: FLINK-19348 URL: https://issues.apache.org/jira/browse/FLINK-19348 Project: Flink Issue Type: Sub-task Reporter: Xuannan Su -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19349) StreamGraph handle CacheSource and CacheSink
Xuannan Su created FLINK-19349: -- Summary: StreamGraph handle CacheSource and CacheSink Key: FLINK-19349 URL: https://issues.apache.org/jira/browse/FLINK-19349 Project: Flink Issue Type: Sub-task Reporter: Xuannan Su -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19346) Generate and put ClusterPartitionDescriptor of ClusterPartition in JobResult when job finishes
Xuannan Su created FLINK-19346: -- Summary: Generate and put ClusterPartitionDescriptor of ClusterPartition in JobResult when job finishes Key: FLINK-19346 URL: https://issues.apache.org/jira/browse/FLINK-19346 Project: Flink Issue Type: Sub-task Reporter: Xuannan Su -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19347) Generate InputGateDeploymentDescriptor from a JobVertex with ClusterPartitionDescriptor
Xuannan Su created FLINK-19347: -- Summary: Generate InputGateDeploymentDescriptor from a JobVertex with ClusterPartitionDescriptor Key: FLINK-19347 URL: https://issues.apache.org/jira/browse/FLINK-19347 Project: Flink Issue Type: Sub-task Reporter: Xuannan Su -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19343) FLIP-36: Support Interactive Programming in Flink
[ https://issues.apache.org/jira/browse/FLINK-19343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuannan Su updated FLINK-19343: --- Component/s: Table SQL / API > FLIP-36: Support Interactive Programming in Flink > - > > Key: FLINK-19343 > URL: https://issues.apache.org/jira/browse/FLINK-19343 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Xuannan Su >Priority: Major > > Please refer to the FLIP for any details: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19343) FLIP-36: Support Interactive Programming in Flink
Xuannan Su created FLINK-19343: -- Summary: FLIP-36: Support Interactive Programming in Flink Key: FLINK-19343 URL: https://issues.apache.org/jira/browse/FLINK-19343 Project: Flink Issue Type: New Feature Reporter: Xuannan Su Please refer to the FLIP for any details: https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-18820) SourceOperator should send MAX_WATERMARK to downstream operator when closed
[ https://issues.apache.org/jira/browse/FLINK-18820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuannan Su updated FLINK-18820: --- Description: SourceOperator should send MAX_WATERMARK to the downstream operator when closed. Otherwise, the watermark of the downstream operator may not advance. (was: SourceOperator should send MAX_WATERMARK to the downstream operator when closed. Otherwise, the watermark of the downstream operator will not advance. ) > SourceOperator should send MAX_WATERMARK to downstream operator when closed > --- > > Key: FLINK-18820 > URL: https://issues.apache.org/jira/browse/FLINK-18820 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Reporter: Xuannan Su >Priority: Major > Labels: pull-request-available > > SourceOperator should send MAX_WATERMARK to the downstream operator when > closed. Otherwise, the watermark of the downstream operator may not advance. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18820) SourceOperator should send MAX_WATERMARK to downstream operator when closed
Xuannan Su created FLINK-18820: -- Summary: SourceOperator should send MAX_WATERMARK to downstream operator when closed Key: FLINK-18820 URL: https://issues.apache.org/jira/browse/FLINK-18820 Project: Flink Issue Type: Bug Components: API / DataStream Reporter: Xuannan Su SourceOperator should send MAX_WATERMARK to the downstream operator when closed. Otherwise, the watermark of the downstream operator will not advance. -- This message was sent by Atlassian Jira (v8.3.4#803005)
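The fix described in FLINK-18820 boils down to emitting one final watermark with the maximum timestamp when the source closes. This is a minimal hypothetical sketch of that idea; Flink's real `SourceOperator` and watermark plumbing are considerably more involved, and the types here (`SourceCloseSketch`, `Output`, `SimpleSource`) are stand-ins.

```java
// Minimal sketch of emitting MAX_WATERMARK on close (not Flink's real code).
public class SourceCloseSketch {

    // Stand-in for Flink's Watermark.
    record Watermark(long timestamp) {}

    // Stand-in for the operator's output channel to downstream operators.
    interface Output {
        void emitWatermark(Watermark mark);
    }

    static class SimpleSource {
        private final Output output;

        SimpleSource(Output output) {
            this.output = output;
        }

        // On close, emit a final watermark with Long.MAX_VALUE so downstream
        // event-time logic (e.g. windows waiting on the watermark) can
        // complete instead of stalling forever on a finished source.
        void close() {
            output.emitWatermark(new Watermark(Long.MAX_VALUE));
        }
    }
}
```

Without this final watermark, a downstream operator fed only by a closed source never sees its event-time clock advance past the last emitted record.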