[GitHub] dianfu commented on issue #7322: [FLINK-11176] [table][tests] Improve the harness tests to use the code-generated operator
dianfu commented on issue #7322: [FLINK-11176] [table][tests] Improve the harness tests to use the code-generated operator URL: https://github.com/apache/flink/pull/7322#issuecomment-454683273 @sunjincheng121 Thanks in advance for the review. I have updated the PR. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11343) TaskExecutorTest is unstable on travis
[ https://issues.apache.org/jira/browse/FLINK-11343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11343: --- Labels: pull-request-available (was: ) > TaskExecutorTest is unstable on travis > -- > > Key: FLINK-11343 > URL: https://issues.apache.org/jira/browse/FLINK-11343 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {code:java} > org.apache.flink.runtime.util.TestingFatalErrorHandler$TestingException: > java.lang.IllegalStateException: Memory manager has been shut down. > at > org.apache.flink.runtime.util.TestingFatalErrorHandler.rethrowError(TestingFatalErrorHandler.java:51) > at > org.apache.flink.runtime.taskexecutor.TaskExecutorTest.teardown(TaskExecutorTest.java:223) > Caused by: java.lang.IllegalStateException: Memory manager has been shut down. > at > org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:821) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun opened a new pull request #7501: [FLINK-11343] Omit actions in memory manager after shutdown instead o…
TisonKun opened a new pull request #7501: [FLINK-11343] Omit actions in memory manager after shutdown instead o… URL: https://github.com/apache/flink/pull/7501 …f IllegalStateException ## What is the purpose of the change >org.apache.flink.runtime.util.TestingFatalErrorHandler$TestingException: java.lang.IllegalStateException: Memory manager has been shut down. at org.apache.flink.runtime.util.TestingFatalErrorHandler.rethrowError(TestingFatalErrorHandler.java:51) at org.apache.flink.runtime.taskexecutor.TaskExecutorTest.teardown(TaskExecutorTest.java:223) Caused by: java.lang.IllegalStateException: Memory manager has been shut down. at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:821) at java.lang.Thread.run(Thread.java:748) After investigating, I noticed that it occurs when: 1. Task#run sets the task state to FINISHED; 2. the test case gets the FINISHED future and calls taskExecutor#shutdown, so the MemoryManager shuts down; 3. the `finally` block of Task#run calls MemoryManager#releaseAll(owner); 4. this causes the `IllegalStateException`. Here, I find that after MemoryManager#shutdown, `allocatedSegments` and `memoryPool` are already released properly. Thus a following #releaseAll can be safely omitted and the `IllegalStateException` is unnecessary. ## Brief change log Omit actions in memory manager after shutdown instead of throwing IllegalStateException ## Verifying this change This change is already covered by existing tests, such as *TaskExecutorTest*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
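For illustration, below is a minimal sketch of the guard described in the PR text above — making releaseAll a no-op once the manager has been shut down instead of failing. The class, fields, and methods here are simplified stand-ins based on the description (allocatedSegments, isShutDown), not the actual Flink MemoryManager code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch only: a stripped-down memory manager illustrating the proposed no-op guard.
// The real class tracks actual memory segments and uses proper synchronization.
class MemoryManagerSketch {

    private final Map<Object, Set<Long>> allocatedSegments = new HashMap<>();
    private volatile boolean isShutDown;

    void allocate(Object owner, long segmentId) {
        if (isShutDown) {
            throw new IllegalStateException("Memory manager has been shut down.");
        }
        allocatedSegments.computeIfAbsent(owner, k -> new HashSet<>()).add(segmentId);
    }

    void shutdown() {
        // shutdown already releases allocatedSegments and the memory pool
        allocatedSegments.clear();
        isShutDown = true;
    }

    void releaseAll(Object owner) {
        if (isShutDown) {
            // Proposed behavior: everything was already released by shutdown(), so a late
            // releaseAll from Task#run's finally block can simply return instead of
            // throwing IllegalStateException("Memory manager has been shut down.").
            return;
        }
        allocatedSegments.remove(owner);
    }
}
```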
[GitHub] sunjincheng121 commented on issue #7322: [FLINK-11176] [table][tests] Improve the harness tests to use the code-generated operator
sunjincheng121 commented on issue #7322: [FLINK-11176] [table][tests] Improve the harness tests to use the code-generated operator URL: https://github.com/apache/flink/pull/7322#issuecomment-454677404 @dianfu thanks for the PR. I'll have a look at the changes after you rebase the code :-) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on issue #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible
sunjincheng121 commented on issue #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible URL: https://github.com/apache/flink/pull/7286#issuecomment-454676934 Hi @dianfu Thanks for the quick update! LGTM. +1 to merge. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-11343) TaskExecutorTest is unstable on travis
TisonKun created FLINK-11343: Summary: TaskExecutorTest is unstable on travis Key: FLINK-11343 URL: https://issues.apache.org/jira/browse/FLINK-11343 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.8.0 Reporter: TisonKun Assignee: TisonKun Fix For: 1.8.0 {code:java} org.apache.flink.runtime.util.TestingFatalErrorHandler$TestingException: java.lang.IllegalStateException: Memory manager has been shut down. at org.apache.flink.runtime.util.TestingFatalErrorHandler.rethrowError(TestingFatalErrorHandler.java:51) at org.apache.flink.runtime.taskexecutor.TaskExecutorTest.teardown(TaskExecutorTest.java:223) Caused by: java.lang.IllegalStateException: Memory manager has been shut down. at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:480) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:821) at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11331) Fix errors in tableApi.md and functions.md
[ https://issues.apache.org/jira/browse/FLINK-11331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16743708#comment-16743708 ] sunjincheng commented on FLINK-11331: - Fixed in release-1.6: 9b7af8665e08f73e294159771ced711e44ea24b1 > Fix errors in tableApi.md and functions.md > -- > > Key: FLINK-11331 > URL: https://issues.apache.org/jira/browse/FLINK-11331 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.7.0, 1.7.1 >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > Fix For: 1.7.2, 1.8.0 > > Time Spent: 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] intsmaze closed pull request #7490: restoreFunctionState() add parameter getOperatorStateBackend()
intsmaze closed pull request #7490: restoreFunctionState() add parameter getOperatorStateBackend() URL: https://github.com/apache/flink/pull/7490 As this is a foreign pull request (from a fork), the diff has been sent to your commit mailing list, comm...@flink.apache.org This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-11331) Fix errors in tableApi.md and functions.md
[ https://issues.apache.org/jira/browse/FLINK-11331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-11331. --- Resolution: Fixed Fix Version/s: 1.8.0 1.7.2 Fixed in master: ea0f283ffdbb492ea7f2d22e79c57ca7ae6ca181 Fixed in release-1.7: aa728c53281b35aecd37d9a158ad61f1015f7a28 > Fix errors in tableApi.md and functions.md > -- > > Key: FLINK-11331 > URL: https://issues.apache.org/jira/browse/FLINK-11331 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.7.0, 1.7.1 >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > Fix For: 1.7.2, 1.8.0 > > Time Spent: 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] asfgit closed pull request #7494: [FLINK-11331][table][docs] Fix errors in tableApi.md and functions.md
asfgit closed pull request #7494: [FLINK-11331][table][docs] Fix errors in tableApi.md and functions.md URL: https://github.com/apache/flink/pull/7494 As this is a foreign pull request (from a fork), the diff has been sent to your commit mailing list, comm...@flink.apache.org This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] eaglewatcherwb commented on issue #7474: [FLINK-11295][configuration] Rename configuration options of queryable-state from query.x to queryable-state.x
eaglewatcherwb commented on issue #7474: [FLINK-11295][configuration] Rename configuration options of queryable-state from query.x to queryable-state.x URL: https://github.com/apache/flink/pull/7474#issuecomment-454672144 > Ah, I noticed that some tests in `flink-end-to-end-tests` still use the old config options. This wouldn't break, because we still have the deprecated options, but you should update them as well. OK, I will update them throughout the whole project, including the docs and the template as well. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on issue #7494: [FLINK-11331][table][docs] Fix errors in tableApi.md and functions.md
sunjincheng121 commented on issue #7494: [FLINK-11331][table][docs] Fix errors in tableApi.md and functions.md URL: https://github.com/apache/flink/pull/7494#issuecomment-454670415 Merging... This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Asura7969 edited a comment on issue #7500: [FLINK-11341][doc]Correct javadoc of AkkaUtils#getAkkaConfig
Asura7969 edited a comment on issue #7500: [FLINK-11341][doc]Correct javadoc of AkkaUtils#getAkkaConfig URL: https://github.com/apache/flink/pull/7500#issuecomment-454666854 @TisonKun thanks This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Asura7969 opened a new pull request #7500: [FLINK-11341][doc]Correct javadoc of AkkaUtils#getAkkaConfig
Asura7969 opened a new pull request #7500: [FLINK-11341][doc]Correct javadoc of AkkaUtils#getAkkaConfig URL: https://github.com/apache/flink/pull/7500 https://jira.apache.org/jira/browse/FLINK-11341 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Asura7969 removed a comment on issue #7500: [FLINK-11341][doc]Correct javadoc of AkkaUtils#getAkkaConfig
Asura7969 removed a comment on issue #7500: [FLINK-11341][doc]Correct javadoc of AkkaUtils#getAkkaConfig URL: https://github.com/apache/flink/pull/7500#issuecomment-454666894 thanks This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Asura7969 commented on issue #7500: [FLINK-11341][doc]Correct javadoc of AkkaUtils#getAkkaConfig
Asura7969 commented on issue #7500: [FLINK-11341][doc]Correct javadoc of AkkaUtils#getAkkaConfig URL: https://github.com/apache/flink/pull/7500#issuecomment-454666894 thanks This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Asura7969 closed pull request #7500: [FLINK-11341][doc]Correct javadoc of AkkaUtils#getAkkaConfig
Asura7969 closed pull request #7500: [FLINK-11341][doc]Correct javadoc of AkkaUtils#getAkkaConfig URL: https://github.com/apache/flink/pull/7500 As this is a foreign pull request (from a fork), the diff has been sent to your commit mailing list, comm...@flink.apache.org This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11341) Correct javadoc of AkkaUtils#getAkkaConfig
[ https://issues.apache.org/jira/browse/FLINK-11341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gongwenzhou updated FLINK-11341: Summary: Correct javadoc of AkkaUtils#getAkkaConfig (was: Javadoc description does not match the parameter name) > Correct javadoc of AkkaUtils#getAkkaConfig > -- > > Key: FLINK-11341 > URL: https://issues.apache.org/jira/browse/FLINK-11341 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 1.7.1 >Reporter: Gongwenzhou >Priority: Trivial > Labels: pull-request-available > Fix For: 1.7.1 > > Time Spent: 20m > Remaining Estimate: 0h > > Javadoc description does not match the parameter name -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] Asura7969 commented on issue #7500: [FLINK-11341][doc]Correct javadoc of AkkaUtils#getAkkaConfig
Asura7969 commented on issue #7500: [FLINK-11341][doc]Correct javadoc of AkkaUtils#getAkkaConfig URL: https://github.com/apache/flink/pull/7500#issuecomment-454666854 thanks This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] TisonKun edited a comment on issue #7500: [FLINK-11341][doc]Javadoc description does not match the parameter name
TisonKun edited a comment on issue #7500: [FLINK-11341][doc]Javadoc description does not match the parameter name URL: https://github.com/apache/flink/pull/7500#issuecomment-454666377 Besides, you could make the title (also of the corresponding JIRA) more concrete, e.g. "Correct javadoc of AkkaUtils#getAkkaConfig". "Javadoc description does not match the parameter name" is too general IMO. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] TisonKun commented on issue #7500: [FLINK-11341][doc]Javadoc description does not match the parameter name
TisonKun commented on issue #7500: [FLINK-11341][doc]Javadoc description does not match the parameter name URL: https://github.com/apache/flink/pull/7500#issuecomment-454666377 Besides, you could make the title (also of the corresponding JIRA) more concrete, e.g. "Correct javadoc of AkkaUtils.scala". "Javadoc description does not match the parameter name" is too general IMO. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] TisonKun commented on issue #7500: [FLINK-11341][doc]Javadoc description does not match the parameter name
TisonKun commented on issue #7500: [FLINK-11341][doc]Javadoc description does not match the parameter name URL: https://github.com/apache/flink/pull/7500#issuecomment-454666220 @Asura7969 I think @klion26 means the description above, i.e., the GitHub PR description This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Asura7969 commented on issue #7500: [FLINK-11341][doc]Javadoc description does not match the parameter name
Asura7969 commented on issue #7500: [FLINK-11341][doc]Javadoc description does not match the parameter name URL: https://github.com/apache/flink/pull/7500#issuecomment-454665918 here? "Asura7969:master" or "Javadoc description does not match the parameter name" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] klion26 closed pull request #6558: [FLINK-9116] Introduce getAll and removeAll for MapState
klion26 closed pull request #6558: [FLINK-9116] Introduce getAll and removeAll for MapState URL: https://github.com/apache/flink/pull/6558 As this is a foreign pull request (from a fork), the diff has been sent to your commit mailing list, comm...@flink.apache.org This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] klion26 commented on issue #7500: [FLINK-11341][doc]Javadoc description does not match the parameter name
klion26 commented on issue #7500: [FLINK-11341][doc]Javadoc description does not match the parameter name URL: https://github.com/apache/flink/pull/7500#issuecomment-454664407 @Asura7969 you can also update the description :) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on issue #7494: [FLINK-11331][table][docs] Fix errors in tableApi.md and functions.md
hequn8128 commented on issue #7494: [FLINK-11331][table][docs] Fix errors in tableApi.md and functions.md URL: https://github.com/apache/flink/pull/7494#issuecomment-454661373 @sunjincheng121 Thanks for the nice suggestion. I created another [issue](https://issues.apache.org/jira/browse/FLINK-11342) to address the example problem of the built-in udfs. I think it would be very helpful if we had an example for every function. Looking forward to your suggestions. Best, Hequn This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 edited a comment on issue #7494: [FLINK-11331][table][docs] Fix errors in tableApi.md and functions.md
sunjincheng121 edited a comment on issue #7494: [FLINK-11331][table][docs] Fix errors in tableApi.md and functions.md URL: https://github.com/apache/flink/pull/7494#issuecomment-454660718 Thanks for the update! @hequn8128 And thanks for creating FLINK-11342, pretty cool! This PR will be merged. Bests, Jincheng This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Asura7969 commented on issue #7500: [FLINK-11341]Javadoc description does not match the parameter name
Asura7969 commented on issue #7500: [FLINK-11341]Javadoc description does not match the parameter name URL: https://github.com/apache/flink/pull/7500#issuecomment-454661042 Does this meet the requirements? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Asura7969 opened a new pull request #7500: [FLINK-11341]Javadoc description does not match the parameter name
Asura7969 opened a new pull request #7500: [FLINK-11341]Javadoc description does not match the parameter name URL: https://github.com/apache/flink/pull/7500 https://jira.apache.org/jira/browse/FLINK-11341 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on issue #7494: [FLINK-11331][table][docs] Fix errors in tableApi.md and functions.md
sunjincheng121 commented on issue #7494: [FLINK-11331][table][docs] Fix errors in tableApi.md and functions.md URL: https://github.com/apache/flink/pull/7494#issuecomment-454660718 Thanks for the update! @hequn8128 will be merged. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11342) Add example for every built-In TableAPI Function
[ https://issues.apache.org/jira/browse/FLINK-11342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng updated FLINK-11342: Description: There are a lot of built-in tableApi functions: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions.html Take {{TEMPORAL.extract(TIMEINTERVALUNIT)}} as an example, we have an example for it. {code:java} E.g., '2006-06-05'.toDate.extract(DAY) returns 5; '2006-06-05'.toDate.extract(QUARTER) returns 2. {code} The example is very helpful for users who are not familiar with the udf. And I think it would be great if we can add an example for every built-in function. This Jira issue is an umbrella issue. As there are kinds of built-in functions, I think it would be better to create more subtasks for it. For example, add a subtask for adding examples for built-in Comparison Functions. was: There are a lot of built-in tableApi functions: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions.html Take {{TEMPORAL.extract(TIMEINTERVALUNIT)}} as an example, we have an example for it. {code:java} E.g., '2006-06-05'.toDate.extract(DAY) returns 5; '2006-06-05'.toDate.extract(QUARTER) returns 2. {code} The example is very helpful for users who are not familiar with the udf. And I think it would be great if we can add an example for every built-in function. > Add example for every built-In TableAPI Function > - > > Key: FLINK-11342 > URL: https://issues.apache.org/jira/browse/FLINK-11342 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.7.0, 1.7.1 >Reporter: Hequn Cheng >Priority: Major > > There are a lot of built-in tableApi functions: > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions.html > Take {{TEMPORAL.extract(TIMEINTERVALUNIT)}} as an example, we have an example > for it. > {code:java} > E.g., '2006-06-05'.toDate.extract(DAY) returns 5; > '2006-06-05'.toDate.extract(QUARTER) returns 2. > {code} > The example is very helpful for users who are not familiar with the udf. And > I think it would be great if we can add an example for every built-in > function. > This Jira issue is an umbrella issue. As there are kinds of built-in > functions, I think it would be better to create more subtasks for it. For > example, add a subtask for adding examples for built-in Comparison Functions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
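To make the quoted extract example above concrete, the same values can be reproduced with plain java.time — this is only an illustration of what extract(DAY) and extract(QUARTER) compute for '2006-06-05', not the Flink Table API itself:

{code:java}
import java.time.LocalDate;
import java.time.temporal.IsoFields;

public class ExtractExample {
    public static void main(String[] args) {
        LocalDate date = LocalDate.parse("2006-06-05");
        // corresponds to '2006-06-05'.toDate.extract(DAY) -> 5
        System.out.println(date.getDayOfMonth());
        // corresponds to '2006-06-05'.toDate.extract(QUARTER) -> 2
        System.out.println(date.get(IsoFields.QUARTER_OF_YEAR));
    }
}
{code}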
[GitHub] Asura7969 closed pull request #7500: [FLINK-11341]Javadoc description does not match the parameter name
Asura7969 closed pull request #7500: [FLINK-11341]Javadoc description does not match the parameter name URL: https://github.com/apache/flink/pull/7500 As this is a foreign pull request (from a fork), the diff has been sent to your commit mailing list, comm...@flink.apache.org This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-11342) Add example for every built-In TableAPI Function
Hequn Cheng created FLINK-11342: --- Summary: Add example for every built-In TableAPI Function Key: FLINK-11342 URL: https://issues.apache.org/jira/browse/FLINK-11342 Project: Flink Issue Type: Improvement Components: Table API SQL Affects Versions: 1.7.1, 1.7.0 Reporter: Hequn Cheng There are a lot of built-in tableApi functions: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions.html Take {{TEMPORAL.extract(TIMEINTERVALUNIT)}} as an example, we have an example for it. {code:java} E.g., '2006-06-05'.toDate.extract(DAY) returns 5; '2006-06-05'.toDate.extract(QUARTER) returns 2. {code} The example is very helpful for users who are not familiar with the udf. And I think it would be great if we can add an example for every built-in function. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11341) Javadoc description does not match the parameter name
[ https://issues.apache.org/jira/browse/FLINK-11341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gongwenzhou updated FLINK-11341: External issue URL: (was: https://github.com/apache/flink/pull/7500) > Javadoc description does not match the parameter name > - > > Key: FLINK-11341 > URL: https://issues.apache.org/jira/browse/FLINK-11341 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 1.7.1 >Reporter: Gongwenzhou >Priority: Trivial > Fix For: 1.7.1 > > > Javadoc description does not match the parameter name -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11341) Javadoc description does not match the parameter name
[ https://issues.apache.org/jira/browse/FLINK-11341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gongwenzhou updated FLINK-11341: External issue URL: (was: https://github.com/apache/flink/pull/7500) External issue ID: (was: #7500) > Javadoc description does not match the parameter name > - > > Key: FLINK-11341 > URL: https://issues.apache.org/jira/browse/FLINK-11341 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 1.7.1 >Reporter: Gongwenzhou >Priority: Trivial > Labels: pull-request-available > Fix For: 1.7.1 > > > Javadoc description does not match the parameter name -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11341) Javadoc description does not match the parameter name
[ https://issues.apache.org/jira/browse/FLINK-11341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gongwenzhou updated FLINK-11341: Docs Text: (was: https://github.com/apache/flink/pull/7500) External issue URL: https://github.com/apache/flink/pull/7500 External issue ID: #7500 Labels: pull-request-available (was: ) > Javadoc description does not match the parameter name > - > > Key: FLINK-11341 > URL: https://issues.apache.org/jira/browse/FLINK-11341 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 1.7.1 >Reporter: Gongwenzhou >Priority: Trivial > Labels: pull-request-available > Fix For: 1.7.1 > > > Javadoc description does not match the parameter name -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11341) Javadoc description does not match the parameter name
[ https://issues.apache.org/jira/browse/FLINK-11341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gongwenzhou updated FLINK-11341: Docs Text: https://github.com/apache/flink/pull/7500 > Javadoc description does not match the parameter name > - > > Key: FLINK-11341 > URL: https://issues.apache.org/jira/browse/FLINK-11341 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 1.7.1 >Reporter: Gongwenzhou >Priority: Trivial > Fix For: 1.7.1 > > > Javadoc description does not match the parameter name -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11341) Javadoc description does not match the parameter name
[ https://issues.apache.org/jira/browse/FLINK-11341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gongwenzhou updated FLINK-11341: External issue URL: https://github.com/apache/flink/pull/7500 > Javadoc description does not match the parameter name > - > > Key: FLINK-11341 > URL: https://issues.apache.org/jira/browse/FLINK-11341 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 1.7.1 >Reporter: Gongwenzhou >Priority: Trivial > Fix For: 1.7.1 > > > Javadoc description does not match the parameter name -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11341) Javadoc description does not match the parameter name
Gongwenzhou created FLINK-11341: --- Summary: Javadoc description does not match the parameter name Key: FLINK-11341 URL: https://issues.apache.org/jira/browse/FLINK-11341 Project: Flink Issue Type: Improvement Components: Local Runtime Affects Versions: 1.7.1 Reporter: Gongwenzhou Fix For: 1.7.1 Javadoc description does not match the parameter name -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] klion26 commented on issue #7500: Javadoc description does not match the parameter name
klion26 commented on issue #7500: Javadoc description does not match the parameter name URL: https://github.com/apache/flink/pull/7500#issuecomment-454654385 @Asura7969 thank you for your contribution. Please update the title & description, you can find examples [here](https://github.com/apache/flink/pulls), and the [Contributing Code](https://flink.apache.org/contribute-code.html) for reference. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #7494: [FLINK-11331][table][docs] Fix errors in tableApi.md and functions.md
hequn8128 commented on a change in pull request #7494: [FLINK-11331][table][docs] Fix errors in tableApi.md and functions.md URL: https://github.com/apache/flink/pull/7494#discussion_r248153101 ## File path: docs/dev/table/functions.md ## @@ -3843,6 +3877,18 @@ NUMERIC.months + + +{% highlight scala %} +NUMERIC.week +NUMERIC.weeks +{% endhighlight %} + + +Creates an interval of milliseconds for NUMERIC weeks. Review comment: Good point! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on a change in pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible
sunjincheng121 commented on a change in pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible URL: https://github.com/apache/flink/pull/7286#discussion_r248148523 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala ## @@ -553,6 +384,20 @@ class HarnessTestBase extends StreamingWithStateTestBase { stateField.get(generatedAggregation).asInstanceOf[DataView] } + def getGeneratedAggregationFields( + operator: AbstractUdfStreamOperator[_, _], + funcName: String, + funcClass: Class[_]): Array[Field] = { +val function = funcClass.getDeclaredField(funcName) +function.setAccessible(true) +val generatedAggregation = + function.get(operator.getUserFunction).asInstanceOf[GeneratedAggregations] Review comment: Yes, I have found that part. Thanks! :-) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on a change in pull request #7494: [FLINK-11331][table][docs] Fix errors in tableApi.md and functions.md
sunjincheng121 commented on a change in pull request #7494: [FLINK-11331][table][docs] Fix errors in tableApi.md and functions.md URL: https://github.com/apache/flink/pull/7494#discussion_r248148147 ## File path: docs/dev/table/functions.md ## @@ -3843,6 +3877,18 @@ NUMERIC.months + + +{% highlight scala %} +NUMERIC.week +NUMERIC.weeks +{% endhighlight %} + + +Creates an interval of milliseconds for NUMERIC weeks. Review comment: We can add an example, e.g.: 1.weeks returns 604800000 ms. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dianfu commented on a change in pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible
dianfu commented on a change in pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible URL: https://github.com/apache/flink/pull/7286#discussion_r248148152 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala ## @@ -553,6 +384,20 @@ class HarnessTestBase extends StreamingWithStateTestBase { stateField.get(generatedAggregation).asInstanceOf[DataView] } + def getGeneratedAggregationFields( + operator: AbstractUdfStreamOperator[_, _], + funcName: String, + funcClass: Class[_]): Array[Field] = { +val function = funcClass.getDeclaredField(funcName) +function.setAccessible(true) +val generatedAggregation = + function.get(operator.getUserFunction).asInstanceOf[GeneratedAggregations] Review comment: Yes, we should have such test cases, and there are such checks in GroupAggregateHarnessTest.testDistinctAggregateWithRetract and GroupAggregateHarnessTest.testDistinctAggregateWithDifferentArgumentOrder. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dianfu commented on issue #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible
dianfu commented on issue #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible URL: https://github.com/apache/flink/pull/7286#issuecomment-454647436 @sunjincheng121 Thanks a lot for the review. Updated the PR accordingly. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] wujinhu commented on issue #7384: [FLINK-11012] Introduce abstract superclass for filesystem IT cases
wujinhu commented on issue #7384: [FLINK-11012] Introduce abstract superclass for filesystem IT cases URL: https://github.com/apache/flink/pull/7384#issuecomment-454645527 Test results from this PR: 1. Ran tests except flink-s3-fs-presto moudle MyMacBook:flink-filesystems wujinhu$ IT_CASE_S3_BUCKET=...IT_CASE_S3_ACCESS_KEY= mvn clean install -pl '!flink-s3-fs-presto' [INFO] [INFO] Reactor Summary: [INFO] [INFO] flink-filesystems .. SUCCESS [ 3.019 s] [INFO] flink-hadoop-fs SUCCESS [ 10.914 s] [INFO] flink-mapr-fs .. SUCCESS [ 3.593 s] [INFO] flink-filesystems :: flink-fs-hadoop-shaded SUCCESS [ 6.652 s] [INFO] flink-s3-fs-base ... SUCCESS [ 10.916 s] [INFO] flink-s3-fs-hadoop . SUCCESS [03:58 min] [INFO] flink-swift-fs-hadoop .. SUCCESS [ 19.842 s] [INFO] flink-oss-fs-hadoop SUCCESS [ 15.325 s] [INFO] [INFO] BUILD SUCCESS [INFO] [INFO] Total time: 05:09 min [INFO] Finished at: 2019-01-16T12:10:22+08:00 [INFO] Final Memory: 101M/1495M [INFO] 2. Run tests for flink-s3-fs-presto moudle MyMacBook:flink-s3-fs-presto wujinhu$ IT_CASE_S3_BUCKET=...IT_CASE_S3_ACCESS_KEY= mvn clean install [INFO] --- [INFO] T E S T S [INFO] --- [INFO] Running org.apache.flink.fs.s3presto.PrestoS3FileSystemBehaviorITCase [INFO] Running org.apache.flink.fs.s3presto.PrestoS3FileSystemITCase [WARNING] Tests run: 8, Failures: 0, Errors: 0, Skipped: 2, Time elapsed: 11.915 s - in org.apache.flink.fs.s3presto.PrestoS3FileSystemBehaviorITCase [INFO] Tests run: 6, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 41.194 s - in org.apache.flink.fs.s3presto.PrestoS3FileSystemITCase [INFO] [INFO] Results: [INFO] [WARNING] Tests run: 14, Failures: 0, Errors: 0, Skipped: 2 [INFO] [INFO] . [INFO] [INFO] BUILD SUCCESS [INFO] [INFO] Total time: 01:03 min [INFO] Finished at: 2019-01-16T12:11:47+08:00 [INFO] Final Memory: 66M/1512M [INFO] This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dianfu commented on a change in pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible
dianfu commented on a change in pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible URL: https://github.com/apache/flink/pull/7286#discussion_r248146739 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala ## @@ -119,6 +119,39 @@ class AggregateITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) } + @Test + def testDistinctAGGWithDifferentArgumentOrder(): Unit = { Review comment: Makes sense. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Asura7969 opened a new pull request #7500: Javadoc description does not match the parameter name
Asura7969 opened a new pull request #7500: Javadoc description does not match the parameter name URL: https://github.com/apache/flink/pull/7500 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] lamber-ken commented on issue #5930: [FLINK-9263][state] Fix concurrency problem in DefaultOperatorStateBackend.
lamber-ken commented on issue #5930: [FLINK-9263][state] Fix concurrency problem in DefaultOperatorStateBackend. URL: https://github.com/apache/flink/pull/5930#issuecomment-454644307 @sihuazhou :+1: , I met this problem in flink-1.4.2 version This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…
zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead… URL: https://github.com/apache/flink/pull/7255#discussion_r248145667 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ## @@ -746,23 +750,16 @@ public boolean checkInputDependencyConstraints() { } /** -* An input is consumable when -* 1. the source result is PIPELINED and one of the result partition has produced data. -* 2. the source result is BLOCKING and is FINISHED(all partitions are FINISHED). +* Get whether an input of the vertex is consumable. +* An input is consumable when any partition in it is consumable. +* +* Note that a BLOCKING result partition is only consumable when all partitions in the result are FINISHED. * * @return whether the input is consumable */ public boolean isInputConsumable(int inputNumber) { Review comment: Changed it to default scope as it will be invoked by `Execution` and tests. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
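As a side note on the consumability rule quoted in the javadoc above (an input is consumable when any of its partitions is consumable; a BLOCKING partition only once the whole result is FINISHED), the following is an illustrative simplification of that rule — the interface and method names are hypothetical stand-ins, not the actual ExecutionVertex or IntermediateResultPartition code:

```java
import java.util.List;

// Illustrative model of the rule only; ResultPartition and its methods are simplified
// stand-ins for the structures discussed in the review, not Flink's real classes.
class InputConsumabilitySketch {

    interface ResultPartition {
        boolean isBlocking();            // BLOCKING vs. PIPELINED result type
        boolean hasDataProduced();       // some pipelined data has been produced
        boolean allPartitionsFinished(); // the whole BLOCKING result is FINISHED
    }

    /** An input is consumable when any of its partitions is consumable. */
    static boolean isInputConsumable(List<ResultPartition> inputPartitions) {
        return inputPartitions.stream().anyMatch(InputConsumabilitySketch::isConsumable);
    }

    /**
     * A PIPELINED partition is consumable as soon as it has produced some data;
     * a BLOCKING partition is consumable only when all partitions of its result are FINISHED.
     */
    static boolean isConsumable(ResultPartition partition) {
        return partition.isBlocking()
            ? partition.allPartitionsFinished()
            : partition.hasDataProduced();
    }
}
```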
[GitHub] zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…
zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead… URL: https://github.com/apache/flink/pull/7255#discussion_r248145662 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ## @@ -689,7 +693,7 @@ void scheduleOrUpdateConsumers(ResultPartitionID partitionId) { if (partition.getIntermediateResult().getResultType().isPipelined()) { // Schedule or update receivers of this partition - partition.markSomePipelinedDataProduced(); + partition.markDataProduced(); Review comment: Sure, `partition.markDataProduced` is fine for BLOCKING result. I moved it out from the `if` clause. In my understanding, the `ExecutionVertex.scheduleOrUpdateConsumers` is for PIPELINED partition currently. Only PIPELINED partition will send the `scheduleOrUpdateConsumers` message and the method will throw exception if the partition is BLOCKING. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…
zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead… URL: https://github.com/apache/flink/pull/7255#discussion_r248145670 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ## @@ -736,7 +740,7 @@ void sendPartitionInfos() { * @return whether the input constraint is satisfied */ public boolean checkInputDependencyConstraints() { Review comment: Changed it to default scope as it will be invoked by tests. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 edited a comment on issue #7366: [hotfix][docs][table] Fix typo in User-defined Functions
sunjincheng121 edited a comment on issue #7366: [hotfix][docs][table] Fix typo in User-defined Functions URL: https://github.com/apache/flink/pull/7366#issuecomment-454643541 Thanks @KarmaGYZ and @hequn8128, `udfs.md` also has such a mistake. And I recommend searching globally for associated docs in the IDE. +1 for @hequn8128's suggestion. Thanks, Jincheng This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on issue #7366: [hotfix][docs][table] Fix typo in User-defined Functions
sunjincheng121 commented on issue #7366: [hotfix][docs][table] Fix typo in User-defined Functions URL: https://github.com/apache/flink/pull/7366#issuecomment-454643541 Thanks @KarmaGYZ and @hequn8128, `udfs.md` also has such a mistake. And I recommend searching globally for associated docs in the IDE. +1 for @hequn8128's suggestion. Thanks, Jincheng This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] intsmaze commented on issue #7490: restoreFunctionState() add parameter getOperatorStateBackend()
intsmaze commented on issue #7490: restoreFunctionState() add parameter getOperatorStateBackend() URL: https://github.com/apache/flink/pull/7490#issuecomment-454635907 > > > -1 > > I would like to avoid these kind of standalone refactorings, unless the change is under some context which makes it necessary. > > Moreover, I think the `snapshotFunctionState` has the `OperatorStateBackend` parameter because the snapshot context doesn't provide access to the state stores. OTOH, the `StateInitializationContext` does provide access to the state stores already. Thank you for your suggestion, I will close the comment later. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] intsmaze removed a comment on issue #7490: restoreFunctionState() add parameter getOperatorStateBackend()
intsmaze removed a comment on issue #7490: restoreFunctionState() add parameter getOperatorStateBackend() URL: https://github.com/apache/flink/pull/7490#issuecomment-454635813 Thank you for your suggestion, I will close the comment later. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] intsmaze commented on issue #7490: restoreFunctionState() add parameter getOperatorStateBackend()
intsmaze commented on issue #7490: restoreFunctionState() add parameter getOperatorStateBackend() URL: https://github.com/apache/flink/pull/7490#issuecomment-454635813 Thank you for your suggestion, I will close the comment later. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] wujinhu edited a comment on issue #7384: [FLINK-11012] Introduce abstract superclass for filesystem IT cases
wujinhu edited a comment on issue #7384: [FLINK-11012] Introduce abstract superclass for filesystem IT cases URL: https://github.com/apache/flink/pull/7384#issuecomment-454632574 @StefanRRichter I found an issue: when I run mvn clean install with s3 credentials, the build fails.
[INFO] force-shading .. SUCCESS [ 1.742 s]
[INFO] flink .. SUCCESS [ 8.802 s]
[INFO] flink-annotations .. SUCCESS [ 1.359 s]
[INFO] flink-shaded-hadoop SUCCESS [ 0.173 s]
[INFO] flink-shaded-hadoop2 ... SUCCESS [ 12.045 s]
[INFO] flink-shaded-hadoop2-uber .. SUCCESS [ 17.859 s]
[INFO] flink-shaded-yarn-tests SUCCESS [ 12.220 s]
[INFO] flink-shaded-curator ... SUCCESS [ 0.812 s]
[INFO] flink-metrics .. SUCCESS [ 0.108 s]
[INFO] flink-metrics-core . SUCCESS [ 2.664 s]
[INFO] flink-test-utils-parent SUCCESS [ 0.124 s]
[INFO] flink-test-utils-junit . SUCCESS [ 1.834 s]
[INFO] flink-core . SUCCESS [01:15 min]
[INFO] flink-java . SUCCESS [ 29.085 s]
[INFO] flink-queryable-state .. SUCCESS [ 0.105 s]
[INFO] flink-queryable-state-client-java .. SUCCESS [ 2.713 s]
[INFO] flink-filesystems .. SUCCESS [ 0.125 s]
[INFO] flink-hadoop-fs SUCCESS [ 11.096 s]
[INFO] flink-runtime .. SUCCESS [12:00 min]
[INFO] flink-scala SUCCESS [01:09 min]
[INFO] flink-mapr-fs .. SUCCESS [ 3.911 s]
[INFO] flink-filesystems :: flink-fs-hadoop-shaded SUCCESS [ 5.436 s]
[INFO] flink-s3-fs-base ... SUCCESS [ 10.906 s]
[INFO] flink-s3-fs-hadoop . SUCCESS [04:04 min]
[INFO] flink-s3-fs-presto . FAILURE [ 22.152 s]
[INFO] flink-swift-fs-hadoop .. SKIPPED
Error message:
[ERROR] Errors:
[ERROR] PrestoS3FileSystemBehaviorITCase>FileSystemBehaviorTestSuite.cleanup:81 » NoSuchMethod
[ERROR] PrestoS3FileSystemBehaviorITCase>FileSystemBehaviorTestSuite.cleanup:81 » NoSuchMethod
[ERROR] PrestoS3FileSystemBehaviorITCase>FileSystemBehaviorTestSuite.cleanup:81 » NoSuchMethod
[ERROR] org.apache.flink.fs.s3presto.PrestoS3FileSystemBehaviorITCase.testMkdirsFailsForExistingFile(org.apache.flink.fs.s3presto.PrestoS3FileSystemBehaviorITCase)
[ERROR] Run 1: PrestoS3FileSystemBehaviorITCase>FileSystemBehaviorTestSuite.testMkdirsFailsForExistingFile:154->FileSystemBehaviorTestSuite.assumeNotObjectStore:207 » AssumptionViolated
[ERROR] Run 2: PrestoS3FileSystemBehaviorITCase>FileSystemBehaviorTestSuite.cleanup:81 » NoSuchMethod
[INFO]
[ERROR] org.apache.flink.fs.s3presto.PrestoS3FileSystemBehaviorITCase.testMkdirsFailsWithExistingParentFile(org.apache.flink.fs.s3presto.PrestoS3FileSystemBehaviorITCase)
[ERROR] Run 1: PrestoS3FileSystemBehaviorITCase>FileSystemBehaviorTestSuite.testMkdirsFailsWithExistingParentFile:172->FileSystemBehaviorTestSuite.assumeNotObjectStore:207 » AssumptionViolated
[ERROR] Run 2: PrestoS3FileSystemBehaviorITCase>FileSystemBehaviorTestSuite.cleanup:81 » NoSuchMethod
[INFO]
[ERROR] org.apache.flink.fs.s3presto.PrestoS3FileSystemBehaviorITCase.testMkdirsReturnsTrueForExistingDirectory(org.apache.flink.fs.s3presto.PrestoS3FileSystemBehaviorITCase)
[ERROR] Run 1: PrestoS3FileSystemBehaviorITCase>FileSystemBehaviorTestSuite.testMkdirsReturnsTrueForExistingDirectory:145->FileSystemBehaviorTestSuite.createRandomFileInDirectory:203->FileSystemBehaviorTestSuite.createFile:196 » NoSuchMethod
[ERROR] Run 2: PrestoS3FileSystemBehaviorITCase>FileSystemBehaviorTestSuite.cleanup:81 » NoSuchMethod
[INFO]
[ERROR] PrestoS3FileSystemBehaviorITCase>FileSystemBehaviorTestSuite.cleanup:81 » NoSuchMethod
[ERROR] PrestoS3FileSystemBehaviorITCase>FileSystemBehaviorTestSuite.cleanup:81 » NoSuchMethod
[ERROR] PrestoS3FileSystemITCase.testConfigKeysForwarding:103 » NoSuchMethod com.googl...
[ERROR] PrestoS3FileSystemITCase.testConfigKeysForwarding:103 » NoSuchMethod com.googl...
[ERROR] PrestoS3FileSystemITCase.testDirectoryListing:191 » NoSuchMethod com.google.co...
[ERROR] PrestoS3FileSystemITCase.testDirectoryListing:191 » NoSuchMethod com.google.co...
[ERROR] PrestoS3FileSystemITCase.testSimpleFileWriteAndRead:171 » NoSuchMethod
[GitHub] sunjincheng121 commented on a change in pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible
sunjincheng121 commented on a change in pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible URL: https://github.com/apache/flink/pull/7286#discussion_r248126964 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala ## @@ -119,6 +119,39 @@ class AggregateITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) } + @Test + def testDistinctAGGWithDifferentArgumentOrder(): Unit = { Review comment: Can we remove this ITCase and instead extend `testDistinctAggregate` to also cover the different argument order? That way we can reduce the IT test time. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on a change in pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible
sunjincheng121 commented on a change in pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible URL: https://github.com/apache/flink/pull/7286#discussion_r247729153 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala ## @@ -88,9 +93,9 @@ class SqlITCase extends StreamingWithStateTestBase { env.execute() val expected = Seq( - "Hello World,1,9,1970-01-01 00:00:00.014", // window starts at [9L] till {14L} - "Hello,1,16,1970-01-01 00:00:00.021", // window starts at [16L] till {21L}, not merged - "Hello,3,6,1970-01-01 00:00:00.015"// window starts at [1L,2L], + "Hello World,1,9,1,1,1970-01-01 00:00:00.014", // window starts at [9L] till {14L} + "Hello,1,16,1,1,1970-01-01 00:00:00.021", // window starts at [16L] till {21L}, not merged Review comment: File line length exceeds 100 characters. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on a change in pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible
sunjincheng121 commented on a change in pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible URL: https://github.com/apache/flink/pull/7286#discussion_r248129709 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala ## @@ -553,6 +384,20 @@ class HarnessTestBase extends StreamingWithStateTestBase { stateField.get(generatedAggregation).asInstanceOf[DataView] } + def getGeneratedAggregationFields( + operator: AbstractUdfStreamOperator[_, _], + funcName: String, + funcClass: Class[_]): Array[Field] = { +val function = funcClass.getDeclaredField(funcName) +function.setAccessible(true) +val generatedAggregation = + function.get(operator.getUserFunction).asInstanceOf[GeneratedAggregations] Review comment: Can we add a test case that checks whether the member variables of the generated `GeneratedAggregations` meet the expectations? For example, check the number of generated variables, or that the `NonWindowedAggregationHelper` contains a field named `acc[Num]_distinctValueMap_dataview`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
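The review above asks for a reflection-based check on the fields of the code-generated aggregations class. A minimal sketch of what such an assertion helper could look like follows; the class name, helper names, and the example field name are illustrative assumptions, not code from this PR:
```java
import java.lang.reflect.Field;
import java.util.Arrays;

// Hypothetical helper for asserting properties of a code-generated aggregations instance.
public class GeneratedAggregationsFieldCheck {

    // Returns true if the generated class declares a field with the given name,
    // e.g. "acc0_distinctValueMap_dataview".
    static boolean hasGeneratedField(Object generatedAggregations, String expectedFieldName) {
        return Arrays.stream(generatedAggregations.getClass().getDeclaredFields())
                .map(Field::getName)
                .anyMatch(expectedFieldName::equals);
    }

    // Returns the number of declared fields, which a test could compare against an expected count.
    static int generatedFieldCount(Object generatedAggregations) {
        return generatedAggregations.getClass().getDeclaredFields().length;
    }
}
```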
[GitHub] sunjincheng121 commented on a change in pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible
sunjincheng121 commented on a change in pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible URL: https://github.com/apache/flink/pull/7286#discussion_r248131099 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala ## @@ -651,7 +651,7 @@ class MatchCodeGenerator( class AggBuilder(variable: String) { -private val aggregates = new mutable.ListBuffer[RexCall]() +private val aggregates = new mutable.ListBuffer[RexCall]() Review comment: code format: remove extra spaces. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on a change in pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible
sunjincheng121 commented on a change in pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible URL: https://github.com/apache/flink/pull/7286#discussion_r248130185 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ## @@ -1308,6 +1335,7 @@ object AggregateUtil { * * @param aggregateCalls calcite's aggregate function * @param aggregateInputType input type of given aggregates +* @param inputFieldsCount number of input fields, Review comment: remove the `,`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] EronWright commented on issue #7389: [FLINK-11237] [table] Enhance LocalExecutor to wrap TableEnvironment w/ classloader
EronWright commented on issue #7389: [FLINK-11237] [table] Enhance LocalExecutor to wrap TableEnvironment w/ classloader URL: https://github.com/apache/flink/pull/7389#issuecomment-454610732 @twalthr any reservation about merging this? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] EronWright commented on issue #7390: [FLINK-11240] [table] External Catalog Factory and Descriptor
EronWright commented on issue #7390: [FLINK-11240] [table] External Catalog Factory and Descriptor URL: https://github.com/apache/flink/pull/7390#issuecomment-454610562 @twalthr any reservation about merging this? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Fokko edited a comment on issue #7406: [FLINK-11259] Bump Zookeeper to 3.4.13
Fokko edited a comment on issue #7406: [FLINK-11259] Bump Zookeeper to 3.4.13 URL: https://github.com/apache/flink/pull/7406#issuecomment-454593135 `Kafka09SecuredRunITCase` is failing. Let's see which minor patch of Zookeeper fails the build. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11339) Bump exec-maven-plugin from 1.5.0 to 1.6.0
[ https://issues.apache.org/jira/browse/FLINK-11339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11339: --- Labels: pull-request-available (was: ) > Bump exec-maven-plugin from 1.5.0 to 1.6.0 > --- > > Key: FLINK-11339 > URL: https://issues.apache.org/jira/browse/FLINK-11339 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Fokko Driesprong >Assignee: Fokko Driesprong >Priority: Major > Labels: pull-request-available > > Bump exec-maven-plugin from 1.5.0 to 1.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11338) Bump maven-enforcer-plugin from 3.0.0-M1 to 3.0.0-M2
[ https://issues.apache.org/jira/browse/FLINK-11338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11338: --- Labels: pull-request-available (was: ) > Bump maven-enforcer-plugin from 3.0.0-M1 to 3.0.0-M2 > > > Key: FLINK-11338 > URL: https://issues.apache.org/jira/browse/FLINK-11338 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Fokko Driesprong >Assignee: Fokko Driesprong >Priority: Major > Labels: pull-request-available > > Bump maven-enforcer-plugin from 3.0.0-M1 to 3.0.0-M2 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11340) Bump commons-configuration from 1.7 to 1.10
[ https://issues.apache.org/jira/browse/FLINK-11340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11340: --- Labels: pull-request-available (was: ) > Bump commons-configuration from 1.7 to 1.10 > --- > > Key: FLINK-11340 > URL: https://issues.apache.org/jira/browse/FLINK-11340 > Project: Flink > Issue Type: Improvement > Components: Configuration, Core >Reporter: Fokko Driesprong >Assignee: Fokko Driesprong >Priority: Major > Labels: pull-request-available > > Bump commons-configuration from 1.7 to 1.10 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] Fokko opened a new pull request #7499: [FLINK-11340] Bump commons-configuration from 1.7 to 1.10
Fokko opened a new pull request #7499: [FLINK-11340] Bump commons-configuration from 1.7 to 1.10 URL: https://github.com/apache/flink/pull/7499 https://jira.apache.org/jira/browse/FLINK-11340 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-11340) Bump commons-configuration from 1.7 to 1.10
Fokko Driesprong created FLINK-11340: Summary: Bump commons-configuration from 1.7 to 1.10 Key: FLINK-11340 URL: https://issues.apache.org/jira/browse/FLINK-11340 Project: Flink Issue Type: Improvement Components: Configuration, Core Reporter: Fokko Driesprong Assignee: Fokko Driesprong Bump commons-configuration from 1.7 to 1.10 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] Fokko opened a new pull request #7498: [FLINK-11339] Bump exec-maven-plugin from 1.5.0 to 1.6.0
Fokko opened a new pull request #7498: [FLINK-11339] Bump exec-maven-plugin from 1.5.0 to 1.6.0 URL: https://github.com/apache/flink/pull/7498 https://jira.apache.org/jira/browse/FLINK-11339 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-11339) Bump exec-maven-plugin from 1.5.0 to 1.6.0
Fokko Driesprong created FLINK-11339: Summary: Bump exec-maven-plugin from 1.5.0 to 1.6.0 Key: FLINK-11339 URL: https://issues.apache.org/jira/browse/FLINK-11339 Project: Flink Issue Type: Improvement Components: Build System Reporter: Fokko Driesprong Assignee: Fokko Driesprong Bump exec-maven-plugin from 1.5.0 to 1.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] Fokko opened a new pull request #7497: [FLINK-11338] Bump maven-enforcer-plugin from 3.0.0-M1 to 3.0.0-M2
Fokko opened a new pull request #7497: [FLINK-11338] Bump maven-enforcer-plugin from 3.0.0-M1 to 3.0.0-M2 URL: https://github.com/apache/flink/pull/7497 https://jira.apache.org/jira/browse/FLINK-11338 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-11338) Bump maven-enforcer-plugin from 3.0.0-M1 to 3.0.0-M2
Fokko Driesprong created FLINK-11338: Summary: Bump maven-enforcer-plugin from 3.0.0-M1 to 3.0.0-M2 Key: FLINK-11338 URL: https://issues.apache.org/jira/browse/FLINK-11338 Project: Flink Issue Type: Improvement Components: Build System Reporter: Fokko Driesprong Assignee: Fokko Driesprong Bump maven-enforcer-plugin from 3.0.0-M1 to 3.0.0-M2 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] bmeriaux commented on issue #6715: [FLINK-8424] update cassandra and driver version to latest
bmeriaux commented on issue #6715: [FLINK-8424] update cassandra and driver version to latest URL: https://github.com/apache/flink/pull/6715#issuecomment-454577593 Yes, it seems to be caused by the async save: we close the sink before all of the requests have finished. The solution seems to be to do it like CassandraSinkBase does, using a semaphore to track the number of pending async requests and flushing them on close. So there is some rework needed on all the *OutputFormat classes; I will not be able to get to this for a few weeks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
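For context, a minimal sketch of the semaphore-based pattern referred to above (tracking pending async requests and waiting for them on close), assuming a CompletableFuture-style async write; the class and method names are illustrative and not the actual CassandraSinkBase or *OutputFormat code:
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

// Illustrative tracker for in-flight asynchronous writes.
public class AsyncWriteTracker {
    private final int maxInFlight;
    private final Semaphore permits;

    public AsyncWriteTracker(int maxInFlight) {
        this.maxInFlight = maxInFlight;
        this.permits = new Semaphore(maxInFlight);
    }

    // Acquire a permit before issuing an async write; release it when the write completes.
    public void track(CompletableFuture<?> asyncWrite) throws InterruptedException {
        permits.acquire();
        asyncWrite.whenComplete((result, error) -> permits.release());
    }

    // Called from close(): blocks until every in-flight write has released its permit.
    public void flush() throws InterruptedException {
        permits.acquire(maxInFlight);
        permits.release(maxInFlight);
    }
}
```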
[GitHub] TisonKun commented on issue #7486: [FLINK-11316] [tests] Drop JarFileCreator
TisonKun commented on issue #7486: [FLINK-11316] [tests] Drop JarFileCreator URL: https://github.com/apache/flink/pull/7486#issuecomment-454547534 Thanks @GJL for being an assignee. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Fokko commented on issue #7487: [FLINK-11321] Clarify NPE on fetching nonexistent topic
Fokko commented on issue #7487: [FLINK-11321] Clarify NPE on fetching nonexistent topic URL: https://github.com/apache/flink/pull/7487#issuecomment-454530755 Green on my side: https://travis-ci.org/Fokko/flink/builds/479971865 :-) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] addisonj commented on issue #7460: [FLINK-11187] [s3] Use file over stream for writes
addisonj commented on issue #7460: [FLINK-11187] [s3] Use file over stream for writes URL: https://github.com/apache/flink/pull/7460#issuecomment-454491319 @aljoscha addressed changes, let me know if there is anything else! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #7451: [FLINK-11270][build] Do not include hadoop in flink-dist by default
tillrohrmann commented on a change in pull request #7451: [FLINK-11270][build] Do not include hadoop in flink-dist by default URL: https://github.com/apache/flink/pull/7451#discussion_r247995703 ## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ## @@ -508,18 +517,40 @@ private static void start(YarnConfiguration conf, String principal, String keyta } System.setProperty("user.home", homeDir.getAbsolutePath()); String uberjarStartLoc = ".."; + + // find flink-dist jar in flink-dist module LOG.info("Trying to locate uberjar in {}", new File(uberjarStartLoc)); - flinkUberjar = findFile(uberjarStartLoc, new RootDirFilenameFilter()); - Assert.assertNotNull("Flink uberjar not found", flinkUberjar); - String flinkDistRootDir = flinkUberjar.getParentFile().getParent(); - flinkLibFolder = flinkUberjar.getParentFile(); // the uberjar is located in lib/ - Assert.assertNotNull("Flink flinkLibFolder not found", flinkLibFolder); + final File originalFlinkDistJar = findFile(uberjarStartLoc, new RootDirFilenameFilter()); + + // copy entirety of distribution into a temporary location + final Path originalFlinkDistRootDir = originalFlinkDistJar.getParentFile().getParentFile().toPath(); + System.out.println("dist=" + originalFlinkDistRootDir); + final Path flinkDistRootDir = tmp.newFolder("tmp_dist_directory").toPath(); + + FileUtils.copyDirectory(originalFlinkDistRootDir.toFile(), flinkDistRootDir.toFile()); + + flinkLibFolder = flinkDistRootDir.resolve("lib").toFile(); Assert.assertTrue("lib folder not found", flinkLibFolder.exists()); Assert.assertTrue("lib folder not found", flinkLibFolder.isDirectory()); + try (Stream libJars = Files.list(flinkLibFolder.toPath())) { + final Optional flinkDistJarOptional = + libJars.map(Path::getFileName) + .map(Path::toString) + .filter(RootDirFilenameFilter::isFlinkDistJar) + .map(fileName -> flinkLibFolder.toPath().resolve(Paths.get(fileName))) + .map(Path::toFile) + .findAny(); + flinkUberjar = flinkDistJarOptional.orElseThrow(() -> new AssertionError("Unable to locate flink-dist jar.")); + } - if (!flinkUberjar.exists()) { - Assert.fail("Unable to locate yarn-uberjar.jar"); + // copy flink-shaded-hadoop2 into dist, since it is not included by default + // the hadoop jar was copied into the dependencies directory during the build using the maven-dependency-plugin + final Path relHadoopPath; + try (Stream dependencyJars = Files.list(Paths.get("target/dependencies"))) { + relHadoopPath = dependencyJars.filter(jar -> jar.getFileName().toString().startsWith("flink-shaded-hadoop2")) + .findAny().orElseThrow(() -> new AssertionError("Unable to locate flink-shaded-hadoop2 jar.")); } + Files.copy(relHadoopPath, flinkLibFolder.toPath().resolve("flink-shaded-hadoop2.jar")); Review comment: Do we strictly need to copy everything together? Wouldn't it also work to refactor the `YarnTestBase` a bit so that we have a method `getShipFiles` which returns all files in `/lib` plus the `flink-shaded-hadoop2.jar`? That way we would save a lot of copy operations. `flink-dist` is currently 347 MB large. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
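As a rough illustration of the suggestion above, a `getShipFiles`-style method could list the files to ship instead of copying the whole distribution: everything under `lib/` plus the separately built flink-shaded-hadoop2 jar. This is a hedged sketch, not the actual YarnTestBase code; the paths and names are assumptions:
```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class ShipFilesSketch {

    // Collects the jars to ship to YARN: the contents of lib/ plus the hadoop jar,
    // avoiding a full copy of the (large) flink-dist directory.
    static List<File> getShipFiles(File flinkLibFolder, Path flinkShadedHadoopJar) throws IOException {
        List<File> shipFiles = new ArrayList<>();
        try (Stream<Path> libJars = Files.list(flinkLibFolder.toPath())) {
            libJars.map(Path::toFile).forEach(shipFiles::add);
        }
        // Not part of lib/ by default, so it is added explicitly.
        shipFiles.add(flinkShadedHadoopJar.toFile());
        return shipFiles;
    }
}
```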
[GitHub] tillrohrmann commented on a change in pull request #7451: [FLINK-11270][build] Do not include hadoop in flink-dist by default
tillrohrmann commented on a change in pull request #7451: [FLINK-11270][build] Do not include hadoop in flink-dist by default URL: https://github.com/apache/flink/pull/7451#discussion_r247993825 ## File path: tools/releasing/create_binary_release.sh ## @@ -96,14 +96,14 @@ make_binary_release() { } if [ "$SCALA_VERSION" == "none" ] && [ "$HADOOP_VERSION" == "none" ]; then - make_binary_release "" "-DwithoutHadoop" "2.12" - make_binary_release "" "-DwithoutHadoop" "2.11" + make_binary_release "" "" "2.12" + make_binary_release "" "" "2.11" elif [ "$SCALA_VERSION" == none ] && [ "$HADOOP_VERSION" != "none" ] then - make_binary_release "hadoop2" "-Dhadoop.version=$HADOOP_VERSION" "2.11" + make_binary_release "hadoop2" "-Pinclude-hadoop -Dhadoop.version=$HADOOP_VERSION" "2.11" elif [ "$SCALA_VERSION" != none ] && [ "$HADOOP_VERSION" == "none" ] then - make_binary_release "" "-DwithoutHadoop" "$SCALA_VERSION" + make_binary_release "" "" "$SCALA_VERSION" else - make_binary_release "hadoop2x" "-Dhadoop.version=$HADOOP_VERSION" "$SCALA_VERSION" + make_binary_release "hadoop2x" "-Pinclude-hadoop -Dhadoop.version=$HADOOP_VERSION" "$SCALA_VERSION" Review comment: Does it make a difference whether it is `-Dinclude-hadoop` or `-Pinclude-hadoop`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247968937 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ## @@ -3902,6 +3906,84 @@ public void testParallelAsyncSnapshots() throws Exception { } } + @Test + public void testNonConcurrentSnapshotTransformerAccess() throws Exception { + Random rnd = new Random(); + BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024); + AbstractKeyedStateBackend backend = null; + try { + backend = createKeyedBackend(IntSerializer.INSTANCE); + InternalValueState valueState = backend.createInternalState( + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>("test", StringSerializer.INSTANCE), + createSingleThreadAccessCheckingStateSnapshotTransformFactory()); + + valueState.setCurrentNamespace(VoidNamespace.INSTANCE); + + for (int i = 0; i < 100; i++) { + backend.setCurrentKey(i); + valueState.update(StringUtils.getRandomString(rnd,5, 10)); + } + + CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation(); + + RunnableFuture> snapshot1 = + backend.snapshot(1L, 0L, streamFactory, checkpointOptions); + + RunnableFuture> snapshot2 = + backend.snapshot(2L, 0L, streamFactory, checkpointOptions); + + Thread runner1 = new Thread(snapshot1, "snapshot1"); + runner1.start(); + Thread runner2 = new Thread(snapshot2, "snapshot2"); + runner2.start(); + + runner1.join(); + runner2.join(); + + snapshot1.get(); + snapshot2.get(); + } finally { + if (backend != null) { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + } + } + + private static StateSnapshotTransformFactory + createSingleThreadAccessCheckingStateSnapshotTransformFactory() { + return new StateSnapshotTransformFactory() { + @Override + public Optional> createForDeserializedState() { + return createStateSnapshotTransformer(); + } + + @Override + public Optional> createForSerializedState() { + return createStateSnapshotTransformer(); + } + + private Optional> createStateSnapshotTransformer() { + return Optional.of(new StateSnapshotTransformer() { + private Thread currentThread = null; + + @Nullable + @Override + public T1 filterOrTransform(@Nullable T1 value) { + if (currentThread == null) { + currentThread = Thread.currentThread(); + } else { + assertEquals("Concurrent access from another thread", + currentThread, Thread.currentThread()); + } Review comment: The current implementation assumes that there is only a single element to transform, right? Otherwise it should fail with the second element. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247980006 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ## @@ -3902,6 +3906,84 @@ public void testParallelAsyncSnapshots() throws Exception { } } + @Test + public void testNonConcurrentSnapshotTransformerAccess() throws Exception { + Random rnd = new Random(); + BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024); + AbstractKeyedStateBackend backend = null; + try { + backend = createKeyedBackend(IntSerializer.INSTANCE); + InternalValueState valueState = backend.createInternalState( + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>("test", StringSerializer.INSTANCE), Review comment: Would be good to not only test with the `ValueStateDescriptor`. I think there might be a bug with the RocksDB list and map transformer. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247967726 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotTransformers.java ## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory; + +import javax.annotation.Nullable; Review comment: this import should be separated according to Flink's import guidelines: https://flink.apache.org/contribute-code.html#code-style This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247977733 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory; + +import java.util.Optional; + +abstract class RocksDBSnapshotTransformFactoryAdaptor implements StateSnapshotTransformFactory { + @Override + public Optional> createForDeserializedState() { + throw new UnsupportedOperationException("Only serialized state filtering is supported in RocksDB backend"); + } + + @SuppressWarnings("unchecked") + static StateSnapshotTransformFactory wrapStateSnapshotTransformerFactory( + StateDescriptor stateDesc, + StateSnapshotTransformFactory snapshotTransformFactory) { + if (stateDesc instanceof ListStateDescriptor) { + Optional> original = snapshotTransformFactory.createForDeserializedState(); + return new RocksDBSnapshotTransformFactoryAdaptor() { + @Override + public Optional> createForSerializedState() { + return original.map(est -> (StateSnapshotTransformer) createRocksDBListStateTransformer(stateDesc, est)); + } + }; + } else if (stateDesc instanceof MapStateDescriptor) { + Optional> original = snapshotTransformFactory.createForSerializedState(); + return new RocksDBSnapshotTransformFactoryAdaptor() { + @Override + public Optional> createForSerializedState() { + return original.map(RocksDBMapState.StateSnapshotTransformerWrapper::new); + } + }; + } else { + return new RocksDBSnapshotTransformFactoryAdaptor() { + @Override + public Optional> createForSerializedState() { + return snapshotTransformFactory.createForSerializedState(); + } + }; + } + } + + @SuppressWarnings("unchecked") + private static StateSnapshotTransformer createRocksDBListStateTransformer( + StateDescriptor stateDesc, + StateSnapshotTransformer elementTransformer) { + return (StateSnapshotTransformer) new RocksDBListState.StateSnapshotTransformerWrapper<>( + elementTransformer, ((ListStateDescriptor) stateDesc).getElementSerializer()); + } +} Review comment: I think there is no need for all this type casting: ``` return new RocksDBSnapshotTransformFactoryAdaptor() { @Override public Optional> createForSerializedState() { return 
original.map(est -> createRocksDBListStateTransformer(((ListStateDescriptor) stateDesc), est)); } private StateSnapshotTransformer createRocksDBListStateTransformer( ListStateDescriptor stateDesc, StateSnapshotTransformer elementTransformer) { return new RocksDBListState.StateSnapshotTransformerWrapper<>( elementTransformer, stateDesc.getElementSerializer()); } }; ``` This also includes changes to the anonymous class.
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247979598 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory; + +import java.util.Optional; + +abstract class RocksDBSnapshotTransformFactoryAdaptor implements StateSnapshotTransformFactory { + @Override + public Optional> createForDeserializedState() { + throw new UnsupportedOperationException("Only serialized state filtering is supported in RocksDB backend"); + } + + @SuppressWarnings("unchecked") + static StateSnapshotTransformFactory wrapStateSnapshotTransformerFactory( + StateDescriptor stateDesc, + StateSnapshotTransformFactory snapshotTransformFactory) { + if (stateDesc instanceof ListStateDescriptor) { + Optional> original = snapshotTransformFactory.createForDeserializedState(); Review comment: Is it safe to share this instance of the `StateSnapshotTransformer` in the `RocksDBSnapshotTransformFactoryAdaptor`? I think this can lead to the same problem we are trying to fix here. Maybe adding a test for this would be good if I'm not wrong here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247968390 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotTransformers.java ## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import static org.apache.flink.runtime.state.StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy.STOP_ON_FIRST_INCLUDED; + +/** Collection of common state snapshot transformers and their factories. */ +public class StateSnapshotTransformers { Review comment: Why did you group the implementations under `StateSnapshotTransformers`? They could also simply live in their own package, right? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247969937 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ## @@ -3902,6 +3906,84 @@ public void testParallelAsyncSnapshots() throws Exception { } } + @Test + public void testNonConcurrentSnapshotTransformerAccess() throws Exception { + Random rnd = new Random(); + BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024); + AbstractKeyedStateBackend backend = null; + try { + backend = createKeyedBackend(IntSerializer.INSTANCE); + InternalValueState valueState = backend.createInternalState( + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>("test", StringSerializer.INSTANCE), + createSingleThreadAccessCheckingStateSnapshotTransformFactory()); + + valueState.setCurrentNamespace(VoidNamespace.INSTANCE); + + for (int i = 0; i < 100; i++) { + backend.setCurrentKey(i); + valueState.update(StringUtils.getRandomString(rnd,5, 10)); Review comment: whitespace missing This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247966443 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ## @@ -3902,6 +3906,84 @@ public void testParallelAsyncSnapshots() throws Exception { } } + @Test + public void testNonConcurrentSnapshotTransformerAccess() throws Exception { + Random rnd = new Random(); + BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024); + AbstractKeyedStateBackend backend = null; + try { + backend = createKeyedBackend(IntSerializer.INSTANCE); + InternalValueState valueState = backend.createInternalState( + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>("test", StringSerializer.INSTANCE), + createSingleThreadAccessCheckingStateSnapshotTransformFactory()); + + valueState.setCurrentNamespace(VoidNamespace.INSTANCE); + + for (int i = 0; i < 100; i++) { + backend.setCurrentKey(i); + valueState.update(StringUtils.getRandomString(rnd,5, 10)); + } + + CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation(); + + RunnableFuture> snapshot1 = + backend.snapshot(1L, 0L, streamFactory, checkpointOptions); + + RunnableFuture> snapshot2 = + backend.snapshot(2L, 0L, streamFactory, checkpointOptions); + + Thread runner1 = new Thread(snapshot1, "snapshot1"); + runner1.start(); + Thread runner2 = new Thread(snapshot2, "snapshot2"); + runner2.start(); + + runner1.join(); + runner2.join(); + + snapshot1.get(); + snapshot2.get(); + } finally { + if (backend != null) { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + } + } + + private static StateSnapshotTransformFactory + createSingleThreadAccessCheckingStateSnapshotTransformFactory() { Review comment: This is only my personal taste, but I like it more if the return type and the function name is on the same line. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247960807 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java ## @@ -145,13 +146,13 @@ private RegisteredKeyValueStateBackendMetaInfo( return stateSerializerProvider.previousSchemaSerializer(); } - @Nullable - public StateSnapshotTransformer getSnapshotTransformer() { - return snapshotTransformer; + @Nonnull + public StateSnapshotTransformFactory getStateSnapshotTransformFactory() { + return stateSnapshotTransformFactory; } - public void updateSnapshotTransformer(StateSnapshotTransformer snapshotTransformer) { - this.snapshotTransformer = snapshotTransformer; + public void updateSnapshotTransformerFactory(StateSnapshotTransformFactory stateSnapshotTransformFactory) { Review comment: `er` not needed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247973376 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory; + +import java.util.Optional; + +abstract class RocksDBSnapshotTransformFactoryAdaptor implements StateSnapshotTransformFactory { + @Override + public Optional> createForDeserializedState() { + throw new UnsupportedOperationException("Only serialized state filtering is supported in RocksDB backend"); + } + + @SuppressWarnings("unchecked") + static StateSnapshotTransformFactory wrapStateSnapshotTransformerFactory( + StateDescriptor stateDesc, + StateSnapshotTransformFactory snapshotTransformFactory) { + if (stateDesc instanceof ListStateDescriptor) { + Optional> original = snapshotTransformFactory.createForDeserializedState(); + return new RocksDBSnapshotTransformFactoryAdaptor() { + @Override + public Optional> createForSerializedState() { + return original.map(est -> (StateSnapshotTransformer) createRocksDBListStateTransformer(stateDesc, est)); + } + }; + } else if (stateDesc instanceof MapStateDescriptor) { + Optional> original = snapshotTransformFactory.createForSerializedState(); + return new RocksDBSnapshotTransformFactoryAdaptor() { + @Override + public Optional> createForSerializedState() { + return original.map(RocksDBMapState.StateSnapshotTransformerWrapper::new); + } + }; + } else { + return new RocksDBSnapshotTransformFactoryAdaptor() { + @Override + public Optional> createForSerializedState() { + return snapshotTransformFactory.createForSerializedState(); + } + }; + } + } + + @SuppressWarnings("unchecked") + private static StateSnapshotTransformer createRocksDBListStateTransformer( Review comment: This method could be moved into the inner class which actually uses it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer
tillrohrmann commented on a change in pull request #7320: [FLINK-11171] Avoid concurrent usage of StateSnapshotTransformer URL: https://github.com/apache/flink/pull/7320#discussion_r247973273 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSnapshotTransformFactoryAdaptor.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory; + +import java.util.Optional; + +abstract class RocksDBSnapshotTransformFactoryAdaptor implements StateSnapshotTransformFactory { + @Override + public Optional> createForDeserializedState() { + throw new UnsupportedOperationException("Only serialized state filtering is supported in RocksDB backend"); + } + + @SuppressWarnings("unchecked") + static StateSnapshotTransformFactory wrapStateSnapshotTransformerFactory( + StateDescriptor stateDesc, + StateSnapshotTransformFactory snapshotTransformFactory) { + if (stateDesc instanceof ListStateDescriptor) { + Optional> original = snapshotTransformFactory.createForDeserializedState(); + return new RocksDBSnapshotTransformFactoryAdaptor() { Review comment: Why using anonymous classes and not giving the class a proper name? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
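Taken together, the two review comments above point at the same restructuring: instead of returning anonymous subclasses of RocksDBSnapshotTransformFactoryAdaptor and keeping the list-state helper as a separate private static method, each branch could return a small named subclass that owns its own wrapping logic. The following is only a minimal sketch of that shape, using simplified stand-in interfaces and type parameters (the archived diff lost its generic signatures), not the code that was actually merged:

{code:java}
import java.util.Optional;

// Simplified stand-ins for Flink's StateSnapshotTransformer and
// StateSnapshotTransformFactory; the real interfaces carry more type
// parameters and methods.
interface SnapshotTransformer<T> {
    T filterOrTransform(T value);
}

interface SnapshotTransformFactory<T> {
    Optional<SnapshotTransformer<T>> createForSerializedState();
    Optional<SnapshotTransformer<T>> createForDeserializedState();
}

abstract class SnapshotTransformFactoryAdaptor<T> implements SnapshotTransformFactory<T> {

    @Override
    public Optional<SnapshotTransformer<T>> createForDeserializedState() {
        throw new UnsupportedOperationException(
            "Only serialized state filtering is supported in this sketch");
    }

    // A named subclass instead of an anonymous class: the list-state specific
    // wrapping logic lives inside the only class that uses it.
    static final class ListStateAdaptor<T> extends SnapshotTransformFactoryAdaptor<T> {

        private final SnapshotTransformFactory<T> delegate;

        ListStateAdaptor(SnapshotTransformFactory<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public Optional<SnapshotTransformer<T>> createForSerializedState() {
            // The former private static helper becomes a private method of
            // the class that needs it.
            return delegate.createForDeserializedState().map(this::wrapForListState);
        }

        private SnapshotTransformer<T> wrapForListState(SnapshotTransformer<T> elementTransformer) {
            // Placeholder: the real code would wrap the element transformer so
            // it can be applied to each serialized list element.
            return elementTransformer;
        }
    }
}
{code}

The design point is that the wrapping step and the factory it configures sit together, so a reader of the list-state branch does not have to jump to a shared private helper elsewhere in the file.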
[GitHub] Fokko commented on issue #7488: [FLINK-11322] Use try-with-resource on FlinkKafkaConsumer010
Fokko commented on issue #7488: [FLINK-11322] Use try-with-resource on FlinkKafkaConsumer010 URL: https://github.com/apache/flink/pull/7488#issuecomment-454461813 @tzulitai Fixed some more. Let me know what you think. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
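For context, FLINK-11322 is about closing the Kafka client via try-with-resources rather than a manual close() call. The snippet below is only a generic illustration of that pattern against the plain Kafka consumer API, not the PR's actual change to FlinkKafkaConsumer010:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class TryWithResourcesExample {

    // Illustrative helper; kafkaProperties is assumed to contain
    // bootstrap.servers plus key/value deserializer settings.
    static int countTopics(Properties kafkaProperties) {
        // The consumer is closed automatically when the try block exits,
        // whether listTopics() returns normally or throws.
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(kafkaProperties)) {
            return consumer.listTopics().size();
        }
    }
}
{code}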
[jira] [Comment Edited] (FLINK-10724) Refactor failure handling in check point coordinator
[ https://issues.apache.org/jira/browse/FLINK-10724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16743199#comment-16743199 ] Andrey Zagrebin edited comment on FLINK-10724 at 1/15/19 4:31 PM: -- Hi [~yanghua], I think it is better to find a committer to shepherd this effort. I can help with reviewing the implementation but I cannot merge it because I am not a committer. Maybe [~thw] could help with it, as he created the duplicate issue. was (Author: azagrebin): Hi [~yanghua], I think it is better to find a committer to shepherd this effort. I can help with reviewing the implementation but I cannot merge it because I am not a committer. > Refactor failure handling in check point coordinator > > > Key: FLINK-10724 > URL: https://issues.apache.org/jira/browse/FLINK-10724 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Andrey Zagrebin >Assignee: vinoyang >Priority: Major > > At the moment, failure handling of asynchronously triggered checkpoints in the > checkpoint coordinator happens in different places. We could organise it > in a similar way to the failure handling of synchronous checkpoint triggering in > *CheckpointTriggerResult*, where we classify error cases. This will simplify, > e.g., the integration of an error counter for FLINK-4810. > See also discussion here: [https://github.com/apache/flink/pull/6567] > The specific design document: > https://docs.google.com/document/d/1ce7RtecuTxcVUJlnU44hzcO2Dwq9g4Oyd8_biy94hJc/edit?usp=sharing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10724) Refactor failure handling in check point coordinator
[ https://issues.apache.org/jira/browse/FLINK-10724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16743199#comment-16743199 ] Andrey Zagrebin commented on FLINK-10724: - Hi [~yanghua], I think it is better to find a committer to shepherd this effort. I can help with reviewing the implementation but I cannot merge it because I am not a committer. > Refactor failure handling in check point coordinator > > > Key: FLINK-10724 > URL: https://issues.apache.org/jira/browse/FLINK-10724 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Andrey Zagrebin >Assignee: vinoyang >Priority: Major > > At the moment, failure handling of asynchronously triggered checkpoints in the > checkpoint coordinator happens in different places. We could organise it > in a similar way to the failure handling of synchronous checkpoint triggering in > *CheckpointTriggerResult*, where we classify error cases. This will simplify, > e.g., the integration of an error counter for FLINK-4810. > See also discussion here: [https://github.com/apache/flink/pull/6567] > The specific design document: > https://docs.google.com/document/d/1ce7RtecuTxcVUJlnU44hzcO2Dwq9g4Oyd8_biy94hJc/edit?usp=sharing -- This message was sent by Atlassian JIRA (v7.6.3#76005)
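The refactoring described above boils down to routing every asynchronous checkpoint failure through one component that classifies it, so a failure counter (as needed for FLINK-4810) has a single place to live. A rough sketch of that idea, with illustrative names rather than Flink's actual classes:

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative classification of failures, loosely mirroring the idea of
// CheckpointTriggerResult-style decline reasons, extended to async failures.
enum CheckpointFailureReason {
    EXPIRED,
    TASK_FAILURE,
    IO_EXCEPTION,
    UNKNOWN
}

// One handler that all failure paths report to, instead of ad-hoc handling
// scattered across the coordinator.
final class CheckpointFailureManager {

    private final int tolerableConsecutiveFailures;
    private final AtomicInteger consecutiveFailures = new AtomicInteger();

    CheckpointFailureManager(int tolerableConsecutiveFailures) {
        this.tolerableConsecutiveFailures = tolerableConsecutiveFailures;
    }

    void handleCheckpointFailure(long checkpointId, CheckpointFailureReason reason) {
        int failures = consecutiveFailures.incrementAndGet();
        if (failures > tolerableConsecutiveFailures) {
            // In a real coordinator this would fail the job; here we only signal it.
            throw new IllegalStateException(
                "Checkpoint " + checkpointId + " failed (" + reason
                    + "); exceeded tolerable consecutive failures: " + failures);
        }
    }

    void handleCheckpointSuccess(long checkpointId) {
        consecutiveFailures.set(0);
    }
}
{code}

Classifying the reason up front also lets the coordinator decide, per reason, whether a failure should count against the tolerated limit at all.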
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247962668 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestData.scala ## @@ -95,4 +95,41 @@ object StreamTestData { data.+=(((3, 3), "three")) env.fromCollection(data) } + + def getSmall3TupleUpsertStream(env: StreamExecutionEnvironment): + DataStream[(Boolean, (Int, Long, String))] = { +val data = new mutable.MutableList[(Boolean, (Int, Long, String))] +data.+=((true, (1, 1L, "Hi"))) +data.+=((true, (2, 2L, "Hello"))) +data.+=((true, (3, 2L, "Hello world"))) +env.fromCollection(data) + } + + def get3TupleUpsertStream(env: StreamExecutionEnvironment): Review comment: Is this method used somewhere? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r247961118 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.stream.sql + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo} +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Types +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, unaryNode} +import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} +import org.apache.flink.types.Row +import org.junit.Test +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import java.lang.{Boolean => JBool} + +class FromUpsertStreamTest extends TableTestBase { + + private val streamUtil: StreamTableTestUtil = streamTestUtil() + + @Test + def testRemoveUpsertToRetraction() = { +streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))]( + "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime) + +val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable" + +val expected = + unaryNode( +"DataStreamCalc", +UpsertTableNode(0), +term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime", + "CAST(rowtime) AS rowtime") + ) +streamUtil.verifySql(sql, expected) + } + + @Test + def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = { +streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))]( + "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime) + +val sql = "SELECT b as b1, c, proctime as proctime1, rowtime as rowtime1 FROM MyTable" + +val expected = + unaryNode( +"DataStreamCalc", +unaryNode( + "DataStreamUpsertToRetraction", + unaryNode( +"DataStreamCalc", +UpsertTableNode(0), +term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime", + "CAST(rowtime) AS rowtime") + ), + term("keys", "b"), + term("select", "a", "b", "c", "proctime", "rowtime") +), +term("select", "b AS b1", "c", "proctime AS proctime1", "rowtime AS rowtime1")) +streamUtil.verifySql(sql, expected, true) + } + + @Test + def testCalcCannotTransposeUpsertToRetraction() = { Review comment: What does this test case test? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services