[jira] [Commented] (FLINK-10918) incremental Keyed state with RocksDB throws cannot create directory error in windows
[ https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698092#comment-16698092 ] Amit commented on FLINK-10918:
--
Unrelated streaming test cases are failing, while the forked branch builds without any errors.

> incremental Keyed state with RocksDB throws cannot create directory error in windows
>
> Key: FLINK-10918
> URL: https://issues.apache.org/jira/browse/FLINK-10918
> Project: Flink
> Issue Type: Bug
> Components: Queryable State, State Backends, Checkpointing
> Affects Versions: 1.6.2
> Environment: windows
> {code:java}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val rocksdb = new RocksDBStateBackend("file:\\C:\\rocksdb\\checkpoint", true)
> rocksdb.setDbStoragePath("file:\\C:\\rocksdb\\storage")
> env.setStateBackend(rocksdb)
> env.enableCheckpointing(10)
> {code}
> Reporter: Amit
> Priority: Major
> Labels: pull-request-available
> Attachments: 0001-FLINK-10918-Fix-for-checkpoint-creation-on-windows-1.patch
>
> Facing an error while enabling keyed state with the RocksDB backend and checkpointing to a local Windows directory:
>
> {code:java}
> Caused by: org.rocksdb.RocksDBException: Failed to create dir: /c:/tmp/data/job_dbe01128760d4d5cb90809cd94c2a936_op_StreamMap_b5c8d46f3e7b141acf271f12622e752b__3_8__uuid_45c1f62b-a198-44f5-add5-7683079b03f8/chk-1.tmp: Invalid argument
> at org.rocksdb.Checkpoint.createCheckpoint(Native Method)
> at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.takeSnapshot(RocksDBKeyedStateBackend.java:2549)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$IncrementalSnapshotStrategy.performSnapshot(RocksDBKeyedStateBackend.java:2008)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:498)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
> ... 13 more
> {code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
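The stack trace above shows RocksDB's native Checkpoint API rejecting a path of the form /c:/..., i.e. a Windows drive path that picked up a leading slash from URI-style handling. The workaround discussed in this thread strips that slash before handing the path to RocksDB. A minimal sketch of the normalization, with illustrative class and method names (not Flink's actual API):

```java
public class WindowsPathNormalizer {

    // Heuristic: a path like "/C:/rocksdb/checkpoint" is a Windows drive
    // path with a spurious leading '/' (slash, drive letter, colon).
    static boolean hasWindowsDrive(String path) {
        return path.length() >= 3
            && path.charAt(0) == '/'
            && Character.isLetter(path.charAt(1))
            && path.charAt(2) == ':';
    }

    // Strip the leading '/' so the native layer accepts the path;
    // POSIX-style paths are returned unchanged.
    static String normalize(String path) {
        return hasWindowsDrive(path) ? path.substring(1) : path;
    }

    public static void main(String[] args) {
        System.out.println(normalize("/C:/rocksdb/checkpoint")); // C:/rocksdb/checkpoint
        System.out.println(normalize("/tmp/data/chk-1"));        // /tmp/data/chk-1
    }
}
```

The patch attached to the issue applies the same idea inside RocksDBKeyedStateBackend, using Flink's own Path.hasWindowsDrive() check.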
[jira] [Commented] (FLINK-10918) incremental Keyed state with RocksDB throws cannot create directory error in windows
[ https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698071#comment-16698071 ] ASF GitHub Bot commented on FLINK-10918:
imamitsehgal commented on issue #7171: FLINK-10918 Fix error while checkpointing on window keyed state rock … URL: https://github.com/apache/flink/pull/7171#issuecomment-441417072
The build is passing on the forked branch, the failure here is unrelated: https://travis-ci.org/imamitsehgal/flink/builds/459154746
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10689) Port UDFs in Table API extension points to flink-table-common
[ https://issues.apache.org/jira/browse/FLINK-10689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698067#comment-16698067 ] ASF GitHub Bot commented on FLINK-10689:
xueyumusic commented on issue #7059: [FLINK-10689][table] Port UDFs in Table API extension points to flink-table-common URL: https://github.com/apache/flink/pull/7059#issuecomment-441416404
Thanks for the review, @twalthr, I updated the code.
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Port UDFs in Table API extension points to flink-table-common
>
> Key: FLINK-10689
> URL: https://issues.apache.org/jira/browse/FLINK-10689
> Project: Flink
> Issue Type: Sub-task
> Components: Table API SQL
> Reporter: Timo Walther
> Assignee: xueyu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
> After FLINK-10687 and FLINK-10688 have been resolved, we should also port the remaining extension points of the Table API to flink-table-common. This includes interfaces for UDFs and the external catalog interface.
> This ticket is for porting UDFs. This jira does NOT depend on FLINK-16088, so it can be started at any time.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10884) Flink on yarn TM container will be killed by nodemanager because of the exceeded physical memory.
[ https://issues.apache.org/jira/browse/FLINK-10884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698043#comment-16698043 ] wgcn commented on FLINK-10884:
--
I think the memory assigned to the JVM contains more than just the heap memory and off-heap memory of the container. Do you mean that offHeapSizeMB should be assigned "(containerMemoryMB - heapSizeMB) * (1 - cutoffFactor)"?

> Flink on yarn TM container will be killed by nodemanager because of the exceeded physical memory.
>
> Key: FLINK-10884
> URL: https://issues.apache.org/jira/browse/FLINK-10884
> Project: Flink
> Issue Type: Bug
> Components: Cluster Management, Core
> Affects Versions: 1.5.5, 1.6.2, 1.7.0
> Environment: version: 1.6.2, module: flink on yarn, centos, jdk1.8, hadoop 2.7
> Reporter: wgcn
> Assignee: wgcn
> Priority: Major
> Labels: yarn
>
> The TM container will be killed by the nodemanager because of exceeded physical memory. I found in the launch context launching the TM container that "container memory = heap memory + offHeapSizeMB" in org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters (lines 160 to 166). I set a safety margin for the whole memory the container uses. For example, if the container limit is 3 GB, the sum "heap memory + offHeapSizeMB" is set to 2.4 GB to prevent the container from being killed. Do we have a ready-made solution, or can I commit my solution?

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
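The safety margin described in the ticket can be made concrete with a small worked calculation. This is a hypothetical sketch, not Flink's actual ContaineredTaskManagerParameters logic; the 3 GB container and 0.2 cutoff factor are illustrative values chosen to reproduce the "3 GB limit, ~2.4 GB usable" example above:

```java
public class ContainerMemorySplit {

    // Reserve a fraction of the container as a safety margin (cutoff),
    // so heap + off-heap stays below the YARN physical-memory limit.
    static long cutoffMB(long containerMemoryMB, double cutoffFactor) {
        return (long) (containerMemoryMB * cutoffFactor);
    }

    public static void main(String[] args) {
        long containerMemoryMB = 3 * 1024;  // YARN container limit from the ticket: 3 GB
        double cutoffFactor = 0.2;          // illustrative safety-margin fraction

        long cutoff = cutoffMB(containerMemoryMB, cutoffFactor);  // memory kept free as margin
        long heapPlusOffHeapMB = containerMemoryMB - cutoff;      // ~2.4 GB usable, as in the ticket

        System.out.println("limit=" + containerMemoryMB + "MB usable=" + heapPlusOffHeapMB + "MB");
    }
}
```

Without some such margin, the JVM's non-heap allocations (metaspace, thread stacks, native buffers) push the process past the container limit and the NodeManager kills it, which is exactly the failure mode this ticket reports.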
[jira] [Commented] (FLINK-7883) Make savepoints atomic with respect to state and side effects
[ https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697931#comment-16697931 ] Steven Zhen Wu commented on FLINK-7883:
---
We would love to see this happen. It is the "graceful" shutdown we need to reduce/minimize duplicates if we are going to enable aggressive/frequent rescale events; otherwise, we are going to see frequent and significant duplicates.

> Make savepoints atomic with respect to state and side effects
>
> Key: FLINK-7883
> URL: https://issues.apache.org/jira/browse/FLINK-7883
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API, Kafka Connector, State Backends, Checkpointing
> Affects Versions: 1.3.2, 1.4.0
> Reporter: Antoine Philippot
> Priority: Major
>
> For a cancel-with-savepoint command, the JobManager triggers the cancel call once the savepoint is finished, but during the savepoint execution the Kafka source continues to poll new messages, which will not be part of the savepoint and will be replayed on the next application start.
> A solution could be to stop fetching from the source stream task before triggering the savepoint.
> I suggest adding an interface {{StoppableFetchingSourceFunction}} with a method {{stopFetching}} that existing SourceFunction implementations could implement.
> We can add a {{stopFetchingSource}} property to the {{CheckpointOptions}} class to pass the desired behaviour from {{JobManager.handleMessage(CancelJobWithSavepoint)}} to {{SourceStreamTask.triggerCheckpoint}}.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
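The proposal above can be sketched as a small interface extension. The names StoppableFetchingSourceFunction and stopFetching come from the ticket itself; the surrounding types are simplified stand-ins, not Flink's real SourceFunction API:

```java
// Simplified stand-in for Flink's SourceFunction (the real interface takes a
// SourceContext in run()); only what the sketch needs is shown.
interface SourceFunction<T> {
    void run() throws Exception;
    void cancel();
}

// The interface proposed in FLINK-7883: lets the JobManager ask a source to
// stop polling new records before the savepoint is triggered.
interface StoppableFetchingSourceFunction<T> extends SourceFunction<T> {
    void stopFetching();
}

// Hypothetical Kafka-like source showing how an implementation might honor it.
class StoppableKafkaLikeSource implements StoppableFetchingSourceFunction<String> {
    private volatile boolean fetching = true;

    @Override
    public void run() {
        // A real source would poll records here while 'fetching' is true, so
        // stopFetching() drains the loop before the savepoint snapshot is taken.
    }

    @Override
    public void cancel() {
        fetching = false;
    }

    @Override
    public void stopFetching() {
        fetching = false; // stop polling; records emitted so far are still part of the savepoint
    }

    boolean isFetching() {
        return fetching;
    }
}
```

With such an interface, cancel-with-savepoint would first call stopFetching() on every source, then trigger the savepoint, so no messages polled after the snapshot barrier are lost from it and replayed on restart.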
[jira] [Comment Edited] (FLINK-10868) Flink's JobCluster ResourceManager doesn't use yarn.maximum-failed-containers as limit of resource acquirement
[ https://issues.apache.org/jira/browse/FLINK-10868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697677#comment-16697677 ] Zhenqiu Huang edited comment on FLINK-10868 at 11/24/18 5:48 PM:
-
[~suez1224] [~till.rohrmann] Agree with Shuyi's proposal, as maximum-failed-containers is more a job-level configuration than a session-cluster-level one. We may have a simple fix for the per-job cluster first, to achieve feature parity with the former release.
1) I will add a boolean parameter to the createResourceManager function to distinguish whether it runs for a per-job cluster or a session cluster, and also pass LeaderGatewayRetriever dispatcherGatewayRetriever as one of the parameters of createResourceManager in ResourceManagerFactory.
2) If it is a per-job cluster, once the threshold is hit, shut down the cluster by using DispatcherGateway.
What do you think?

was (Author: zhenqiuhuang):
[~suez1224] [~till.rohrmann] Agree with Shuyi's proposal, as yarn.maximum-failed-containers is more a job-level configuration than a session-cluster-level one. We may have a simple fix for the per-job cluster first, to achieve feature parity with the former release.
1) I will add a boolean parameter to YarnResourceManager to distinguish whether it runs for a per-job cluster or a session cluster, and also pass LeaderGatewayRetriever dispatcherGatewayRetriever as a parameter of the constructor of YarnResourceManager.
2) If it is a per-job cluster, once the threshold is hit, shut down the cluster by using DispatcherGateway.
What do you think?

> Flink's JobCluster ResourceManager doesn't use yarn.maximum-failed-containers as limit of resource acquirement
>
> Key: FLINK-10868
> URL: https://issues.apache.org/jira/browse/FLINK-10868
> Project: Flink
> Issue Type: Bug
> Components: YARN
> Affects Versions: 1.6.2, 1.7.0
> Reporter: Zhenqiu Huang
> Assignee: Zhenqiu Huang
> Priority: Major
>
> Currently, YarnResourceManager does not use yarn.maximum-failed-containers as a limit on resource acquirement. In the worst case, when newly started containers consistently fail, YarnResourceManager will go into an infinite resource acquirement loop without failing the job. Together with https://issues.apache.org/jira/browse/FLINK-10848, it will quickly occupy all resources of the yarn queue.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
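The missing limit described in this ticket amounts to a counter check before requesting a replacement container. A hypothetical sketch with illustrative names, not the actual YarnResourceManager API:

```java
// Tracks failed containers and refuses further requests once a configured
// threshold (in the spirit of yarn.maximum-failed-containers) is exceeded,
// instead of requesting replacements forever.
class FailedContainerLimiter {
    private final int maxFailedContainers;
    private int failedContainers;

    FailedContainerLimiter(int maxFailedContainers) {
        this.maxFailedContainers = maxFailedContainers;
    }

    // Called when a container fails; returns true if a replacement may still
    // be requested, false once the job should be failed instead.
    boolean mayRequestReplacement() {
        failedContainers++;
        return failedContainers <= maxFailedContainers;
    }
}
```

With such a guard, a per-job cluster could shut itself down via the DispatcherGateway, as proposed in the comment above, once mayRequestReplacement() returns false.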
[jira] [Updated] (FLINK-10868) Flink's JobCluster ResourceManager doesn't use yarn.maximum-failed-containers as limit of resource acquirement
[ https://issues.apache.org/jira/browse/FLINK-10868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenqiu Huang updated FLINK-10868:
--
Summary: Flink's JobCluster ResourceManager doesn't use yarn.maximum-failed-containers as limit of resource acquirement (was: Flink's Yarn ResourceManager doesn't use yarn.maximum-failed-containers as limit of resource acquirement)

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10918) incremental Keyed state with RocksDB throws cannot create directory error in windows
[ https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697871#comment-16697871 ] ASF GitHub Bot commented on FLINK-10918:
imamitsehgal commented on issue #7171: FLINK-10918 Fix error while checkpointing on window keyed state rock … URL: https://github.com/apache/flink/pull/7171#issuecomment-441378109
What is the purpose of the change? To fix the issue with creating the checkpoint directory while running on Windows with keyed state and the RocksDB backend.
Brief change log: RocksDBKeyedStateBackend class changes to remove the leading '/' in the path string.
Verifying this change: This change is already covered by existing tests.
Documentation: Does this pull request introduce a new feature? No.
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10918) incremental Keyed state with RocksDB throws cannot create directory error in windows
[ https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697868#comment-16697868 ] ASF GitHub Bot commented on FLINK-10918:
imamitsehgal opened a new pull request #7171: FLINK-10918 Fix error while checkpointing on window keyed state rock …db backend URL: https://github.com/apache/flink/pull/7171

## What is the purpose of the change
*(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*

## Brief change log
*(for example:)*
- *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
- *Deployments RPC transmits only the blob storage reference*
- *TaskManagers retrieve the TaskInfo from the blob cache*

## Verifying this change
*(Please pick either of the following options)*
This change is a trivial rework / code cleanup without any test coverage.
*(or)* This change is already covered by existing tests, such as *(please describe tests)*.
*(or)* This change added tests and can be verified as follows: *(example:)*
- *Added integration tests for end-to-end deployment with large payloads (100MB)*
- *Extended integration test for recovery after master (JobManager) failure*
- *Added test that validates that TaskInfo is transferred only once across recoveries*
- *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)

## Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10918) incremental Keyed state with RocksDB throws cannot create directory error in windows
[ https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697867#comment-16697867 ] ASF GitHub Bot commented on FLINK-10918:
imamitsehgal closed pull request #7153: FLINK-10918 Fix the checkpoint dir creation error on Window for keyed rocksdb state URL: https://github.com/apache/flink/pull/7153
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

{code}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 17ba985f883..4db32e1d2c8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -557,6 +557,14 @@ private void createDB() throws IOException {
 	this.defaultColumnFamily = columnFamilyHandles.get(0);
 }

+private String getPathAsString(Path path) {
+	String pathString = path.getPath();
+	if (path.hasWindowsDrive() && pathString.startsWith("/")) {
+		pathString = pathString.substring(1);
+	}
+	return pathString;
+}
+
 private RocksDB openDB(
 	String path,
 	List stateColumnFamilyDescriptors,
@@ -575,7 +583,7 @@ private RocksDB openDB(
 	try {
 		dbRef = RocksDB.open(
 			Preconditions.checkNotNull(dbOptions),
-			Preconditions.checkNotNull(path),
+			Preconditions.checkNotNull(getPathAsString(new Path(path))),
 			columnFamilyDescriptors,
 			stateColumnFamilyHandles);
 	} catch (RocksDBException e) {
@@ -2374,6 +2382,7 @@ private void closeLocalRegistry() {
 	}
 }

+
 /**
  * Encapsulates the process to perform an incremental snapshot of a RocksDBKeyedStateBackend.
  */
@@ -2546,7 +2555,9 @@ void takeSnapshot() throws Exception {
 	// create hard links of living files in the snapshot path
 	try (Checkpoint checkpoint = Checkpoint.create(stateBackend.db)) {
-		checkpoint.createCheckpoint(localBackupDirectory.getDirectory().getPath());
+		String path = stateBackend.getPathAsString(localBackupDirectory.getDirectory());
+		LOG.trace("Checkpoint path is {}.", path);
+		checkpoint.createCheckpoint(path);
 	}
 }
{code}

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] imamitsehgal closed pull request #7153: FLINK-10918 Fix the checkpoint dir creation error on Window for keyed rocksdb state
imamitsehgal closed pull request #7153: FLINK-10918 Fix the checkpoint dir creation error on Window for keyed rocksdb state
URL: https://github.com/apache/flink/pull/7153

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

{code}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 17ba985f883..4db32e1d2c8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -557,6 +557,14 @@ private void createDB() throws IOException {
 		this.defaultColumnFamily = columnFamilyHandles.get(0);
 	}
 
+	private String getPathAsString(Path path) {
+		String pathString = path.getPath();
+		if (path.hasWindowsDrive() && pathString.startsWith("/")) {
+			pathString = pathString.substring(1);
+		}
+		return pathString;
+	}
+
 	private RocksDB openDB(
 		String path,
 		List stateColumnFamilyDescriptors,
@@ -575,7 +583,7 @@ private RocksDB openDB(
 		try {
 			dbRef = RocksDB.open(
 				Preconditions.checkNotNull(dbOptions),
-				Preconditions.checkNotNull(path),
+				Preconditions.checkNotNull(getPathAsString(new Path(path))),
 				columnFamilyDescriptors,
 				stateColumnFamilyHandles);
 		} catch (RocksDBException e) {
@@ -2374,6 +2382,7 @@ private void closeLocalRegistry() {
 		}
 	}
 
+
 	/**
 	 * Encapsulates the process to perform an incremental snapshot of a RocksDBKeyedStateBackend.
 	 */
@@ -2546,7 +2555,9 @@ void takeSnapshot() throws Exception {
 			// create hard links of living files in the snapshot path
 			try (Checkpoint checkpoint = Checkpoint.create(stateBackend.db)) {
-				checkpoint.createCheckpoint(localBackupDirectory.getDirectory().getPath());
+				String path = stateBackend.getPathAsString(localBackupDirectory.getDirectory());
+				LOG.trace("Checkpoint path is {}.", path);
+				checkpoint.createCheckpoint(path);
 			}
 		}
{code}

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards, Apache Git Services
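For context, the failure arises because Flink represents a local Windows checkpoint directory as a URI-style path with a leading slash (e.g. /c:/tmp/...), which the RocksDB native layer rejects with "Invalid argument"; the patch's getPathAsString strips that slash. Below is a minimal, self-contained sketch of the same normalization using plain strings; hasWindowsDrive is an illustrative stand-in for org.apache.flink.core.fs.Path#hasWindowsDrive, not the real Flink code.

```java
// Illustrative, self-contained sketch of the normalization in the patch above.
// hasWindowsDrive() is a simplified stand-in for
// org.apache.flink.core.fs.Path#hasWindowsDrive(); the real fix operates on
// Flink Path objects, not raw strings.
public class WindowsPathNormalizer {

    // True for paths like "/C:/tmp" or "C:/tmp", where a drive letter and ':' lead the path.
    static boolean hasWindowsDrive(String path) {
        int start = path.startsWith("/") ? 1 : 0;
        return path.length() >= start + 2
                && Character.isLetter(path.charAt(start))
                && path.charAt(start + 1) == ':';
    }

    // Mirrors getPathAsString(): drop the URI-style leading '/' before a drive letter,
    // turning "/C:/rocksdb/storage" into "C:/rocksdb/storage".
    static String getPathAsString(String path) {
        if (hasWindowsDrive(path) && path.startsWith("/")) {
            return path.substring(1);
        }
        return path;
    }

    public static void main(String[] args) {
        System.out.println(getPathAsString("/C:/rocksdb/storage")); // C:/rocksdb/storage
        System.out.println(getPathAsString("/tmp/flink"));          // /tmp/flink (POSIX paths untouched)
    }
}
```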
[jira] [Commented] (FLINK-10918) incremental Keyed state with RocksDB throws cannot create directory error in windows
[ https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698092#comment-16698092 ] Amit commented on FLINK-10918: -- Unrelated streaming test cases are failing, while the forked branch built without any errors.

[ https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697807#comment-16697807 ]

ASF GitHub Bot commented on FLINK-10918:

imamitsehgal commented on a change in pull request #7153: FLINK-10918 Fix the checkpoint dir creation error on Window for keyed rocksdb state
URL: https://github.com/apache/flink/pull/7153#discussion_r236042030

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java

{code}
@@ -557,6 +557,16 @@ private void createDB() throws IOException {
 		this.defaultColumnFamily = columnFamilyHandles.get(0);
 	}
 
+	private static class Utils {
{code}

Review comment: valid point, I initially thought to create a utility class, then decided to keep it local.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10918) incremental Keyed state with RocksDB throws cannot create directory error in windows
[ https://issues.apache.org/jira/browse/FLINK-10918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697806#comment-16697806 ]

ASF GitHub Bot commented on FLINK-10918:

imamitsehgal commented on a change in pull request #7153: FLINK-10918 Fix the checkpoint dir creation error on Window for keyed rocksdb state
URL: https://github.com/apache/flink/pull/7153#discussion_r236042001

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java

{code}
@@ -575,7 +585,7 @@ private RocksDB openDB(
 		try {
 			dbRef = RocksDB.open(
 				Preconditions.checkNotNull(dbOptions),
-				Preconditions.checkNotNull(path),
+				Preconditions.checkNotNull(Utils.getPathAsString(new Path(path))),
{code}

Review comment: If path is null or "", then new Path(path) will throw a runtime IllegalArgumentException. The path returned by getPathAsString is checked again to be non-null.
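The evaluation order described in this review comment — a null or empty path failing fast inside the path conversion before the outer null check ever runs — can be sketched without Flink. The classes below are illustrative stand-ins, not Flink's real Preconditions or Path:

```java
// Illustrative stand-ins (not Flink's real classes) showing the evaluation order
// discussed in the review: for a null or empty path, the inner conversion throws
// IllegalArgumentException before the outer checkNotNull ever runs.
public class PathValidationSketch {

    // Simplified version of a Preconditions.checkNotNull-style guard.
    static <T> T checkNotNull(T reference) {
        if (reference == null) {
            throw new NullPointerException();
        }
        return reference;
    }

    // Simplified stand-in for `new Path(path)` followed by getPathAsString():
    // rejects null/empty input up front, as described in the comment.
    static String toPathString(String path) {
        if (path == null || path.isEmpty()) {
            throw new IllegalArgumentException("cannot create a path from an empty string");
        }
        return path;
    }

    public static void main(String[] args) {
        // Valid input passes through both layers of checking.
        System.out.println(checkNotNull(toPathString("/C:/rocksdb/storage")));

        // Empty input fails fast in the inner conversion; checkNotNull is never reached.
        try {
            checkNotNull(toPathString(""));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```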
[GitHub] imamitsehgal commented on a change in pull request #7153: FLINK-10918 Fix the checkpoint dir creation error on Window for keyed rocksdb state
imamitsehgal commented on a change in pull request #7153: FLINK-10918 Fix the checkpoint dir creation error on Window for keyed rocksdb state
URL: https://github.com/apache/flink/pull/7153#discussion_r236041049

File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java

{code}
@@ -557,6 +557,16 @@ private void createDB() throws IOException {
 		this.defaultColumnFamily = columnFamilyHandles.get(0);
 	}
 
+	private static class Utils {
+		static String getPathAsString(Path path) {
+			String pathString = path.getPath();
{code}

Review comment: I tried that, but it kept failing the test cases. I even tried the toString method in the Path class, which removes the leading '/'; even that fails the test cases.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards, Apache Git Services
[jira] [Commented] (FLINK-10999) Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast to java.lang.Long
[ https://issues.apache.org/jira/browse/FLINK-10999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697745#comment-16697745 ]

ideal-hp commented on FLINK-10999:

Thank you for your help! I have updated the details at https://github.com/harbby/sylph/issues/23

> Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast to java.lang.Long
> ---
>
> Key: FLINK-10999
> URL: https://issues.apache.org/jira/browse/FLINK-10999
> Project: Flink
> Issue Type: Bug
> Components: Table API SQL
> Affects Versions: 1.5.5, 1.6.2
> Reporter: ideal-hp
> Priority: Major
>
> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
> 	at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
> 	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
> 	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
> 	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> 	at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
> 	at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
> 	at DataStreamCalcRule$15.processElement(Unknown Source)
> 	at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
> 	at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
> 	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10999) Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast to java.lang.Long
[ https://issues.apache.org/jira/browse/FLINK-10999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697743#comment-16697743 ]

xuqianjin commented on FLINK-10999:

[~harbby] I understand your description. Thank you very much. I will try to track down this problem.
[jira] [Created] (FLINK-11001) Window rowtime attribute can't be renamed in Java
Hequn Cheng created FLINK-11001:

Summary: Window rowtime attribute can't be renamed in Java
Key: FLINK-11001
URL: https://issues.apache.org/jira/browse/FLINK-11001
Project: Flink
Issue Type: Bug
Reporter: Hequn Cheng
Assignee: Hequn Cheng

Currently, we can rename the window rowtime attribute like this in Scala:

{code:java}
table
  .window(Tumble over 2.millis on 'rowtime as 'w)
  .groupBy('w)
  .select('w.rowtime as 'rowtime, 'int.count as 'int)
{code}

However, an exception is thrown if we use Java (by changing the Expressions to Strings):

{code:java}
table
  .window(Tumble over 2.millis on 'rowtime as 'w)
  .groupBy('w)
  .select("w.rowtime as rowtime, int.count as int")
{code}

The exception is:

{code:java}
org.apache.flink.table.api.ExpressionParserException: Could not parse expression at column 11: `,' expected but `a' found
w.rowtime as rowtime, int.count as int
{code}

To solve the problem, we can add rename support in {{ExpressionParser}}. However, this may conflict with the design of sources, which use {{as}} before {{rowtime}}:

{code:java}
stream.toTable(
  tEnv,
  ExpressionParser.parseExpressionList("(b as b).rowtime, c as c, a as a"): _*)
{code}

Personally, I think we should keep the two consistent, so the final API would be:

{code:java}
// window case
.select("w.rowtime as rowtime, int.count as int")

// source case
stream.toTable(
  tEnv,
  ExpressionParser.parseExpressionList("b.rowtime as b, c as c, a as a"): _*)
{code}

Any suggestions would be greatly appreciated!

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10999) Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast to java.lang.Long
[ https://issues.apache.org/jira/browse/FLINK-10999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697730#comment-16697730 ]

ideal-hp commented on FLINK-10999:

[~x1q1j1]: Hi, I am very sorry, my earlier description was not clear. Let me re-describe: when `tableEnv.registerDataStream` is used multiple times, specifying `proctime.proctime` or `rowtime.rowtime` will cause this error. I have a more detailed description and reproduction here, see: https://github.com/harbby/sylph/issues/23
[jira] [Commented] (FLINK-10999) Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast to java.lang.Long
[ https://issues.apache.org/jira/browse/FLINK-10999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697701#comment-16697701 ]

xuqianjin commented on FLINK-10999:

[~harbby] It is not easy to understand your question from your description and log.